diff --git a/drift.py b/drift.py index 77f267b..64eb602 100644 --- a/drift.py +++ b/drift.py @@ -3,11 +3,12 @@ import numpy as np import recv import common import sigproc +import sampling import loop class FreqLoop(object): def __init__(self, x, freq): - self.sampler = sigproc.Sampler(x, sigproc.Interpolator()) + self.sampler = sampling.Sampler(x, sampling.Interpolator()) self.symbols = recv.extract_symbols(self.sampler, freq) Kp, Ki = 0.2, 0.01 b = np.array([1, -1])*Kp + np.array([0.5, 0.5])*Ki @@ -26,8 +27,6 @@ class FreqLoop(object): import pylab def main(): - import sigproc - f0 = 10e3 _, x = common.load('recv_10kHz.pcm') x = x[100:] diff --git a/sampling.py b/sampling.py new file mode 100644 index 0000000..2b9d75c --- /dev/null +++ b/sampling.py @@ -0,0 +1,58 @@ +import numpy as np + +class Interpolator(object): + def __init__(self, resolution=10000, width=128): + self.width = width + self.resolution = resolution + self.N = resolution * width + u = np.arange(-self.N, self.N, dtype=float) + window = (1 + np.cos(0.5 * np.pi * u / self.N)) / 2.0 + h = np.sinc(u / resolution) * window + self.filt = [] + for index in range(resolution): # split into multiphase filters + filt = h[index::resolution] + filt = filt[::-1] + self.filt.append(filt) + lengths = map(len, self.filt) + assert set(lengths) == set([2*width]) + assert len(self.filt) == resolution + + def get(self, offset): + # offset = k + (j / self.resolution) + k = int(offset) + j = int((offset - k) * self.resolution) + coeffs = self.filt[j] + return coeffs, k - self.width + +class Sampler(object): + def __init__(self, src, interp): + self.src = iter(src) + self.freq = 1.0 + self.interp = interp + coeffs, begin = self.interp.get(0) + self.offset = -begin # should fill samples buffer + self.buff = np.zeros(len(coeffs)) + self.index = 0 + + def __iter__(self): + return self + + def correct(self, offset=0): + assert self.freq + offset > 0 + self.offset += offset + + def next(self): + res = self._sample() + self.offset += self.freq + return res + + def _sample(self): + coeffs, begin = self.interp.get(self.offset) + end = begin + len(coeffs) + while True: + if self.index == end: + return np.dot(coeffs, self.buff) + + self.buff[:-1] = self.buff[1:] + self.buff[-1] = self.src.next() # throws StopIteration + self.index += 1 diff --git a/sigproc.py b/sigproc.py index 5d1e6ed..606d804 100644 --- a/sigproc.py +++ b/sigproc.py @@ -52,62 +52,5 @@ class QAM(object): modulator = QAM(bits_per_symbol=2, radii=[1.0]) -class Interpolator(object): - def __init__(self, resolution=10000, width=128): - self.width = width - self.resolution = resolution - self.N = resolution * width - u = np.arange(-self.N, self.N, dtype=float) - window = (1 + np.cos(0.5 * np.pi * u / self.N)) / 2.0 - h = np.sinc(u / resolution) * window - self.filt = [] - for index in range(resolution): # split into multiphase filters - filt = h[index::resolution] - filt = filt[::-1] - self.filt.append(filt) - lengths = map(len, self.filt) - assert set(lengths) == set([2*width]) - assert len(self.filt) == resolution - - def get(self, offset): - # offset = k + (j / self.resolution) - k = int(offset) - j = int((offset - k) * self.resolution) - coeffs = self.filt[j] - return coeffs, k - self.width - -class Sampler(object): - def __init__(self, src, interp): - self.src = iter(src) - self.freq = 1.0 - self.interp = interp - coeffs, begin = self.interp.get(0) - self.offset = -begin # should fill samples buffer - self.buff = np.zeros(len(coeffs)) - self.index = 0 - - def __iter__(self): - return self - - def correct(self, offset=0): - assert self.freq + offset > 0 - self.offset += offset - - def next(self): - res = self._sample() - self.offset += self.freq - return res - - def _sample(self): - coeffs, begin = self.interp.get(self.offset) - end = begin + len(coeffs) - while True: - if self.index == end: - return np.dot(coeffs, self.buff) - - self.buff[:-1] = self.buff[1:] - self.buff[-1] = self.src.next() # throws StopIteration - self.index += 1 - def clip(x, lims): return min(max(x, lims[0]), lims[1])