From 6c6c090e79bca99bc1a5b3370adbaf024948f795 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Mon, 4 Aug 2014 18:06:55 +0300 Subject: [PATCH] test: add simple full test via identity channel --- amodem/recv.py | 7 ++++--- amodem/send.py | 16 ++++++++-------- amodem/test/test_common.py | 6 ++++++ amodem/test/test_full.py | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 51 insertions(+), 11 deletions(-) create mode 100644 amodem/test/test_full.py diff --git a/amodem/recv.py b/amodem/recv.py index f69be02..08866fa 100755 --- a/amodem/recv.py +++ b/amodem/recv.py @@ -242,8 +242,7 @@ def main(args): log.info('Running MODEM @ {:.1f} kbps'.format(modem.modem_bps / 1e3)) - fd = sys.stdin - signal = stream.iread(fd) + signal = stream.iread(args.input) skipped = common.take(signal, args.skip) log.debug('Skipping first %.3f seconds', len(skipped) / float(modem.baud)) @@ -254,7 +253,7 @@ def main(args): bits = receive(signal, modem.freqs, gain=1.0/amplitude) try: for chunk in decode(bits): - sys.stdout.write(chunk) + args.output.write(chunk) size = size + len(chunk) except Exception: log.exception('Decoding failed') @@ -283,6 +282,8 @@ if __name__ == '__main__': p = argparse.ArgumentParser() p.add_argument('--skip', type=int, default=100, help='skip initial N samples, due to spurious spikes') + p.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin) + p.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout) args = p.parse_args() try: main(args) diff --git a/amodem/send.py b/amodem/send.py index e04742a..6322224 100755 --- a/amodem/send.py +++ b/amodem/send.py @@ -85,29 +85,27 @@ def main(args): import ecc log.info('Running MODEM @ {:.1f} kbps'.format(modem.modem_bps / 1e3)) - fd = sys.stdout - # padding audio with silence - writer.write(fd, np.zeros(int(config.Fs * args.silence_start))) + writer.write(args.output, np.zeros(int(config.Fs * args.silence_start))) - start(fd, sym.carrier[config.carrier_index]) + start(args.output, sym.carrier[config.carrier_index]) for c in sym.carrier: - training(fd, c) + training(args.output, c) training_size = writer.offset log.info('%.3f seconds of training audio', training_size / wave.bytes_per_second) - reader = Reader(sys.stdin, 64 << 10) + reader = Reader(args.input, 64 << 10) data = itertools.chain.from_iterable(reader) encoded = itertools.chain.from_iterable(ecc.encode(data)) - modulate(fd, bits=common.to_bits(encoded)) + modulate(args.output, bits=common.to_bits(encoded)) data_size = writer.offset - training_size log.info('%.3f seconds of data audio, for %.3f kB of data', data_size / wave.bytes_per_second, reader.total / 1e3) # padding audio with silence - writer.write(fd, np.zeros(int(config.Fs * args.silence_stop))) + writer.write(args.output, np.zeros(int(config.Fs * args.silence_stop))) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, @@ -117,5 +115,7 @@ if __name__ == '__main__': p = argparse.ArgumentParser() p.add_argument('--silence-start', type=float, default=1.0) p.add_argument('--silence-stop', type=float, default=1.0) + p.add_argument('-i', '--input', type=argparse.FileType('r'), default=sys.stdin) + p.add_argument('-o', '--output', type=argparse.FileType('w'), default=sys.stdout) args = p.parse_args() main(args) diff --git a/amodem/test/test_common.py b/amodem/test/test_common.py index bff44be..6eadb10 100644 --- a/amodem/test/test_common.py +++ b/amodem/test/test_common.py @@ -45,3 +45,9 @@ def test_icapture(): z.append(i) assert x == y assert x == z + + +def test_dumps_loads(): + x = np.array([.1, .4, .2, .6, .3, .5]) + y = common.loads(common.dumps(x * 1j)) + assert all(x == y) diff --git a/amodem/test/test_full.py b/amodem/test/test_full.py new file mode 100644 index 0000000..3468cf0 --- /dev/null +++ b/amodem/test/test_full.py @@ -0,0 +1,33 @@ +import os +from cStringIO import StringIO + +import send +import recv +import common + +class Args(object): + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + +def run(chan, size): + tx_data = os.urandom(size) + tx_audio = StringIO() + send.main(Args(silence_start=1, silence_stop=1, input=StringIO(tx_data), output=tx_audio)) + + data = tx_audio.getvalue() + data = common.loads(data) + data = chan(data) + data = common.dumps(data * 1j) + rx_audio = StringIO(data) + + rx_data = StringIO() + recv.main(Args(skip=100, input=rx_audio, output=rx_data)) + rx_data = rx_data.getvalue() + + assert rx_data == tx_data + +def test_small(): + run(chan=lambda x: x, size=1024) + +def test_large(): + run(chan=lambda x: x, size=64 << 10)