mirror of
https://github.com/romanz/amodem.git
synced 2026-04-22 06:16:26 +08:00
calib: remove AttributeHolder
This commit is contained in:
@@ -72,10 +72,10 @@ def detector(config, src, frame_length=200):
|
|||||||
else:
|
else:
|
||||||
msg = 'too {0} signal'.format(errors[flags.index(False)])
|
msg = 'too {0} signal'.format(errors[flags.index(False)])
|
||||||
|
|
||||||
yield common.AttributeHolder(dict(
|
yield dict(
|
||||||
freq=freq, rms=rms, peak=peak, coherency=coherency,
|
freq=freq, rms=rms, peak=peak, coherency=coherency,
|
||||||
total=total, success=success, msg=msg
|
total=total, success=success, msg=msg
|
||||||
))
|
)
|
||||||
|
|
||||||
|
|
||||||
def volume_calibration(result_iterator, volume_ctl):
|
def volume_calibration(result_iterator, volume_ctl):
|
||||||
@@ -90,7 +90,7 @@ def volume_calibration(result_iterator, volume_ctl):
|
|||||||
for index, result in enumerate(itertools.chain([None], result_iterator)):
|
for index, result in enumerate(itertools.chain([None], result_iterator)):
|
||||||
if index % iters_per_update == 0:
|
if index % iters_per_update == 0:
|
||||||
if index > 0: # skip dummy (first result)
|
if index > 0: # skip dummy (first result)
|
||||||
sign = 1 if (result.total < target_level) else -1
|
sign = 1 if (result['total'] < target_level) else -1
|
||||||
level = level + step * sign
|
level = level + step * sign
|
||||||
level = min(max(level, min_level), max_level)
|
level = min(max(level, min_level), max_level)
|
||||||
step = step * 0.5
|
step = step * 0.5
|
||||||
@@ -112,10 +112,11 @@ def iter_window(iterable, size):
|
|||||||
|
|
||||||
|
|
||||||
def recv(config, src, verbose=False, volume_cmd=None, dump_audio=None):
|
def recv(config, src, verbose=False, volume_cmd=None, dump_audio=None):
|
||||||
fmt = '{0.freq:6.0f} Hz: {0.msg:20s}'
|
fmt = '{freq:6.0f} Hz: {msg:20s}'
|
||||||
|
log.info('verbose: %s', verbose)
|
||||||
if verbose:
|
if verbose:
|
||||||
fields = ['total', 'rms', 'coherency', 'peak']
|
fields = ['total', 'rms', 'coherency', 'peak']
|
||||||
fmt += ', '.join('{0}={{0.{0}:.4f}}'.format(f) for f in fields)
|
fmt += ', '.join('{0}={{{0}:.4f}}'.format(f) for f in fields)
|
||||||
|
|
||||||
volume_ctl = volume_controller(volume_cmd)
|
volume_ctl = volume_controller(volume_cmd)
|
||||||
|
|
||||||
@@ -125,6 +126,8 @@ def recv(config, src, verbose=False, volume_cmd=None, dump_audio=None):
|
|||||||
result_iterator = volume_calibration(result_iterator, volume_ctl)
|
result_iterator = volume_calibration(result_iterator, volume_ctl)
|
||||||
for _prev, curr, _next in iter_window(result_iterator, size=3):
|
for _prev, curr, _next in iter_window(result_iterator, size=3):
|
||||||
# don't log errors during frequency changes
|
# don't log errors during frequency changes
|
||||||
if _prev.success and _next.success and _prev.freq != _next.freq:
|
if _prev['success'] and _next['success']:
|
||||||
curr.msg = curr.msg if curr.success else 'frequency change'
|
if _prev['freq'] != _next['freq']:
|
||||||
log.info(fmt.format(curr))
|
if not curr['success']:
|
||||||
|
curr['msg'] = 'frequency change'
|
||||||
|
log.info(fmt.format(**curr))
|
||||||
|
|||||||
@@ -85,14 +85,3 @@ class Dummy(object):
|
|||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
def __call__(self, *args, **kwargs):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
||||||
class AttributeHolder(object):
|
|
||||||
|
|
||||||
def __init__(self, d):
|
|
||||||
self.__dict__.update(d)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
items = sorted(self.__dict__.items())
|
|
||||||
args = ', '.join('{0}={1}'.format(k, v) for k, v in items)
|
|
||||||
return '{0}({1})'.format(self.__class__.__name__, args)
|
|
||||||
|
|||||||
@@ -39,8 +39,8 @@ def test_too_strong():
|
|||||||
calib.send(config, p, gain=1.001, limit=32)
|
calib.send(config, p, gain=1.001, limit=32)
|
||||||
p.buf.seek(0)
|
p.buf.seek(0)
|
||||||
for r in calib.detector(config, src=p):
|
for r in calib.detector(config, src=p):
|
||||||
assert not r.success
|
assert not r['success']
|
||||||
assert r.msg == 'too strong signal'
|
assert r['msg'] == 'too strong signal'
|
||||||
|
|
||||||
|
|
||||||
def test_too_weak():
|
def test_too_weak():
|
||||||
@@ -48,8 +48,8 @@ def test_too_weak():
|
|||||||
calib.send(config, p, gain=0.01, limit=32)
|
calib.send(config, p, gain=0.01, limit=32)
|
||||||
p.buf.seek(0)
|
p.buf.seek(0)
|
||||||
for r in calib.detector(config, src=p):
|
for r in calib.detector(config, src=p):
|
||||||
assert not r.success
|
assert not r['success']
|
||||||
assert r.msg == 'too weak signal'
|
assert r['msg'] == 'too weak signal'
|
||||||
|
|
||||||
|
|
||||||
def test_too_noisy():
|
def test_too_noisy():
|
||||||
@@ -57,8 +57,8 @@ def test_too_noisy():
|
|||||||
signal = np.array([r.choice([-1, 1]) for i in range(int(config.Fs))])
|
signal = np.array([r.choice([-1, 1]) for i in range(int(config.Fs))])
|
||||||
src = BytesIO(common.dumps(signal * 0.5))
|
src = BytesIO(common.dumps(signal * 0.5))
|
||||||
for r in calib.detector(config, src=src):
|
for r in calib.detector(config, src=src):
|
||||||
assert not r.success
|
assert not r['success']
|
||||||
assert r.msg == 'too noisy signal'
|
assert r['msg'] == 'too noisy signal'
|
||||||
|
|
||||||
|
|
||||||
def test_errors():
|
def test_errors():
|
||||||
@@ -94,9 +94,9 @@ def test_drift(freq_err):
|
|||||||
src = BytesIO(common.dumps(signal))
|
src = BytesIO(common.dumps(signal))
|
||||||
iters = 0
|
iters = 0
|
||||||
for r in calib.detector(config, src, frame_length=frame_length):
|
for r in calib.detector(config, src, frame_length=frame_length):
|
||||||
assert r.success is True
|
assert r['success'] is True
|
||||||
assert abs(r.rms - rms) < 1e-3
|
assert abs(r['rms'] - rms) < 1e-3
|
||||||
assert abs(r.total - rms) < 1e-3
|
assert abs(r['total'] - rms) < 1e-3
|
||||||
iters += 1
|
iters += 1
|
||||||
|
|
||||||
assert iters > 0
|
assert iters > 0
|
||||||
|
|||||||
@@ -54,14 +54,6 @@ def test_izip():
|
|||||||
assert list(common.izip([x, y])) == list(zip(x, y))
|
assert list(common.izip([x, y])) == list(zip(x, y))
|
||||||
|
|
||||||
|
|
||||||
def test_holder():
|
|
||||||
d = {'x': 1, 'y': 2.3}
|
|
||||||
a = common.AttributeHolder(d)
|
|
||||||
assert a.x == d['x']
|
|
||||||
assert a.y == d['y']
|
|
||||||
assert repr(a) == 'AttributeHolder(x=1, y=2.3)'
|
|
||||||
|
|
||||||
|
|
||||||
def test_configs():
|
def test_configs():
|
||||||
default = config.Configuration()
|
default = config.Configuration()
|
||||||
fastest = config.fastest()
|
fastest = config.fastest()
|
||||||
|
|||||||
Reference in New Issue
Block a user