mirror of
https://github.com/romanz/amodem.git
synced 2026-04-15 02:35:27 +08:00
calib: fix UTs
This commit is contained in:
@@ -11,6 +11,7 @@ from . import sampling
|
||||
|
||||
ALLOWED_EXCEPTIONS = (IOError, KeyboardInterrupt)
|
||||
|
||||
|
||||
def volume_controller(cmd):
|
||||
def controller(level):
|
||||
assert 0 < level <= 1
|
||||
@@ -83,8 +84,8 @@ def detector(config, src, frame_length=200):
|
||||
except ALLOWED_EXCEPTIONS:
|
||||
pass
|
||||
|
||||
def volume_calibration(result_iterator, volume_ctl):
|
||||
|
||||
def volume_calibration(result_iterator, volume_ctl):
|
||||
min_level = 0.01
|
||||
max_level = 1.0
|
||||
level = 0.5
|
||||
@@ -101,11 +102,12 @@ def volume_calibration(result_iterator, volume_ctl):
|
||||
level = min(max(level, min_level), max_level)
|
||||
step = step * 0.5
|
||||
|
||||
volume_ctl(level) # should run "before" first iteration
|
||||
volume_ctl(level) # should run "before" first actual iteration
|
||||
|
||||
if index > 0: # skip dummy (first result)
|
||||
yield result
|
||||
|
||||
|
||||
def recv(config, src, verbose=False, volume_cmd=None):
|
||||
fmt = '{0.freq:6.0f} Hz: {0.message:20s}'
|
||||
if verbose:
|
||||
@@ -113,19 +115,7 @@ def recv(config, src, verbose=False, volume_cmd=None):
|
||||
fmt += ', '.join('{0}={{0.{0}:.4f}}'.format(f) for f in fields)
|
||||
|
||||
result_iterator = detector(config=config, src=src)
|
||||
if volume_cmd:
|
||||
log.info('Using automatic calibration (via "%s")', volume_cmd)
|
||||
|
||||
volume_ctl = volume_controller(volume_cmd)
|
||||
|
||||
errors = []
|
||||
for result in volume_calibration(result_iterator, volume_ctl):
|
||||
errors.append(not result.success)
|
||||
errors = errors[-3:]
|
||||
|
||||
msg = fmt.format(result)
|
||||
if all(errors):
|
||||
log.error(msg)
|
||||
else:
|
||||
log.info(msg)
|
||||
|
||||
log.info(fmt.format(result))
|
||||
|
||||
@@ -7,6 +7,7 @@ from io import BytesIO
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
|
||||
class ProcessMock(object):
|
||||
@@ -71,10 +72,31 @@ def test_drift(freq_err):
|
||||
src = BytesIO(common.dumps(signal))
|
||||
iters = 0
|
||||
for r in calib.detector(config, src, frame_length=frame_length):
|
||||
assert not r.error
|
||||
assert r.success is True
|
||||
assert abs(r.rms - rms) < 1e-3
|
||||
assert abs(r.total - rms) < 1e-3
|
||||
iters += 1
|
||||
|
||||
assert iters > 0
|
||||
assert iters == config.baud / frame_length
|
||||
|
||||
|
||||
def test_volume():
|
||||
with mock.patch('subprocess.check_call') as check_call:
|
||||
ctl = calib.volume_controller('volume-control')
|
||||
ctl(0.01)
|
||||
ctl(0.421)
|
||||
ctl(0.369)
|
||||
ctl(1)
|
||||
assert check_call.mock_calls == [
|
||||
mock.call(shell=True, args='volume-control 1%'),
|
||||
mock.call(shell=True, args='volume-control 42%'),
|
||||
mock.call(shell=True, args='volume-control 37%'),
|
||||
mock.call(shell=True, args='volume-control 100%')
|
||||
]
|
||||
with pytest.raises(AssertionError):
|
||||
ctl(0)
|
||||
with pytest.raises(AssertionError):
|
||||
ctl(-0.5)
|
||||
with pytest.raises(AssertionError):
|
||||
ctl(12.3)
|
||||
|
||||
Reference in New Issue
Block a user