test: add simple full test via identity channel

This commit is contained in:
Roman Zeyde
2014-08-04 18:06:55 +03:00
parent 6e9b104aaf
commit 6c6c090e79
4 changed files with 51 additions and 11 deletions

View File

@@ -242,8 +242,7 @@ def main(args):
log.info('Running MODEM @ {:.1f} kbps'.format(modem.modem_bps / 1e3))
fd = sys.stdin
signal = stream.iread(fd)
signal = stream.iread(args.input)
skipped = common.take(signal, args.skip)
log.debug('Skipping first %.3f seconds', len(skipped) / float(modem.baud))
@@ -254,7 +253,7 @@ def main(args):
bits = receive(signal, modem.freqs, gain=1.0/amplitude)
try:
for chunk in decode(bits):
sys.stdout.write(chunk)
args.output.write(chunk)
size = size + len(chunk)
except Exception:
log.exception('Decoding failed')
@@ -283,6 +282,8 @@ if __name__ == '__main__':
p = argparse.ArgumentParser()
p.add_argument('--skip', type=int, default=100,
help='skip initial N samples, due to spurious spikes')
p.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin)
p.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout)
args = p.parse_args()
try:
main(args)

View File

@@ -85,29 +85,27 @@ def main(args):
import ecc
log.info('Running MODEM @ {:.1f} kbps'.format(modem.modem_bps / 1e3))
fd = sys.stdout
# padding audio with silence
writer.write(fd, np.zeros(int(config.Fs * args.silence_start)))
writer.write(args.output, np.zeros(int(config.Fs * args.silence_start)))
start(fd, sym.carrier[config.carrier_index])
start(args.output, sym.carrier[config.carrier_index])
for c in sym.carrier:
training(fd, c)
training(args.output, c)
training_size = writer.offset
log.info('%.3f seconds of training audio',
training_size / wave.bytes_per_second)
reader = Reader(sys.stdin, 64 << 10)
reader = Reader(args.input, 64 << 10)
data = itertools.chain.from_iterable(reader)
encoded = itertools.chain.from_iterable(ecc.encode(data))
modulate(fd, bits=common.to_bits(encoded))
modulate(args.output, bits=common.to_bits(encoded))
data_size = writer.offset - training_size
log.info('%.3f seconds of data audio, for %.3f kB of data',
data_size / wave.bytes_per_second, reader.total / 1e3)
# padding audio with silence
writer.write(fd, np.zeros(int(config.Fs * args.silence_stop)))
writer.write(args.output, np.zeros(int(config.Fs * args.silence_stop)))
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
@@ -117,5 +115,7 @@ if __name__ == '__main__':
p = argparse.ArgumentParser()
p.add_argument('--silence-start', type=float, default=1.0)
p.add_argument('--silence-stop', type=float, default=1.0)
p.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin)
p.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout)
args = p.parse_args()
main(args)

View File

@@ -45,3 +45,9 @@ def test_icapture():
z.append(i)
assert x == y
assert x == z
def test_dumps_loads():
x = np.array([.1, .4, .2, .6, .3, .5])
y = common.loads(common.dumps(x * 1j))
assert all(x == y)

33
amodem/test/test_full.py Normal file
View File

@@ -0,0 +1,33 @@
import os
from cStringIO import StringIO
import send
import recv
import common
class Args(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def run(chan, size):
tx_data = os.urandom(size)
tx_audio = StringIO()
send.main(Args(silence_start=1, silence_stop=1, input=StringIO(tx_data), output=tx_audio))
data = tx_audio.getvalue()
data = common.loads(data)
data = chan(data)
data = common.dumps(data * 1j)
rx_audio = StringIO(data)
rx_data = StringIO()
recv.main(Args(skip=100, input=rx_audio, output=rx_data))
rx_data = rx_data.getvalue()
assert rx_data == tx_data
def test_small():
run(chan=lambda x: x, size=1024)
def test_large():
run(chan=lambda x: x, size=64 << 10)