From 93a94bb728f6876d8a31eb8a7ff464f78de51af1 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 23 Aug 2014 15:59:02 +0300 Subject: [PATCH] refactor equalizers and its tests --- amodem/equalizer.py | 63 +++++++++++++++++++++++ tests/test_equalizer.py | 109 +++++++++++++--------------------------- 2 files changed, 98 insertions(+), 74 deletions(-) create mode 100644 amodem/equalizer.py diff --git a/amodem/equalizer.py b/amodem/equalizer.py new file mode 100644 index 0000000..7deb29a --- /dev/null +++ b/amodem/equalizer.py @@ -0,0 +1,63 @@ +import numpy as np +from numpy.linalg import lstsq + +from amodem import dsp, config, send + +import itertools +import random + +_constellation = [1, 1j, -1, -1j] + + +def train_symbols(length, seed=0, Nfreq=config.Nfreq): + r = random.Random(seed) + choose = lambda: [r.choice(_constellation) for j in range(Nfreq)] + return np.array([choose() for i in range(length)]) + + +def modulator(symbols, carriers=send.sym.carrier): + gain = 1.0 / len(carriers) + result = [] + for s in symbols: + result.append(np.dot(s, carriers)) + result = np.concatenate(result).real * gain + assert np.max(np.abs(result)) <= 1 + return result + + +def demodulator(signal, size): + signal = itertools.chain(signal, itertools.repeat(0)) + symbols = dsp.Demux(signal, config.frequencies) + return np.array(list(itertools.islice(symbols, size))) + + +def equalize(signal, symbols, order): + Nsym = config.Nsym + Nfreq = config.Nfreq + carriers = send.sym.carrier + + assert symbols.shape[1] == Nfreq + length = symbols.shape[0] + + matched = np.array(carriers) * Nfreq / (0.5*Nsym) + matched = matched[:, ::-1].transpose().conj() + y = dsp.lfilter(x=signal, b=matched, a=[1]) + + A = np.zeros([symbols.size, order], dtype=np.complex128) + b = np.zeros([symbols.size], dtype=np.complex128) + + index = 0 + for j in range(Nfreq): + for i in range(length): + offset = (i+1)*Nsym + row = y[offset-order:offset, j] + A[index, :] = row + b[index] = symbols[i, j] + index += 1 + + A = np.array(A) + b = np.array(b) + h, residuals, rank, sv = lstsq(A, b) + h = h[::-1].real + + return h diff --git a/tests/test_equalizer.py b/tests/test_equalizer.py index 048cb78..ac1cf6b 100644 --- a/tests/test_equalizer.py +++ b/tests/test_equalizer.py @@ -1,7 +1,12 @@ -from amodem import train, dsp, config, send -from numpy.linalg import norm, lstsq +from numpy.linalg import norm import numpy as np -import itertools + +from amodem import train, dsp, config, send +from amodem import equalizer + + +def assert_approx(x, y, e=1e-12): + assert norm(x - y) < e * norm(x) def test_fir(): @@ -10,8 +15,8 @@ def test_fir(): rx = dsp.lfilter(x=tx, b=[1], a=a) h_ = dsp.estimate(x=rx, y=tx, order=len(a)) tx_ = dsp.lfilter(x=rx, b=h_, a=[1]) - assert norm(h_ - a) < 1e-12 - assert (norm(tx - tx_) / norm(tx)) < 1e-12 + assert_approx(h_, a) + assert_approx(tx, tx_) def test_iir(): @@ -23,41 +28,14 @@ def test_iir(): tx_ = dsp.lfilter(x=rx, b=h_, a=[1]) h_expected = np.array([alpha ** i for i in range(len(h_))]) - assert norm(h_ - h_expected) < 1e-12 - assert (norm(tx - tx_) / norm(tx)) < 1e-12 - -import random - -_constellation = [1, 1j, -1, -1j] - - -def train_symbols(length, seed=0, Nfreq=config.Nfreq): - r = random.Random(seed) - choose = lambda: [r.choice(_constellation) for j in range(Nfreq)] - return np.array([choose() for i in range(length)]) - - -def modulator(length): - symbols = train_symbols(length) - carriers = send.sym.carrier - gain = 1.0 / len(carriers) - result = [] - for s in symbols: - result.append(np.dot(s, carriers)) - result = np.concatenate(result).real * gain - assert np.max(np.abs(result)) <= 1 - return result - - -def demodulator(signal): - signal = itertools.chain(signal, itertools.repeat(0)) - return dsp.Demux(signal, config.frequencies) + assert_approx(h_, h_expected) + assert_approx(tx, tx_) def test_training(): L = 1000 - t1 = train_symbols(L) - t2 = train_symbols(L) + t1 = equalizer.train_symbols(L) + t2 = equalizer.train_symbols(L) assert (t1 == t2).all() @@ -68,55 +46,38 @@ def test_commutation(): y = dsp.lfilter(x=x, b=b, a=a) y1 = dsp.lfilter(x=dsp.lfilter(x=x, b=b, a=[1]), b=[1], a=a) y2 = dsp.lfilter(x=dsp.lfilter(x=x, b=[1], a=a), b=b, a=[1]) - assert norm(y - y1) < 1e-10 - assert norm(y - y2) < 1e-10 + assert_approx(y, y1) + assert_approx(y, y2) z = dsp.lfilter(x=y, b=a, a=[1]) z_ = dsp.lfilter(x=x, b=b, a=[1]) - assert norm(z - z_) < 1e-10 + assert_approx(z, z_) def test_modem(): L = 1000 - sent = train_symbols(L) + sent = equalizer.train_symbols(L) gain = len(send.sym.carrier) - x = modulator(L) * gain - s = demodulator(x) - received = np.array(list(itertools.islice(s, L))) - err = sent - received - assert norm(err) < 1e-10 + x = equalizer.modulator(sent) * gain + received = equalizer.demodulator(x, L) + assert_approx(sent, received) -def test_equalizer(): - N = 32 - s = train_symbols(length=100, Nfreq=1).real.squeeze() - x = [v for v in s for i in range(N)] - matched = [1.0 / N] * N - z = dsp.lfilter(x=x, b=matched, a=[1]) - assert norm(z[N-1::N] - s) < 1e-12 +def test_isi(): + length = 100 + gain = float(config.Nfreq) - den = np.array([1, 0.125]) - num = np.array([1]) + symbols = equalizer.train_symbols(length=length) + x = equalizer.modulator(symbols) + assert_approx(equalizer.demodulator(gain * x, size=length), symbols) + + den = np.array([1, -0.6, 0.1]) + num = np.array([0.5]) y = dsp.lfilter(x=x, b=num, a=den) - y = dsp.lfilter(x=y, b=matched, a=[1]) - A = [] - b = [] + h = equalizer.equalize(y, symbols, order=len(den)) + assert_approx(h, den / num) - r = 2 - for i in range(len(s)): - offset = (i+1)*N - row = y[offset-r:offset] - A.append(row) - b.append(s[i]) - A = np.array(A) - b = np.array(b) - h, residuals, rank, sv = lstsq(A, b) - h = h[::-1] - print(h) - - y1 = dsp.lfilter(x=x, b=num, a=den) - y2 = dsp.lfilter(x=y1, b=h, a=[1]) - y3 = dsp.lfilter(x=y2, b=matched, a=[1]) - z = y3[N-1::N] - assert norm(z - s) < 1e-12 + y = dsp.lfilter(x=y, b=h, a=[1]) + z = equalizer.demodulator(gain * y, size=length) + assert_approx(z, symbols)