mirror of
https://github.com/romanz/amodem.git
synced 2026-04-19 12:46:00 +08:00
don't use global configuration
This commit is contained in:
@@ -1,27 +1,29 @@
|
||||
from amodem import wave
|
||||
from amodem import audio
|
||||
import subprocess as sp
|
||||
import signal
|
||||
|
||||
|
||||
def test_launch():
|
||||
p = wave.launch(tool='true', fname='fname')
|
||||
p = audio.ALSA(tool='true', Fs=32000).launch(fname='fname')
|
||||
assert p.wait() == 0
|
||||
|
||||
def test_exit():
|
||||
p = wave.launch(tool='python', fname='-', stdin=sp.PIPE)
|
||||
p = audio.ALSA(tool='python', Fs=32000).launch(fname='-', stdin=sp.PIPE)
|
||||
s = b'import sys; sys.exit(42)'
|
||||
p.stdin.write(s)
|
||||
p.stdin.close()
|
||||
assert p.wait() == 42
|
||||
|
||||
def test_io():
|
||||
p = wave.launch(tool='python', fname='-', stdin=sp.PIPE, stdout=sp.PIPE)
|
||||
p = audio.ALSA(tool='python', Fs=32000)
|
||||
p = p.launch(fname='-', stdin=sp.PIPE, stdout=sp.PIPE)
|
||||
s = b'Hello World!'
|
||||
p.stdin.write(b'print("' + s + b'")\n')
|
||||
p.stdin.close()
|
||||
assert p.stdout.read(len(s)) == s
|
||||
|
||||
def test_kill():
|
||||
p = wave.launch(tool='python', fname='-', stdin=sp.PIPE, stdout=sp.PIPE)
|
||||
p = audio.ALSA(tool='python', Fs=32000)
|
||||
p = p.launch(fname='-', stdin=sp.PIPE, stdout=sp.PIPE)
|
||||
p.kill()
|
||||
assert p.wait() == -signal.SIGKILL
|
||||
@@ -1,4 +1,5 @@
|
||||
from amodem import calib
|
||||
from amodem import config
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
@@ -8,10 +9,13 @@ class ProcessMock(object):
|
||||
self.buf = BytesIO()
|
||||
self.stdin = self
|
||||
self.stdout = self
|
||||
self.bytes_per_sample = 2
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
def launch(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
__call__ = launch
|
||||
|
||||
def kill(self):
|
||||
pass
|
||||
|
||||
@@ -26,9 +30,9 @@ class ProcessMock(object):
|
||||
|
||||
def test_success():
|
||||
p = ProcessMock()
|
||||
calib.send(p)
|
||||
calib.send(config, p)
|
||||
p.buf.seek(0)
|
||||
calib.recv(p)
|
||||
calib.recv(config, p)
|
||||
|
||||
|
||||
def test_errors():
|
||||
@@ -37,11 +41,11 @@ def test_errors():
|
||||
def _write(data):
|
||||
raise IOError()
|
||||
p.write = _write
|
||||
calib.send(p)
|
||||
calib.send(config, p)
|
||||
assert p.buf.tell() == 0
|
||||
|
||||
def _read(data):
|
||||
raise KeyboardInterrupt()
|
||||
p.read = _read
|
||||
calib.recv(p, verbose=True)
|
||||
calib.recv(config, p, verbose=True)
|
||||
assert p.buf.tell() == 0
|
||||
|
||||
@@ -59,11 +59,12 @@ def test_estimate():
|
||||
|
||||
|
||||
def test_demux():
|
||||
freqs = [1e3, 2e3]
|
||||
carriers = [dsp.exp_iwt(f, config.Nsym) for f in freqs]
|
||||
freqs = np.array([1e3, 2e3])
|
||||
omegas = 2 * np.pi * freqs / config.Fs
|
||||
carriers = [dsp.exp_iwt(2*np.pi*f/config.Fs, config.Nsym) for f in freqs]
|
||||
syms = [3, 2j]
|
||||
sig = np.dot(syms, carriers)
|
||||
res = dsp.Demux(sampling.Sampler(sig.real), freqs)
|
||||
res = dsp.Demux(sampling.Sampler(sig.real), omegas, config.Nsym)
|
||||
res = np.array(list(res))
|
||||
assert np.max(np.abs(res - syms)) < 1e-12
|
||||
|
||||
|
||||
@@ -13,8 +13,9 @@ def assert_approx(x, y, e=1e-12):
|
||||
|
||||
def test_training():
|
||||
L = 1000
|
||||
t1 = equalizer.train_symbols(L)
|
||||
t2 = equalizer.train_symbols(L)
|
||||
e = equalizer.Equalizer(config)
|
||||
t1 = e.train_symbols(L)
|
||||
t2 = e.train_symbols(L)
|
||||
assert (t1 == t2).all()
|
||||
|
||||
|
||||
@@ -35,10 +36,11 @@ def test_commutation():
|
||||
|
||||
def test_modem():
|
||||
L = 1000
|
||||
sent = equalizer.train_symbols(L)
|
||||
e = equalizer.Equalizer(config)
|
||||
sent = e.train_symbols(L)
|
||||
gain = config.Nfreq
|
||||
x = equalizer.modulator(sent) * gain
|
||||
received = equalizer.demodulator(x, L)
|
||||
x = e.modulator(sent) * gain
|
||||
received = e.demodulator(x, L)
|
||||
assert_approx(sent, received)
|
||||
|
||||
|
||||
@@ -46,23 +48,24 @@ def test_symbols():
|
||||
length = 100
|
||||
gain = float(config.Nfreq)
|
||||
|
||||
symbols = equalizer.train_symbols(length=length)
|
||||
x = equalizer.modulator(symbols) * gain
|
||||
assert_approx(equalizer.demodulator(x, size=length), symbols)
|
||||
e = equalizer.Equalizer(config)
|
||||
symbols = e.train_symbols(length=length)
|
||||
x = e.modulator(symbols) * gain
|
||||
assert_approx(e.demodulator(x, size=length), symbols)
|
||||
|
||||
den = np.array([1, -0.6, 0.1])
|
||||
num = np.array([0.5])
|
||||
y = dsp.lfilter(x=x, b=num, a=den)
|
||||
|
||||
lookahead = 2
|
||||
h = equalizer.equalize_symbols(
|
||||
h = e.equalize_symbols(
|
||||
signal=y, symbols=symbols, order=len(den), lookahead=lookahead
|
||||
)
|
||||
assert norm(h[:lookahead]) < 1e-12
|
||||
assert_approx(h[lookahead:], den / num)
|
||||
|
||||
y = dsp.lfilter(x=y, b=h[lookahead:], a=[1])
|
||||
z = equalizer.demodulator(y, size=length)
|
||||
z = e.demodulator(y, size=length)
|
||||
assert_approx(z, symbols)
|
||||
|
||||
|
||||
@@ -72,9 +75,10 @@ def test_signal():
|
||||
den = np.array([1, -0.6, 0.1])
|
||||
num = np.array([0.5])
|
||||
y = dsp.lfilter(x=x, b=num, a=den)
|
||||
e = equalizer.Equalizer(config)
|
||||
|
||||
lookahead = 2
|
||||
h = equalizer.equalize_signal(
|
||||
h = e.equalize_signal(
|
||||
signal=y, expected=x, order=len(den), lookahead=lookahead)
|
||||
assert norm(h[:lookahead]) < 1e-12
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import numpy as np
|
||||
|
||||
from amodem import config
|
||||
from amodem import dsp
|
||||
from amodem import recv
|
||||
from amodem import train
|
||||
from amodem import sampling
|
||||
@@ -10,29 +11,34 @@ def test_detect():
|
||||
P = sum(train.prefix)
|
||||
t = np.arange(P * config.Nsym) * config.Ts
|
||||
x = np.cos(2 * np.pi * config.Fc * t)
|
||||
samples, amp = recv.detect(x, config.Fc)
|
||||
|
||||
detector = recv.Detector(config)
|
||||
samples, amp = detector.run(x)
|
||||
assert abs(1 - amp) < 1e-12
|
||||
|
||||
x = np.cos(2 * np.pi * (2*config.Fc) * t)
|
||||
try:
|
||||
recv.detect(x, config.Fc)
|
||||
detector.run(x)
|
||||
assert False
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def test_prefix():
|
||||
symbol = np.cos(2 * np.pi * config.Fc * np.arange(config.Nsym) * config.Ts)
|
||||
omega = 2 * np.pi * config.Fc / config.Fs
|
||||
symbol = np.cos(omega * np.arange(config.Nsym))
|
||||
signal = np.concatenate([c * symbol for c in train.prefix])
|
||||
|
||||
sampler = sampling.Sampler(signal)
|
||||
r = recv.Receiver()
|
||||
freq_err = r._prefix(sampler, freq=config.Fc)
|
||||
def symbols_stream(signal):
|
||||
sampler = sampling.Sampler(signal)
|
||||
return dsp.Demux(sampler=sampler, omegas=[omega], Nsym=config.Nsym)
|
||||
r = recv.Receiver(config)
|
||||
freq_err = r._prefix(symbols_stream(signal))
|
||||
assert abs(freq_err) < 1e-16
|
||||
|
||||
try:
|
||||
silence = 0 * signal
|
||||
r._prefix(sampling.Sampler(silence), freq=config.Fc)
|
||||
r._prefix(symbols_stream(silence))
|
||||
assert False
|
||||
except ValueError:
|
||||
pass
|
||||
@@ -40,6 +46,7 @@ def test_prefix():
|
||||
|
||||
def test_find_start():
|
||||
sym = np.cos(2 * np.pi * config.Fc * np.arange(config.Nsym) * config.Ts)
|
||||
detector = recv.Detector(config)
|
||||
|
||||
length = 200
|
||||
prefix = postfix = np.tile(0 * sym, 50)
|
||||
@@ -48,6 +55,6 @@ def test_find_start():
|
||||
prefix = [0] * offset
|
||||
bufs = [prefix, prefix, carrier, postfix]
|
||||
buf = np.concatenate(bufs)
|
||||
start = recv.find_start(buf, length*config.Nsym)
|
||||
start = detector.find_start(buf, length*config.Nsym)
|
||||
expected = offset + len(prefix)
|
||||
assert expected == start
|
||||
|
||||
@@ -8,6 +8,7 @@ from amodem import recv
|
||||
from amodem import common
|
||||
from amodem import dsp
|
||||
from amodem import sampling
|
||||
from amodem import config
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level=logging.DEBUG,
|
||||
@@ -27,7 +28,7 @@ class Args(object):
|
||||
def run(size, chan=None, df=0, success=True):
|
||||
tx_data = os.urandom(size)
|
||||
tx_audio = BytesIO()
|
||||
send.main(Args(silence_start=1, silence_stop=1,
|
||||
send.main(Args(config=config, silence_start=1, silence_stop=1,
|
||||
input=BytesIO(tx_data), output=tx_audio))
|
||||
|
||||
data = tx_audio.getvalue()
|
||||
@@ -43,7 +44,8 @@ def run(size, chan=None, df=0, success=True):
|
||||
rx_audio = BytesIO(data)
|
||||
|
||||
rx_data = BytesIO()
|
||||
result = recv.main(Args(skip=0, input=rx_audio, output=rx_data))
|
||||
result = recv.main(Args(config=config,
|
||||
skip=0, input=rx_audio, output=rx_data))
|
||||
rx_data = rx_data.getvalue()
|
||||
|
||||
assert result == success
|
||||
@@ -61,7 +63,7 @@ def test_small(small_size):
|
||||
|
||||
|
||||
def test_error():
|
||||
skip = 1 * send.config.Fs # remove trailing silence
|
||||
skip = 32000 # remove trailing silence
|
||||
run(1024, chan=lambda x: x[:-skip], success=False)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user