mirror of
https://github.com/romanz/amodem.git
synced 2026-04-21 05:36:42 +08:00
qam: refactor modulation out of dsp.py
This commit is contained in:
@@ -4,8 +4,8 @@ import logging
|
|||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
from . import common
|
|
||||||
from .config import Ts, Nsym
|
from .config import Ts, Nsym
|
||||||
|
from .qam import QAM
|
||||||
|
|
||||||
|
|
||||||
class IIR(object):
|
class IIR(object):
|
||||||
@@ -63,64 +63,6 @@ def estimate(x, y, order, lookahead=0):
|
|||||||
return h[::-1]
|
return h[::-1]
|
||||||
|
|
||||||
|
|
||||||
class QAM(object):
|
|
||||||
def __init__(self, symbols):
|
|
||||||
self._enc = {}
|
|
||||||
symbols = np.array(list(symbols))
|
|
||||||
bits_per_symbol = np.log2(len(symbols))
|
|
||||||
bits_per_symbol = np.round(bits_per_symbol)
|
|
||||||
N = (2 ** bits_per_symbol)
|
|
||||||
assert N == len(symbols)
|
|
||||||
bits_per_symbol = int(bits_per_symbol)
|
|
||||||
|
|
||||||
for i, v in enumerate(symbols):
|
|
||||||
bits = [int(i & (1 << j) != 0) for j in range(bits_per_symbol)]
|
|
||||||
self._enc[tuple(bits)] = v
|
|
||||||
|
|
||||||
self._dec = {v: k for k, v in self._enc.items()}
|
|
||||||
self.symbols = symbols
|
|
||||||
self.bits_per_symbol = bits_per_symbol
|
|
||||||
|
|
||||||
reals = np.array(list(sorted(set(symbols.real))))
|
|
||||||
imags = np.array(list(sorted(set(symbols.imag))))
|
|
||||||
|
|
||||||
_mean = lambda u: float(sum(u))/len(u) if len(u) else 1.0
|
|
||||||
self.real_factor = 1.0 / _mean(np.diff(reals))
|
|
||||||
self.imag_factor = 1.0 / _mean(np.diff(imags))
|
|
||||||
self.bias = reals[0] + 1j * imags[0]
|
|
||||||
|
|
||||||
self.symbols_map = {}
|
|
||||||
for S in symbols:
|
|
||||||
s = S - self.bias
|
|
||||||
real_index = round(s.real * self.real_factor)
|
|
||||||
imag_index = round(s.imag * self.imag_factor)
|
|
||||||
self.symbols_map[real_index, imag_index] = (S, self._dec[S])
|
|
||||||
self.real_max = max(k[0] for k in self.symbols_map)
|
|
||||||
self.imag_max = max(k[1] for k in self.symbols_map)
|
|
||||||
|
|
||||||
def encode(self, bits):
|
|
||||||
for _, bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple):
|
|
||||||
yield self._enc[bits_tuple]
|
|
||||||
|
|
||||||
def decode(self, symbols, error_handler=None):
|
|
||||||
real_factor = self.real_factor
|
|
||||||
imag_factor = self.imag_factor
|
|
||||||
real_max = self.real_max
|
|
||||||
imag_max = self.imag_max
|
|
||||||
bias = self.bias
|
|
||||||
|
|
||||||
symbols_map = self.symbols_map
|
|
||||||
for S in symbols:
|
|
||||||
s = S - bias
|
|
||||||
real_index = min(max(s.real * real_factor, 0), real_max)
|
|
||||||
imag_index = min(max(s.imag * imag_factor, 0), imag_max)
|
|
||||||
key = (round(real_index), round(imag_index))
|
|
||||||
decoded_symbol, bits = symbols_map[key]
|
|
||||||
if error_handler:
|
|
||||||
error_handler(received=S, decoded=decoded_symbol)
|
|
||||||
yield bits
|
|
||||||
|
|
||||||
|
|
||||||
class Demux(object):
|
class Demux(object):
|
||||||
def __init__(self, sampler, freqs):
|
def __init__(self, sampler, freqs):
|
||||||
self.sampler = sampler
|
self.sampler = sampler
|
||||||
|
|||||||
60
amodem/qam.py
Normal file
60
amodem/qam.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
import numpy as np
|
||||||
|
from . import common
|
||||||
|
|
||||||
|
|
||||||
|
class QAM(object):
|
||||||
|
def __init__(self, symbols):
|
||||||
|
self._enc = {}
|
||||||
|
symbols = np.array(list(symbols))
|
||||||
|
bits_per_symbol = np.log2(len(symbols))
|
||||||
|
bits_per_symbol = np.round(bits_per_symbol)
|
||||||
|
N = (2 ** bits_per_symbol)
|
||||||
|
assert N == len(symbols)
|
||||||
|
bits_per_symbol = int(bits_per_symbol)
|
||||||
|
|
||||||
|
for i, v in enumerate(symbols):
|
||||||
|
bits = [int(i & (1 << j) != 0) for j in range(bits_per_symbol)]
|
||||||
|
self._enc[tuple(bits)] = v
|
||||||
|
|
||||||
|
self._dec = {v: k for k, v in self._enc.items()}
|
||||||
|
self.symbols = symbols
|
||||||
|
self.bits_per_symbol = bits_per_symbol
|
||||||
|
|
||||||
|
reals = np.array(list(sorted(set(symbols.real))))
|
||||||
|
imags = np.array(list(sorted(set(symbols.imag))))
|
||||||
|
|
||||||
|
_mean = lambda u: float(sum(u))/len(u) if len(u) else 1.0
|
||||||
|
self.real_factor = 1.0 / _mean(np.diff(reals))
|
||||||
|
self.imag_factor = 1.0 / _mean(np.diff(imags))
|
||||||
|
self.bias = reals[0] + 1j * imags[0]
|
||||||
|
|
||||||
|
self.symbols_map = {}
|
||||||
|
for S in symbols:
|
||||||
|
s = S - self.bias
|
||||||
|
real_index = round(s.real * self.real_factor)
|
||||||
|
imag_index = round(s.imag * self.imag_factor)
|
||||||
|
self.symbols_map[real_index, imag_index] = (S, self._dec[S])
|
||||||
|
self.real_max = max(k[0] for k in self.symbols_map)
|
||||||
|
self.imag_max = max(k[1] for k in self.symbols_map)
|
||||||
|
|
||||||
|
def encode(self, bits):
|
||||||
|
for _, bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple):
|
||||||
|
yield self._enc[bits_tuple]
|
||||||
|
|
||||||
|
def decode(self, symbols, error_handler=None):
|
||||||
|
real_factor = self.real_factor
|
||||||
|
imag_factor = self.imag_factor
|
||||||
|
real_max = self.real_max
|
||||||
|
imag_max = self.imag_max
|
||||||
|
bias = self.bias
|
||||||
|
|
||||||
|
symbols_map = self.symbols_map
|
||||||
|
for S in symbols:
|
||||||
|
s = S - bias
|
||||||
|
real_index = min(max(s.real * real_factor, 0), real_max)
|
||||||
|
imag_index = min(max(s.imag * imag_factor, 0), imag_max)
|
||||||
|
key = (round(real_index), round(imag_index))
|
||||||
|
decoded_symbol, bits = symbols_map[key]
|
||||||
|
if error_handler:
|
||||||
|
error_handler(received=S, decoded=decoded_symbol)
|
||||||
|
yield bits
|
||||||
@@ -1,5 +1,3 @@
|
|||||||
import random
|
|
||||||
import itertools
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from numpy.linalg import norm
|
from numpy.linalg import norm
|
||||||
|
|
||||||
@@ -8,38 +6,6 @@ from amodem import config
|
|||||||
from amodem import sampling
|
from amodem import sampling
|
||||||
|
|
||||||
|
|
||||||
def test_qam():
|
|
||||||
q = dsp.QAM(config.symbols)
|
|
||||||
r = random.Random(0)
|
|
||||||
m = q.bits_per_symbol
|
|
||||||
bits = [tuple(r.randint(0, 1) for j in range(m)) for i in range(1024)]
|
|
||||||
stream = itertools.chain(*bits)
|
|
||||||
S = list(q.encode(list(stream)))
|
|
||||||
decoded = list(q.decode(S))
|
|
||||||
assert decoded == bits
|
|
||||||
|
|
||||||
noise = lambda A: A*(r.uniform(-1, 1) + 1j*r.uniform(-1, 1))
|
|
||||||
noised_symbols = [(s + noise(1e-3)) for s in S]
|
|
||||||
decoded = list(q.decode(noised_symbols))
|
|
||||||
assert decoded == bits
|
|
||||||
|
|
||||||
|
|
||||||
def quantize(q, s):
|
|
||||||
bits, = list(q.decode([s]))
|
|
||||||
r, = q.encode(bits)
|
|
||||||
index = np.argmin(np.abs(s - q.symbols))
|
|
||||||
expected = q.symbols[index]
|
|
||||||
assert r == expected
|
|
||||||
|
|
||||||
|
|
||||||
def test_overflow():
|
|
||||||
q = dsp.QAM(config.symbols)
|
|
||||||
r = np.random.RandomState(seed=0)
|
|
||||||
for i in range(10000):
|
|
||||||
s = 10*(r.normal() + 1j * r.normal())
|
|
||||||
quantize(q, s)
|
|
||||||
|
|
||||||
|
|
||||||
def test_linreg():
|
def test_linreg():
|
||||||
x = np.array([1, 3, 2, 8, 4, 6, 9, 7, 0, 5])
|
x = np.array([1, 3, 2, 8, 4, 6, 9, 7, 0, 5])
|
||||||
a, b = 12.3, 4.56
|
a, b = 12.3, 4.56
|
||||||
|
|||||||
39
tests/test_qam.py
Normal file
39
tests/test_qam.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
import random
|
||||||
|
import itertools
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from amodem import qam
|
||||||
|
from amodem import config
|
||||||
|
|
||||||
|
|
||||||
|
def test_qam():
|
||||||
|
q = qam.QAM(config.symbols)
|
||||||
|
r = random.Random(0)
|
||||||
|
m = q.bits_per_symbol
|
||||||
|
bits = [tuple(r.randint(0, 1) for j in range(m)) for i in range(1024)]
|
||||||
|
stream = itertools.chain(*bits)
|
||||||
|
S = list(q.encode(list(stream)))
|
||||||
|
decoded = list(q.decode(S))
|
||||||
|
assert decoded == bits
|
||||||
|
|
||||||
|
noise = lambda A: A*(r.uniform(-1, 1) + 1j*r.uniform(-1, 1))
|
||||||
|
noised_symbols = [(s + noise(1e-3)) for s in S]
|
||||||
|
decoded = list(q.decode(noised_symbols))
|
||||||
|
assert decoded == bits
|
||||||
|
|
||||||
|
|
||||||
|
def quantize(q, s):
|
||||||
|
bits, = list(q.decode([s]))
|
||||||
|
r, = q.encode(bits)
|
||||||
|
index = np.argmin(np.abs(s - q.symbols))
|
||||||
|
expected = q.symbols[index]
|
||||||
|
assert r == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_overflow():
|
||||||
|
q = qam.QAM(config.symbols)
|
||||||
|
r = np.random.RandomState(seed=0)
|
||||||
|
for i in range(10000):
|
||||||
|
s = 10*(r.normal() + 1j * r.normal())
|
||||||
|
quantize(q, s)
|
||||||
Reference in New Issue
Block a user