diff --git a/amodem/dsp.py b/amodem/dsp.py index 8709303..65a018b 100644 --- a/amodem/dsp.py +++ b/amodem/dsp.py @@ -4,8 +4,8 @@ import logging log = logging.getLogger(__name__) -from . import common from .config import Ts, Nsym +from .qam import QAM class IIR(object): @@ -63,64 +63,6 @@ def estimate(x, y, order, lookahead=0): return h[::-1] -class QAM(object): - def __init__(self, symbols): - self._enc = {} - symbols = np.array(list(symbols)) - bits_per_symbol = np.log2(len(symbols)) - bits_per_symbol = np.round(bits_per_symbol) - N = (2 ** bits_per_symbol) - assert N == len(symbols) - bits_per_symbol = int(bits_per_symbol) - - for i, v in enumerate(symbols): - bits = [int(i & (1 << j) != 0) for j in range(bits_per_symbol)] - self._enc[tuple(bits)] = v - - self._dec = {v: k for k, v in self._enc.items()} - self.symbols = symbols - self.bits_per_symbol = bits_per_symbol - - reals = np.array(list(sorted(set(symbols.real)))) - imags = np.array(list(sorted(set(symbols.imag)))) - - _mean = lambda u: float(sum(u))/len(u) if len(u) else 1.0 - self.real_factor = 1.0 / _mean(np.diff(reals)) - self.imag_factor = 1.0 / _mean(np.diff(imags)) - self.bias = reals[0] + 1j * imags[0] - - self.symbols_map = {} - for S in symbols: - s = S - self.bias - real_index = round(s.real * self.real_factor) - imag_index = round(s.imag * self.imag_factor) - self.symbols_map[real_index, imag_index] = (S, self._dec[S]) - self.real_max = max(k[0] for k in self.symbols_map) - self.imag_max = max(k[1] for k in self.symbols_map) - - def encode(self, bits): - for _, bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple): - yield self._enc[bits_tuple] - - def decode(self, symbols, error_handler=None): - real_factor = self.real_factor - imag_factor = self.imag_factor - real_max = self.real_max - imag_max = self.imag_max - bias = self.bias - - symbols_map = self.symbols_map - for S in symbols: - s = S - bias - real_index = min(max(s.real * real_factor, 0), real_max) - imag_index = min(max(s.imag * imag_factor, 0), imag_max) - key = (round(real_index), round(imag_index)) - decoded_symbol, bits = symbols_map[key] - if error_handler: - error_handler(received=S, decoded=decoded_symbol) - yield bits - - class Demux(object): def __init__(self, sampler, freqs): self.sampler = sampler diff --git a/amodem/qam.py b/amodem/qam.py new file mode 100644 index 0000000..aef59f0 --- /dev/null +++ b/amodem/qam.py @@ -0,0 +1,60 @@ +import numpy as np +from . import common + + +class QAM(object): + def __init__(self, symbols): + self._enc = {} + symbols = np.array(list(symbols)) + bits_per_symbol = np.log2(len(symbols)) + bits_per_symbol = np.round(bits_per_symbol) + N = (2 ** bits_per_symbol) + assert N == len(symbols) + bits_per_symbol = int(bits_per_symbol) + + for i, v in enumerate(symbols): + bits = [int(i & (1 << j) != 0) for j in range(bits_per_symbol)] + self._enc[tuple(bits)] = v + + self._dec = {v: k for k, v in self._enc.items()} + self.symbols = symbols + self.bits_per_symbol = bits_per_symbol + + reals = np.array(list(sorted(set(symbols.real)))) + imags = np.array(list(sorted(set(symbols.imag)))) + + _mean = lambda u: float(sum(u))/len(u) if len(u) else 1.0 + self.real_factor = 1.0 / _mean(np.diff(reals)) + self.imag_factor = 1.0 / _mean(np.diff(imags)) + self.bias = reals[0] + 1j * imags[0] + + self.symbols_map = {} + for S in symbols: + s = S - self.bias + real_index = round(s.real * self.real_factor) + imag_index = round(s.imag * self.imag_factor) + self.symbols_map[real_index, imag_index] = (S, self._dec[S]) + self.real_max = max(k[0] for k in self.symbols_map) + self.imag_max = max(k[1] for k in self.symbols_map) + + def encode(self, bits): + for _, bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple): + yield self._enc[bits_tuple] + + def decode(self, symbols, error_handler=None): + real_factor = self.real_factor + imag_factor = self.imag_factor + real_max = self.real_max + imag_max = self.imag_max + bias = self.bias + + symbols_map = self.symbols_map + for S in symbols: + s = S - bias + real_index = min(max(s.real * real_factor, 0), real_max) + imag_index = min(max(s.imag * imag_factor, 0), imag_max) + key = (round(real_index), round(imag_index)) + decoded_symbol, bits = symbols_map[key] + if error_handler: + error_handler(received=S, decoded=decoded_symbol) + yield bits diff --git a/tests/test_dsp.py b/tests/test_dsp.py index d586c9e..cdc109a 100644 --- a/tests/test_dsp.py +++ b/tests/test_dsp.py @@ -1,5 +1,3 @@ -import random -import itertools import numpy as np from numpy.linalg import norm @@ -8,38 +6,6 @@ from amodem import config from amodem import sampling -def test_qam(): - q = dsp.QAM(config.symbols) - r = random.Random(0) - m = q.bits_per_symbol - bits = [tuple(r.randint(0, 1) for j in range(m)) for i in range(1024)] - stream = itertools.chain(*bits) - S = list(q.encode(list(stream))) - decoded = list(q.decode(S)) - assert decoded == bits - - noise = lambda A: A*(r.uniform(-1, 1) + 1j*r.uniform(-1, 1)) - noised_symbols = [(s + noise(1e-3)) for s in S] - decoded = list(q.decode(noised_symbols)) - assert decoded == bits - - -def quantize(q, s): - bits, = list(q.decode([s])) - r, = q.encode(bits) - index = np.argmin(np.abs(s - q.symbols)) - expected = q.symbols[index] - assert r == expected - - -def test_overflow(): - q = dsp.QAM(config.symbols) - r = np.random.RandomState(seed=0) - for i in range(10000): - s = 10*(r.normal() + 1j * r.normal()) - quantize(q, s) - - def test_linreg(): x = np.array([1, 3, 2, 8, 4, 6, 9, 7, 0, 5]) a, b = 12.3, 4.56 diff --git a/tests/test_qam.py b/tests/test_qam.py new file mode 100644 index 0000000..29faf65 --- /dev/null +++ b/tests/test_qam.py @@ -0,0 +1,39 @@ +import random +import itertools + +import numpy as np + +from amodem import qam +from amodem import config + + +def test_qam(): + q = qam.QAM(config.symbols) + r = random.Random(0) + m = q.bits_per_symbol + bits = [tuple(r.randint(0, 1) for j in range(m)) for i in range(1024)] + stream = itertools.chain(*bits) + S = list(q.encode(list(stream))) + decoded = list(q.decode(S)) + assert decoded == bits + + noise = lambda A: A*(r.uniform(-1, 1) + 1j*r.uniform(-1, 1)) + noised_symbols = [(s + noise(1e-3)) for s in S] + decoded = list(q.decode(noised_symbols)) + assert decoded == bits + + +def quantize(q, s): + bits, = list(q.decode([s])) + r, = q.encode(bits) + index = np.argmin(np.abs(s - q.symbols)) + expected = q.symbols[index] + assert r == expected + + +def test_overflow(): + q = qam.QAM(config.symbols) + r = np.random.RandomState(seed=0) + for i in range(10000): + s = 10*(r.normal() + 1j * r.normal()) + quantize(q, s)