calib: print special message for frequency changes.

This commit is contained in:
Roman Zeyde
2015-01-19 10:56:15 +02:00
parent 323145c44b
commit 066c27843e
2 changed files with 60 additions and 22 deletions

View File

@@ -20,20 +20,17 @@ def volume_controller(cmd):
return controller if cmd else (lambda level: None)
def send(config, dst, volume_cmd=None):
def send(config, dst, volume_cmd=None, gain=1.0, limit=None):
volume_ctl = volume_controller(volume_cmd)
volume_ctl(1.0) # full scale output volume
calibration_symbols = int(1.0 * config.Fs)
t = np.arange(0, calibration_symbols) * config.Ts
signals = [np.sin(2 * np.pi * f * t) for f in config.frequencies]
signals = [gain * np.sin(2 * np.pi * f * t) for f in config.frequencies]
signals = [common.dumps(s) for s in signals]
try:
for signal in itertools.cycle(signals):
dst.write(signal)
except KeyboardInterrupt:
pass
for signal in itertools.islice(itertools.cycle(signals), limit):
dst.write(signal)
def frame_iter(config, src, frame_length):
@@ -70,13 +67,13 @@ def detector(config, src, frame_length=200):
success = all(flags)
if success:
message = 'good signal'
msg = 'good signal'
else:
message = 'too {0} signal'.format(errors[flags.index(False)])
msg = 'too {0} signal'.format(errors[flags.index(False)])
yield common.AttributeHolder(dict(
freq=freq, rms=rms, peak=peak, coherency=coherency,
total=total, success=success, message=message
total=total, success=success, msg=msg
))
@@ -103,17 +100,29 @@ def volume_calibration(result_iterator, volume_ctl):
yield result
def iter_window(iterable, size):
block = []
while True:
item = next(iterable)
block.append(item)
block = block[-size:]
if len(block) == size:
yield block
def recv(config, src, verbose=False, volume_cmd=None):
fmt = '{0.freq:6.0f} Hz: {0.message:20s}'
fmt = '{0.freq:6.0f} Hz: {0.msg:20s}'
if verbose:
fields = ['total', 'rms', 'coherency', 'peak']
fmt += ', '.join('{0}={{0.{0}:.4f}}'.format(f) for f in fields)
result_iterator = detector(config=config, src=src)
volume_ctl = volume_controller(volume_cmd)
try:
for result in volume_calibration(result_iterator, volume_ctl):
log.info(fmt.format(result))
except KeyboardInterrupt:
pass
result_iterator = detector(config=config, src=src)
result_iterator = volume_calibration(result_iterator, volume_ctl)
result_iterator = iter_window(result_iterator, size=3)
for r in result_iterator:
# don't log errors during frequency changes
if r[0].success and r[2].success and r[0].freq != r[2].freq:
r[1].msg = r[1].msg if r[1].success else 'frequency change'
log.info(fmt.format(r[1]))

View File

@@ -6,6 +6,7 @@ config = config.fastest()
from io import BytesIO
import numpy as np
import random
import pytest
import mock
@@ -26,9 +27,8 @@ class ProcessMock(object):
pass
def write(self, data):
assert self.buf.tell() < 10e6
self.buf.write(data)
if self.buf.tell() > 1e6:
raise KeyboardInterrupt
def read(self, n):
return self.buf.read(n)
@@ -36,24 +36,53 @@ class ProcessMock(object):
def test_success():
p = ProcessMock()
calib.send(config, p)
calib.send(config, p, gain=0.5, limit=32)
p.buf.seek(0)
calib.recv(config, p)
def test_too_strong():
p = ProcessMock()
calib.send(config, p, gain=1.001, limit=32)
p.buf.seek(0)
for r in calib.detector(config, src=p):
assert not r.success
assert r.msg == 'too strong signal'
def test_too_weak():
p = ProcessMock()
calib.send(config, p, gain=0.01, limit=32)
p.buf.seek(0)
for r in calib.detector(config, src=p):
assert not r.success
assert r.msg == 'too weak signal'
def test_too_noisy():
r = random.Random(0) # generate random binary signal
signal = np.array([r.choice([-1, 1]) for i in range(int(config.Fs))])
src = BytesIO(common.dumps(signal * 0.5))
for r in calib.detector(config, src=src):
assert not r.success
assert r.msg == 'too noisy signal'
def test_errors():
class WriteError(ProcessMock):
def write(self, data):
raise KeyboardInterrupt()
p = WriteError()
calib.send(config, p)
with pytest.raises(KeyboardInterrupt):
calib.send(config, p, limit=32)
assert p.buf.tell() == 0
class ReadError(ProcessMock):
def read(self, n):
raise KeyboardInterrupt()
p = ReadError()
calib.recv(config, p, verbose=True)
with pytest.raises(KeyboardInterrupt):
calib.recv(config, p, verbose=True)
assert p.buf.tell() == 0