From 03a83c9a4c23d9dff843a457b56a2d03946dac1f Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 8 Aug 2014 14:55:11 +0300 Subject: [PATCH] add test_calib.py --- amodem/calib.py | 8 +++++--- tests/test_calib.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 tests/test_calib.py diff --git a/amodem/calib.py b/amodem/calib.py index 52fe8af..cf911d1 100755 --- a/amodem/calib.py +++ b/amodem/calib.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +import sys import numpy as np from . import common @@ -10,6 +11,7 @@ Tsample = 1 t = np.arange(int(Tsample * config.Fs)) * config.Ts sig = np.exp(2j * np.pi * config.Fc * t) sig_dump = common.dumps(sig) +fmt = 'coherence={:.3f} amplitude={:.3f} phase={:+.1f} peak={:.3f}\n' def send(wave_play=wave.play): @@ -24,7 +26,7 @@ def send(wave_play=wave.play): p.kill() -def recv(wave_record=wave.record): +def recv(wave_record=wave.record, reporter=sys.stdout.write): p = wave_record('-', stdout=wave.sp.PIPE) try: while True: @@ -45,11 +47,11 @@ def recv(wave_record=wave.record): phase = np.angle(z) peak = np.max(np.abs(x)) - fmt = 'coherence={:.3f} amplitude={:.3f} phase={:+.1f} peak={:.3f}' - print(fmt.format(coherence, amplitude, phase * 180 / np.pi, peak)) + reporter(fmt.format(coherence, amplitude, phase * 180/np.pi, peak)) except KeyboardInterrupt: p.kill() + if __name__ == '__main__': import argparse p = argparse.ArgumentParser() diff --git a/tests/test_calib.py b/tests/test_calib.py new file mode 100644 index 0000000..308833e --- /dev/null +++ b/tests/test_calib.py @@ -0,0 +1,35 @@ +from amodem import calib + +from io import BytesIO + + +class ProcessMock(object): + def __init__(self): + self.buf = BytesIO() + self.stdin = self + self.stdout = self + + def __call__(self, *args, **kwargs): + return self + + def kill(self): + pass + + def write(self, data): + self.buf.write(data) + if self.buf.tell() > 1e6: + raise KeyboardInterrupt + + def read(self, n): + return self.buf.read(n) + + +def verify(msg): + assert msg == calib.fmt.format(1, 1, 0, 1) + + +def test(): + p = ProcessMock() + calib.send(p) + p.buf.seek(0) + calib.recv(p, reporter=verify)