mirror of
https://github.com/romanz/amodem.git
synced 2026-04-05 20:26:24 +08:00
tests: move to root directory
This commit is contained in:
53
tests/test_common.py
Normal file
53
tests/test_common.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import common
|
||||
import numpy as np
|
||||
|
||||
|
||||
def iterlist(x, *args, **kwargs):
|
||||
x = np.array(x)
|
||||
return list((i, list(x)) for i, x in common.iterate(x, *args, **kwargs))
|
||||
|
||||
|
||||
def test_iterate():
|
||||
N = 10
|
||||
assert iterlist(range(N), 1) == [
|
||||
(i, [i]) for i in range(N)]
|
||||
|
||||
assert iterlist(range(N), 2) == [
|
||||
(i, [i, i+1]) for i in range(0, N-1, 2)]
|
||||
|
||||
assert iterlist(range(N), 3) == [
|
||||
(i, [i, i+1, i+2]) for i in range(0, N-2, 3)]
|
||||
|
||||
assert iterlist(range(N), 1, func=lambda b: -np.array(b)) == [
|
||||
(i, [-i]) for i in range(N)]
|
||||
|
||||
|
||||
def test_split():
|
||||
L = [(i*2, i*2+1) for i in range(10)]
|
||||
iters = common.split(L, n=2)
|
||||
assert zip(*iters) == L
|
||||
|
||||
for i in [0, 1]:
|
||||
iters = common.split(L, n=2)
|
||||
iters[i].next()
|
||||
try:
|
||||
iters[i].next()
|
||||
assert False
|
||||
except IndexError as e:
|
||||
assert e.args == (i,)
|
||||
|
||||
|
||||
def test_icapture():
|
||||
x = range(100)
|
||||
y = []
|
||||
z = []
|
||||
for i in common.icapture(x, result=y):
|
||||
z.append(i)
|
||||
assert x == y
|
||||
assert x == z
|
||||
|
||||
|
||||
def test_dumps_loads():
|
||||
x = np.array([.1, .4, .2, .6, .3, .5])
|
||||
y = common.loads(common.dumps(x * 1j))
|
||||
assert all(x == y)
|
||||
15
tests/test_ecc.py
Normal file
15
tests/test_ecc.py
Normal file
@@ -0,0 +1,15 @@
|
||||
import ecc
|
||||
import random
|
||||
import itertools
|
||||
|
||||
|
||||
def concat(chunks):
|
||||
return bytearray(itertools.chain.from_iterable(chunks))
|
||||
|
||||
|
||||
def test_random():
|
||||
r = random.Random(0)
|
||||
x = bytearray(r.randrange(0, 256) for i in range(64 * 1024))
|
||||
y = ecc.encode(x)
|
||||
x_ = concat(ecc.decode(y))
|
||||
assert x_ == x
|
||||
49
tests/test_full.py
Normal file
49
tests/test_full.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import os
|
||||
from cStringIO import StringIO
|
||||
import numpy as np
|
||||
|
||||
import send
|
||||
import recv
|
||||
import common
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level=logging.DEBUG,
|
||||
format='%(asctime)s %(levelname)-12s %(message)s')
|
||||
|
||||
class Args(object):
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def run(size, chan):
|
||||
tx_data = os.urandom(size)
|
||||
tx_audio = StringIO()
|
||||
send.main(Args(silence_start=1, silence_stop=1, input=StringIO(tx_data), output=tx_audio))
|
||||
|
||||
data = tx_audio.getvalue()
|
||||
data = common.loads(data)
|
||||
data = chan(data)
|
||||
data = common.dumps(data * 1j)
|
||||
rx_audio = StringIO(data)
|
||||
|
||||
rx_data = StringIO()
|
||||
recv.main(Args(skip=100, input=rx_audio, output=rx_data))
|
||||
rx_data = rx_data.getvalue()
|
||||
|
||||
assert rx_data == tx_data
|
||||
|
||||
def test_small():
|
||||
run(1024, lambda x: x)
|
||||
|
||||
def test_large():
|
||||
run(54321, lambda x: x)
|
||||
|
||||
def test_attenuation():
|
||||
run(5120, lambda x: x * 0.1)
|
||||
|
||||
def test_low_noise():
|
||||
r = np.random.RandomState(seed=0)
|
||||
run(5120, lambda x: x + r.normal(size=len(x), scale=0.0001))
|
||||
|
||||
def test_medium_noise():
|
||||
r = np.random.RandomState(seed=0)
|
||||
run(5120, lambda x: x + r.normal(size=len(x), scale=0.001))
|
||||
17
tests/test_sampling.py
Normal file
17
tests/test_sampling.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import sampling
|
||||
import numpy as np
|
||||
|
||||
|
||||
def test_resample():
|
||||
x = np.arange(300)
|
||||
s = sampling.Sampler(x)
|
||||
y = np.array(list(s))
|
||||
|
||||
err = x[1:len(y)+1] - y
|
||||
assert np.max(np.abs(err)) < 1e-10
|
||||
|
||||
|
||||
def test_coeffs():
|
||||
I = sampling.Interpolator(width=4, resolution=16)
|
||||
err = I.filt[0] - [0, 0, 0, 1, 0, 0, 0, 0]
|
||||
assert np.max(np.abs(err)) < 1e-10
|
||||
25
tests/test_sigproc.py
Normal file
25
tests/test_sigproc.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import sigproc
|
||||
import itertools
|
||||
import config
|
||||
import numpy as np
|
||||
import random
|
||||
|
||||
|
||||
def test_qam():
|
||||
q = sigproc.QAM(config.symbols)
|
||||
r = random.Random(0)
|
||||
m = q.bits_per_symbol
|
||||
bits = [tuple(r.randint(0, 1) for j in range(m)) for i in range(1024)]
|
||||
stream = itertools.chain(*bits)
|
||||
S = q.encode(list(stream))
|
||||
decoded = list(q.decode(list(S)))
|
||||
assert decoded == bits
|
||||
|
||||
|
||||
def test_linreg():
|
||||
x = np.array([1, 3, 2, 8, 4, 6, 9, 7, 0, 5])
|
||||
a, b = 12.3, 4.56
|
||||
y = a * x + b
|
||||
a_, b_ = sigproc.linear_regression(x, y)
|
||||
assert abs(a - a_) < 1e-10
|
||||
assert abs(b - b_) < 1e-10
|
||||
22
tests/test_stream.py
Normal file
22
tests/test_stream.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import stream
|
||||
import wave
|
||||
|
||||
|
||||
def test():
|
||||
p = wave.record('-', stdout=wave.sp.PIPE)
|
||||
f = stream.Reader(p.stdout)
|
||||
|
||||
result = zip(range(10), f)
|
||||
p.stop()
|
||||
|
||||
j = 0
|
||||
for i, buf in result:
|
||||
assert i == j
|
||||
assert len(buf) == f.SAMPLES
|
||||
j += 1
|
||||
|
||||
try:
|
||||
for buf in f:
|
||||
pass
|
||||
except IOError as e:
|
||||
assert str(e) == 'timeout'
|
||||
Reference in New Issue
Block a user