"""Tests for amodem.calib: send/recv round trip, I/O errors and the detector."""
from io import BytesIO

import numpy as np
import pytest

from amodem import calib
from amodem import common
from amodem import config

# Rebinds the module name to the configuration object returned by fastest();
# every test below uses this object, not the module.
config = config.fastest()


class ProcessMock(object):
    """In-memory stand-in for the process/stream object passed to calib.

    The instance acts as its own ``stdin`` and ``stdout``; both are backed by
    a single ``BytesIO`` buffer exposed as ``buf``.
    """

    def __init__(self):
        self.buf = BytesIO()
        # Reads and writes on either pipe go through this same object.
        self.stdin = self
        self.stdout = self
        self.bytes_per_sample = 2

    def launch(self, *args, **kwargs):
        """Accept and ignore any arguments; return this mock."""
        return self

    # Calling the mock behaves exactly like launch().
    __call__ = launch

    def kill(self):
        """Do nothing."""
        pass

    def write(self, data):
        """Append ``data`` to the buffer.

        Raises:
            KeyboardInterrupt: once the buffer position exceeds 1e6 after the
                write (presumably so a continuous sender terminates -- confirm
                against calib.send).
        """
        self.buf.write(data)
        if self.buf.tell() > 1e6:
            raise KeyboardInterrupt

    def read(self, n):
        """Return up to ``n`` bytes from the buffer's current position."""
        return self.buf.read(n)


def test_success():
    """Send into the mock, rewind the buffer, then receive from it."""
    proc = ProcessMock()
    calib.send(config, proc)
    proc.buf.seek(0)
    calib.recv(config, proc)


def test_errors():
    """send() and recv() must return when the underlying I/O raises."""

    class WriteError(ProcessMock):
        def write(self, data):
            raise IOError()

    writer = WriteError()
    calib.send(config, writer)
    # The failing write() never reaches the buffer.
    assert writer.buf.tell() == 0

    class ReadError(ProcessMock):
        def read(self, n):
            raise KeyboardInterrupt()

    reader = ReadError()
    calib.recv(config, reader, verbose=True)
    # Nothing was written and read() never touches the buffer.
    assert reader.buf.tell() == 0


@pytest.fixture(
    params=[0] + [
        sign * mag
        for sign in (+1, -1)
        for mag in (0.1, 1, 10, 100, 1e3, 2e3)
    ]
)
def freq_err(request):
    """Yield zero and each signed magnitude, multiplied by 1e-6."""
    return request.param * 1e-6


def test_drift(freq_err):
    """Run the detector on a cosine near ``config.Fc`` and check every frame."""
    # NOTE(review): freq_err is already multiplied by 1e-6 in the fixture and
    # is divided by 1e6 again here, so the applied relative offset is
    # param * 1e-12 -- confirm whether this double scaling is intended.
    tone_freq = config.Fc * (1 + freq_err / 1e6)
    n_samples = int(1.0 * config.Fs)
    t = np.arange(n_samples) * config.Ts

    frame_length = 100
    rms = 0.5
    signal = rms * np.cos(2 * np.pi * tone_freq * t)
    src = BytesIO(common.dumps(signal))

    # enumerate(..., 1) leaves ``frames`` equal to the number of results
    # produced; it stays 0 when the detector yields nothing.
    frames = 0
    results = calib.detector(config, src, frame_length=frame_length)
    for frames, result in enumerate(results, 1):
        assert not result.error
        assert abs(result.rms - rms) < 1e-3
        assert abs(result.total - rms) < 1e-3

    assert frames > 0
    assert frames == config.baud / frame_length