diff --git a/amodem/calib.py b/amodem/calib.py index 471b44e..299f5fb 100644 --- a/amodem/calib.py +++ b/amodem/calib.py @@ -20,20 +20,17 @@ def volume_controller(cmd): return controller if cmd else (lambda level: None) -def send(config, dst, volume_cmd=None): +def send(config, dst, volume_cmd=None, gain=1.0, limit=None): volume_ctl = volume_controller(volume_cmd) volume_ctl(1.0) # full scale output volume calibration_symbols = int(1.0 * config.Fs) t = np.arange(0, calibration_symbols) * config.Ts - signals = [np.sin(2 * np.pi * f * t) for f in config.frequencies] + signals = [gain * np.sin(2 * np.pi * f * t) for f in config.frequencies] signals = [common.dumps(s) for s in signals] - try: - for signal in itertools.cycle(signals): - dst.write(signal) - except KeyboardInterrupt: - pass + for signal in itertools.islice(itertools.cycle(signals), limit): + dst.write(signal) def frame_iter(config, src, frame_length): @@ -70,13 +67,13 @@ def detector(config, src, frame_length=200): success = all(flags) if success: - message = 'good signal' + msg = 'good signal' else: - message = 'too {0} signal'.format(errors[flags.index(False)]) + msg = 'too {0} signal'.format(errors[flags.index(False)]) yield common.AttributeHolder(dict( freq=freq, rms=rms, peak=peak, coherency=coherency, - total=total, success=success, message=message + total=total, success=success, msg=msg )) @@ -103,17 +100,29 @@ def volume_calibration(result_iterator, volume_ctl): yield result +def iter_window(iterable, size): + block = [] + while True: + item = next(iterable) + block.append(item) + block = block[-size:] + if len(block) == size: + yield block + + def recv(config, src, verbose=False, volume_cmd=None): - fmt = '{0.freq:6.0f} Hz: {0.message:20s}' + fmt = '{0.freq:6.0f} Hz: {0.msg:20s}' if verbose: fields = ['total', 'rms', 'coherency', 'peak'] fmt += ', '.join('{0}={{0.{0}:.4f}}'.format(f) for f in fields) - result_iterator = detector(config=config, src=src) volume_ctl = volume_controller(volume_cmd) - try: - for result in volume_calibration(result_iterator, volume_ctl): - log.info(fmt.format(result)) - except KeyboardInterrupt: - pass + result_iterator = detector(config=config, src=src) + result_iterator = volume_calibration(result_iterator, volume_ctl) + result_iterator = iter_window(result_iterator, size=3) + for r in result_iterator: + # don't log errors during frequency changes + if r[0].success and r[2].success and r[0].freq != r[2].freq: + r[1].msg = r[1].msg if r[1].success else 'frequency change' + log.info(fmt.format(r[1])) diff --git a/tests/test_calib.py b/tests/test_calib.py index fc1e61a..ceedd86 100644 --- a/tests/test_calib.py +++ b/tests/test_calib.py @@ -6,6 +6,7 @@ config = config.fastest() from io import BytesIO import numpy as np +import random import pytest import mock @@ -26,9 +27,8 @@ class ProcessMock(object): pass def write(self, data): + assert self.buf.tell() < 10e6 self.buf.write(data) - if self.buf.tell() > 1e6: - raise KeyboardInterrupt def read(self, n): return self.buf.read(n) @@ -36,24 +36,53 @@ class ProcessMock(object): def test_success(): p = ProcessMock() - calib.send(config, p) + calib.send(config, p, gain=0.5, limit=32) p.buf.seek(0) calib.recv(config, p) +def test_too_strong(): + p = ProcessMock() + calib.send(config, p, gain=1.001, limit=32) + p.buf.seek(0) + for r in calib.detector(config, src=p): + assert not r.success + assert r.msg == 'too strong signal' + + +def test_too_weak(): + p = ProcessMock() + calib.send(config, p, gain=0.01, limit=32) + p.buf.seek(0) + for r in calib.detector(config, src=p): + assert not r.success + assert r.msg == 'too weak signal' + + +def test_too_noisy(): + r = random.Random(0) # generate random binary signal + signal = np.array([r.choice([-1, 1]) for i in range(int(config.Fs))]) + src = BytesIO(common.dumps(signal * 0.5)) + for r in calib.detector(config, src=src): + assert not r.success + assert r.msg == 'too noisy signal' + + def test_errors(): class WriteError(ProcessMock): def write(self, data): raise KeyboardInterrupt() p = WriteError() - calib.send(config, p) + with pytest.raises(KeyboardInterrupt): + calib.send(config, p, limit=32) assert p.buf.tell() == 0 class ReadError(ProcessMock): def read(self, n): raise KeyboardInterrupt() p = ReadError() - calib.recv(config, p, verbose=True) + with pytest.raises(KeyboardInterrupt): + calib.recv(config, p, verbose=True) assert p.buf.tell() == 0