mirror of
https://github.com/romanz/amodem.git
synced 2026-03-20 01:06:04 +08:00
dsp: move lfilter and IIR to tests
This commit is contained in:
@@ -18,32 +18,6 @@ class FIR(object):
|
||||
self.x_state = x_
|
||||
|
||||
|
||||
class IIR(object):
|
||||
def __init__(self, b, a):
|
||||
self.b = np.array(b) / a[0]
|
||||
self.a = np.array(a[1:]) / a[0]
|
||||
self.x_state = [0] * len(self.b)
|
||||
self.y_state = [0] * (len(self.a) + 1)
|
||||
|
||||
def __call__(self, x):
|
||||
x_, y_ = self.x_state, self.y_state
|
||||
for v in x:
|
||||
x_ = [v] + x_[:-1]
|
||||
y_ = y_[:-1]
|
||||
num = np.dot(x_, self.b)
|
||||
den = np.dot(y_, self.a)
|
||||
y = num - den
|
||||
y_ = [y] + y_
|
||||
yield y
|
||||
self.x_state, self.y_state = x_, y_
|
||||
|
||||
|
||||
def lfilter(b, a, x):
|
||||
f = IIR(b=b, a=a)
|
||||
y = list(f(x))
|
||||
return np.array(y)
|
||||
|
||||
|
||||
class Demux(object):
|
||||
def __init__(self, sampler, omegas, Nsym):
|
||||
self.Nsym = Nsym
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from amodem import dsp
|
||||
from amodem import sampling
|
||||
from amodem import config
|
||||
import utils
|
||||
|
||||
import numpy as np
|
||||
import random
|
||||
@@ -20,11 +21,11 @@ def test_linreg():
|
||||
|
||||
def test_filter():
|
||||
x = range(10)
|
||||
y = dsp.lfilter(b=[1], a=[1], x=x)
|
||||
y = utils.lfilter(b=[1], a=[1], x=x)
|
||||
assert (np.array(x) == y).all()
|
||||
|
||||
x = [1] + [0] * 10
|
||||
y = dsp.lfilter(b=[0.5], a=[1, -0.5], x=x)
|
||||
y = utils.lfilter(b=[0.5], a=[1, -0.5], x=x)
|
||||
assert list(y) == [0.5 ** (i+1) for i in range(len(x))]
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ from numpy.linalg import norm
|
||||
from numpy.random import RandomState
|
||||
import numpy as np
|
||||
|
||||
from amodem import dsp
|
||||
import utils
|
||||
from amodem import equalizer
|
||||
from amodem import config
|
||||
config = config.fastest()
|
||||
@@ -24,14 +24,14 @@ def test_commutation():
|
||||
x = np.random.RandomState(seed=0).normal(size=1000)
|
||||
b = [1, 1j, -1, -1j]
|
||||
a = [1, 0.1]
|
||||
y = dsp.lfilter(x=x, b=b, a=a)
|
||||
y1 = dsp.lfilter(x=dsp.lfilter(x=x, b=b, a=[1]), b=[1], a=a)
|
||||
y2 = dsp.lfilter(x=dsp.lfilter(x=x, b=[1], a=a), b=b, a=[1])
|
||||
y = utils.lfilter(x=x, b=b, a=a)
|
||||
y1 = utils.lfilter(x=utils.lfilter(x=x, b=b, a=[1]), b=[1], a=a)
|
||||
y2 = utils.lfilter(x=utils.lfilter(x=x, b=[1], a=a), b=b, a=[1])
|
||||
assert_approx(y, y1)
|
||||
assert_approx(y, y2)
|
||||
|
||||
z = dsp.lfilter(x=y, b=a, a=[1])
|
||||
z_ = dsp.lfilter(x=x, b=b, a=[1])
|
||||
z = utils.lfilter(x=y, b=a, a=[1])
|
||||
z_ = utils.lfilter(x=x, b=b, a=[1])
|
||||
assert_approx(z, z_)
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ def test_signal():
|
||||
x = np.sign(RandomState(0).normal(size=length))
|
||||
den = np.array([1, -0.6, 0.1])
|
||||
num = np.array([0.5])
|
||||
y = dsp.lfilter(x=x, b=num, a=den)
|
||||
y = utils.lfilter(x=x, b=num, a=den)
|
||||
|
||||
lookahead = 2
|
||||
h = equalizer.train(
|
||||
@@ -60,5 +60,5 @@ def test_signal():
|
||||
h = h[lookahead:]
|
||||
assert_approx(h, den / num)
|
||||
|
||||
x_ = dsp.lfilter(x=y, b=h, a=[1])
|
||||
x_ = utils.lfilter(x=y, b=h, a=[1])
|
||||
assert_approx(x_, x)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from amodem import main
|
||||
from amodem import common
|
||||
from amodem import dsp
|
||||
from amodem import sampling
|
||||
from amodem import config
|
||||
import utils
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
@@ -85,11 +85,11 @@ def test_timing(freq_err):
|
||||
|
||||
|
||||
def test_lowpass():
|
||||
run(1024, chan=lambda x: dsp.lfilter(b=[0.9], a=[1.0, -0.1], x=x))
|
||||
run(1024, chan=lambda x: utils.lfilter(b=[0.9], a=[1.0, -0.1], x=x))
|
||||
|
||||
|
||||
def test_highpass():
|
||||
run(1024, chan=lambda x: dsp.lfilter(b=[0.9], a=[1.0, 0.1], x=x))
|
||||
run(1024, chan=lambda x: utils.lfilter(b=[0.9], a=[1.0, 0.1], x=x))
|
||||
|
||||
|
||||
def test_attenuation():
|
||||
|
||||
27
tests/utils.py
Normal file
27
tests/utils.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
class IIR(object):
|
||||
def __init__(self, b, a):
|
||||
self.b = np.array(b) / a[0]
|
||||
self.a = np.array(a[1:]) / a[0]
|
||||
self.x_state = [0] * len(self.b)
|
||||
self.y_state = [0] * (len(self.a) + 1)
|
||||
|
||||
def __call__(self, x):
|
||||
x_, y_ = self.x_state, self.y_state
|
||||
for v in x:
|
||||
x_ = [v] + x_[:-1]
|
||||
y_ = y_[:-1]
|
||||
num = np.dot(x_, self.b)
|
||||
den = np.dot(y_, self.a)
|
||||
y = num - den
|
||||
y_ = [y] + y_
|
||||
yield y
|
||||
self.x_state, self.y_state = x_, y_
|
||||
|
||||
|
||||
def lfilter(b, a, x):
|
||||
f = IIR(b=b, a=a)
|
||||
y = list(f(x))
|
||||
return np.array(y)
|
||||
Reference in New Issue
Block a user