From 3007ea3c9a330fa04c687c9931949e067210b679 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 4 Jul 2014 12:34:51 +0300 Subject: [PATCH] Frequency loop is working on real signal --- drift.py | 150 ++++++++++++++++++++++--------------------------------- 1 file changed, 59 insertions(+), 91 deletions(-) diff --git a/drift.py b/drift.py index 9248689..d162d02 100644 --- a/drift.py +++ b/drift.py @@ -1,47 +1,11 @@ import numpy as np -import itertools import recv import common - -class Filter(object): - def __init__(self, b, a): - self.b = b - self.a = a - self.x = [0] * len(b) - self.y = [0] * len(a) - - def __call__(self, x): - self.x = [x] + self.x[:-1] - assert len(self.x) == len(self.b) - assert len(self.y) == len(self.a) - y = np.dot(self.x, self.b) - np.dot(self.y, self.a) - self.y = [y] + self.y[:-1] - return y - -def overlap_iter(x, n, overlap=0): - assert overlap >= 0 - assert overlap < n - res = [] - x = iter(x) - while True: - res.extend(itertools.islice(x, n - len(res))) - if len(res) < n: - break - yield tuple(res) - res = res[n - overlap:] - -def test_overlap(): - assert list(overlap_iter(range(7), 3, 1)) == [(0,1,2), (2,3,4), (4,5,6)] - assert list(overlap_iter(range(7), 3, 0)) == [(0,1,2), (3,4,5)] - -def calib(S): - for S0, S1 in overlap_iter(S, 2, overlap=1): - dS = S1 / S0 - yield dS +import loop class Interpolator(object): - def __init__(self, resolution=1000, width=20): + def __init__(self, resolution=10000, width=128): self.width = width self.resolution = resolution self.N = resolution * width @@ -65,76 +29,80 @@ class Interpolator(object): return coeffs, k - self.width class Sampler(object): - def __init__(self, src, interp=None): + def __init__(self, src, interp): self.src = iter(src) - self.index = 0 self.freq = 1.0 - self.interp = interp or Interpolator() - self.offset = self.interp.width - self.buff = [] + self.interp = interp + coeffs, begin = self.interp.get(0) + self.offset = -begin # should fill samples buffer + self.buff = np.zeros(len(coeffs)) + self.index = 0 - def sample(self): - coeffs, begin = self.interp.get(self.offset) - end = begin + len(coeffs) - # C = ', '.join(['%.3f' % c for c in coeffs[18:23]]) - # print '%.3f [%s] %d %d' % (self.offset, C, begin, end) - while True: - if self.index == end: - self.buff = self.buff[-len(coeffs):] - return np.dot(coeffs, self.buff) - try: - s = self.src.next() - except StopIteration: - break + def __iter__(self): + return self - self.buff.append(s) - self.index += 1 + def correct(self, offset=0): + assert self.freq + offset > 0 + self.offset += offset def next(self): + res = self._sample() self.offset += self.freq + return res + def _sample(self): + coeffs, begin = self.interp.get(self.offset) + end = begin + len(coeffs) + while True: + if self.index == end: + return np.dot(coeffs, self.buff) + + self.buff[:-1] = self.buff[1:] + self.buff[-1] = self.src.next() + self.index += 1 + +def clip(x, lims): + return min(max(x, lims[0]), lims[1]) + +def loop_filter(P, I): + return loop.Filter(b=[P*(1+I/2.), P*(-1+I/2.)], a=[1]) def main(): import pylab - if 1: - f0 = 10e3 - _, x = common.load('recv_10kHz.pcm') - x = x[100:] - y = [] - sampler = Sampler(x) - sampler.freq = 1.0 + 0.112/f0 - while True: - u = sampler.sample() - if u is None: - break - y.append(u) - sampler.next() - x_ = np.array(y) - S = recv.extract_symbols(x_, f0) - S = np.array(list(S)) - y = S #np.array(list(calib(S))) - phase = np.unwrap(np.angle(y)) + f0 = 10e3 + _, x = common.load('recv_10kHz.pcm') + x = x[100:] + sampler = Sampler(x, Interpolator()) + S = [] + first = None + Y = [] - phase_error = (phase[-1] - phase[0]) - phase_error_per_1ms = phase_error / (len(phase) - 1) - freq_error = phase_error_per_1ms * 1000.0 / (2 * np.pi) - print phase_error, len(phase) - print phase_error_per_1ms - print freq_error + filt = loop_filter(P=0.1, I=0.01) + for s in recv.extract_symbols(sampler, f0): + S.append(s) + if first is None: + first = s + else: + err = np.angle(first / s) / (2*np.pi) + err = clip(err, [-0.1, 0.1]) + filt_err = filt(err) + sampler.correct(offset=filt_err) + Y.append([err, filt_err]) - if 1: - pylab.figure() - pylab.plot(y.real, y.imag, '.') - pylab.grid('on') - pylab.show() - return - I = Interpolator() - f = I.filt + y = np.array(list(S)) + pylab.figure() - pylab.plot(zip(*f[::100])) + pylab.subplot(121) + pylab.plot(y.real, y.imag, '.') + pylab.grid('on') + pylab.axis('equal') + pylab.subplot(122) + pylab.plot(np.array(Y) * 1e3 / 32, '-') + pylab.grid('on') pylab.show() + return if __name__ == '__main__': main()