diff --git a/drift.py b/drift.py index 6116405..77f267b 100644 --- a/drift.py +++ b/drift.py @@ -2,74 +2,12 @@ import numpy as np import recv import common +import sigproc import loop -class Interpolator(object): - def __init__(self, resolution=10000, width=128): - self.width = width - self.resolution = resolution - self.N = resolution * width - u = np.arange(-self.N, self.N, dtype=float) - window = (1 + np.cos(0.5 * np.pi * u / self.N)) / 2.0 - h = np.sinc(u / resolution) * window - self.filt = [] - for index in range(resolution): # split into multiphase filters - filt = h[index::resolution] - filt = filt[::-1] - self.filt.append(filt) - lengths = map(len, self.filt) - assert set(lengths) == set([2*width]) - assert len(self.filt) == resolution - - def get(self, offset): - # offset = k + (j / self.resolution) - k = int(offset) - j = int((offset - k) * self.resolution) - coeffs = self.filt[j] - return coeffs, k - self.width - -class Sampler(object): - def __init__(self, src, interp): - self.src = iter(src) - self.freq = 1.0 - self.interp = interp - coeffs, begin = self.interp.get(0) - self.offset = -begin # should fill samples buffer - self.buff = np.zeros(len(coeffs)) - self.index = 0 - - def __iter__(self): - return self - - def correct(self, offset=0): - assert self.freq + offset > 0 - self.offset += offset - - def next(self): - res = self._sample() - self.offset += self.freq - return res - - def _sample(self): - coeffs, begin = self.interp.get(self.offset) - end = begin + len(coeffs) - while True: - if self.index == end: - return np.dot(coeffs, self.buff) - - self.buff[:-1] = self.buff[1:] - self.buff[-1] = self.src.next() # throws StopIteration - self.index += 1 - -def clip(x, lims): - return min(max(x, lims[0]), lims[1]) - -def loop_filter(P, I): - return - class FreqLoop(object): def __init__(self, x, freq): - self.sampler = Sampler(x, Interpolator()) + self.sampler = sigproc.Sampler(x, sigproc.Interpolator()) self.symbols = recv.extract_symbols(self.sampler, freq) Kp, Ki = 0.2, 0.01 b = np.array([1, -1])*Kp + np.array([0.5, 0.5])*Ki @@ -78,7 +16,7 @@ class FreqLoop(object): def correct(self, actual, expected): self.err = np.angle(expected / actual) / np.pi - self.err = clip(self.err, [-0.1, 0.1]) + self.err = sigproc.clip(self.err, [-0.1, 0.1]) self.correction = self.filt(self.err) self.sampler.correct(offset=self.correction) @@ -107,7 +45,7 @@ def main(): symbols.correction * (f0 / common.Nsym), ]) - S = np.array(list(S)) + S = np.array(S) pylab.figure() diff --git a/sigproc.py b/sigproc.py index 04b0945..5d1e6ed 100644 --- a/sigproc.py +++ b/sigproc.py @@ -15,22 +15,11 @@ def lfilter(b, a, x): y_ = [u] + y_[1:] yield u - -class Filter(object): - def __init__(self, b, a): - self.b = b - self.a = a - - def apply(self, x): - return lfilter(self.b, self.a, x) - - @classmethod - def train(cls, S, training): - A = np.array([ S[1:], S[:-1], training[:-1] ]).T - b = training[1:] - b0, b1, a1 = linalg.lstsq(A, b)[0] - return cls([b0, b1], [1, -a1]) - +def train(S, training): + A = np.array([ S[1:], S[:-1], training[:-1] ]).T + b = training[1:] + b0, b1, a1 = linalg.lstsq(A, b)[0] + return lambda x: lfilter(b=[b0, b1], a=[1, -a1], x=x) class QAM(object): def __init__(self, bits_per_symbol, radii): @@ -63,9 +52,62 @@ class QAM(object): modulator = QAM(bits_per_symbol=2, radii=[1.0]) -def test(): - q = QAM(bits_per_symbol=8, radii=[0.25, 0.5, 0.75, 1.0]) - bits = [(1,1,0,1,0,0,1,0)] - S = q.encode(bits) - res = list(q.decode(list(S))) - assert res == bits, (res) +class Interpolator(object): + def __init__(self, resolution=10000, width=128): + self.width = width + self.resolution = resolution + self.N = resolution * width + u = np.arange(-self.N, self.N, dtype=float) + window = (1 + np.cos(0.5 * np.pi * u / self.N)) / 2.0 + h = np.sinc(u / resolution) * window + self.filt = [] + for index in range(resolution): # split into multiphase filters + filt = h[index::resolution] + filt = filt[::-1] + self.filt.append(filt) + lengths = map(len, self.filt) + assert set(lengths) == set([2*width]) + assert len(self.filt) == resolution + + def get(self, offset): + # offset = k + (j / self.resolution) + k = int(offset) + j = int((offset - k) * self.resolution) + coeffs = self.filt[j] + return coeffs, k - self.width + +class Sampler(object): + def __init__(self, src, interp): + self.src = iter(src) + self.freq = 1.0 + self.interp = interp + coeffs, begin = self.interp.get(0) + self.offset = -begin # should fill samples buffer + self.buff = np.zeros(len(coeffs)) + self.index = 0 + + def __iter__(self): + return self + + def correct(self, offset=0): + assert self.freq + offset > 0 + self.offset += offset + + def next(self): + res = self._sample() + self.offset += self.freq + return res + + def _sample(self): + coeffs, begin = self.interp.get(self.offset) + end = begin + len(coeffs) + while True: + if self.index == end: + return np.dot(coeffs, self.buff) + + self.buff[:-1] = self.buff[1:] + self.buff[-1] = self.src.next() # throws StopIteration + self.index += 1 + +def clip(x, lims): + return min(max(x, lims[0]), lims[1]) diff --git a/test_sigproc.py b/test_sigproc.py new file mode 100644 index 0000000..e64e16c --- /dev/null +++ b/test_sigproc.py @@ -0,0 +1,10 @@ +import sigproc +import itertools + +def test_qam(): + q = sigproc.QAM(bits_per_symbol=8, radii=[0.25, 0.5, 0.75, 1.0]) + bits = [(1,1,0,1,0,0,1,0), (0,1,0,0,0,1,1,1)] + stream = itertools.chain(*bits) + S = q.encode(list(stream)) + decoded = list(q.decode(list(S))) + assert decoded == bits