refactor equalizers and its tests

This commit is contained in:
Roman Zeyde
2014-08-23 15:59:02 +03:00
parent 1af22dcd72
commit 93a94bb728
2 changed files with 98 additions and 74 deletions

63
amodem/equalizer.py Normal file
View File

@@ -0,0 +1,63 @@
import numpy as np
from numpy.linalg import lstsq
from amodem import dsp, config, send
import itertools
import random
_constellation = [1, 1j, -1, -1j]
def train_symbols(length, seed=0, Nfreq=config.Nfreq):
r = random.Random(seed)
choose = lambda: [r.choice(_constellation) for j in range(Nfreq)]
return np.array([choose() for i in range(length)])
def modulator(symbols, carriers=send.sym.carrier):
gain = 1.0 / len(carriers)
result = []
for s in symbols:
result.append(np.dot(s, carriers))
result = np.concatenate(result).real * gain
assert np.max(np.abs(result)) <= 1
return result
def demodulator(signal, size):
signal = itertools.chain(signal, itertools.repeat(0))
symbols = dsp.Demux(signal, config.frequencies)
return np.array(list(itertools.islice(symbols, size)))
def equalize(signal, symbols, order):
Nsym = config.Nsym
Nfreq = config.Nfreq
carriers = send.sym.carrier
assert symbols.shape[1] == Nfreq
length = symbols.shape[0]
matched = np.array(carriers) * Nfreq / (0.5*Nsym)
matched = matched[:, ::-1].transpose().conj()
y = dsp.lfilter(x=signal, b=matched, a=[1])
A = np.zeros([symbols.size, order], dtype=np.complex128)
b = np.zeros([symbols.size], dtype=np.complex128)
index = 0
for j in range(Nfreq):
for i in range(length):
offset = (i+1)*Nsym
row = y[offset-order:offset, j]
A[index, :] = row
b[index] = symbols[i, j]
index += 1
A = np.array(A)
b = np.array(b)
h, residuals, rank, sv = lstsq(A, b)
h = h[::-1].real
return h

View File

@@ -1,7 +1,12 @@
from amodem import train, dsp, config, send
from numpy.linalg import norm, lstsq
from numpy.linalg import norm
import numpy as np
import itertools
from amodem import train, dsp, config, send
from amodem import equalizer
def assert_approx(x, y, e=1e-12):
assert norm(x - y) < e * norm(x)
def test_fir():
@@ -10,8 +15,8 @@ def test_fir():
rx = dsp.lfilter(x=tx, b=[1], a=a)
h_ = dsp.estimate(x=rx, y=tx, order=len(a))
tx_ = dsp.lfilter(x=rx, b=h_, a=[1])
assert norm(h_ - a) < 1e-12
assert (norm(tx - tx_) / norm(tx)) < 1e-12
assert_approx(h_, a)
assert_approx(tx, tx_)
def test_iir():
@@ -23,41 +28,14 @@ def test_iir():
tx_ = dsp.lfilter(x=rx, b=h_, a=[1])
h_expected = np.array([alpha ** i for i in range(len(h_))])
assert norm(h_ - h_expected) < 1e-12
assert (norm(tx - tx_) / norm(tx)) < 1e-12
import random
_constellation = [1, 1j, -1, -1j]
def train_symbols(length, seed=0, Nfreq=config.Nfreq):
r = random.Random(seed)
choose = lambda: [r.choice(_constellation) for j in range(Nfreq)]
return np.array([choose() for i in range(length)])
def modulator(length):
symbols = train_symbols(length)
carriers = send.sym.carrier
gain = 1.0 / len(carriers)
result = []
for s in symbols:
result.append(np.dot(s, carriers))
result = np.concatenate(result).real * gain
assert np.max(np.abs(result)) <= 1
return result
def demodulator(signal):
signal = itertools.chain(signal, itertools.repeat(0))
return dsp.Demux(signal, config.frequencies)
assert_approx(h_, h_expected)
assert_approx(tx, tx_)
def test_training():
L = 1000
t1 = train_symbols(L)
t2 = train_symbols(L)
t1 = equalizer.train_symbols(L)
t2 = equalizer.train_symbols(L)
assert (t1 == t2).all()
@@ -68,55 +46,38 @@ def test_commutation():
y = dsp.lfilter(x=x, b=b, a=a)
y1 = dsp.lfilter(x=dsp.lfilter(x=x, b=b, a=[1]), b=[1], a=a)
y2 = dsp.lfilter(x=dsp.lfilter(x=x, b=[1], a=a), b=b, a=[1])
assert norm(y - y1) < 1e-10
assert norm(y - y2) < 1e-10
assert_approx(y, y1)
assert_approx(y, y2)
z = dsp.lfilter(x=y, b=a, a=[1])
z_ = dsp.lfilter(x=x, b=b, a=[1])
assert norm(z - z_) < 1e-10
assert_approx(z, z_)
def test_modem():
L = 1000
sent = train_symbols(L)
sent = equalizer.train_symbols(L)
gain = len(send.sym.carrier)
x = modulator(L) * gain
s = demodulator(x)
received = np.array(list(itertools.islice(s, L)))
err = sent - received
assert norm(err) < 1e-10
x = equalizer.modulator(sent) * gain
received = equalizer.demodulator(x, L)
assert_approx(sent, received)
def test_equalizer():
N = 32
s = train_symbols(length=100, Nfreq=1).real.squeeze()
x = [v for v in s for i in range(N)]
matched = [1.0 / N] * N
z = dsp.lfilter(x=x, b=matched, a=[1])
assert norm(z[N-1::N] - s) < 1e-12
def test_isi():
length = 100
gain = float(config.Nfreq)
den = np.array([1, 0.125])
num = np.array([1])
symbols = equalizer.train_symbols(length=length)
x = equalizer.modulator(symbols)
assert_approx(equalizer.demodulator(gain * x, size=length), symbols)
den = np.array([1, -0.6, 0.1])
num = np.array([0.5])
y = dsp.lfilter(x=x, b=num, a=den)
y = dsp.lfilter(x=y, b=matched, a=[1])
A = []
b = []
h = equalizer.equalize(y, symbols, order=len(den))
assert_approx(h, den / num)
r = 2
for i in range(len(s)):
offset = (i+1)*N
row = y[offset-r:offset]
A.append(row)
b.append(s[i])
A = np.array(A)
b = np.array(b)
h, residuals, rank, sv = lstsq(A, b)
h = h[::-1]
print(h)
y1 = dsp.lfilter(x=x, b=num, a=den)
y2 = dsp.lfilter(x=y1, b=h, a=[1])
y3 = dsp.lfilter(x=y2, b=matched, a=[1])
z = y3[N-1::N]
assert norm(z - s) < 1e-12
y = dsp.lfilter(x=y, b=h, a=[1])
z = equalizer.demodulator(gain * y, size=length)
assert_approx(z, symbols)