Files
amodem/sigproc.py
2014-08-01 10:42:08 +03:00

112 lines
2.8 KiB
Python

import numpy as np
from numpy import linalg
import common
from config import Ts, Nsym
class Filter(object):
def __init__(self, b, a):
self.b = np.array(b) / a[0]
self.a = np.array(a[1:]) / a[0]
def __call__(self, x):
x_ = [0] * len(self.b)
y_ = [0] * len(self.a)
for v in x:
x_ = [v] + x_[:-1]
y = np.dot(x_, self.b) - np.dot(y_, self.a)
y_ = [y] + y_[1:]
yield y
def train(S, training):
A = np.array([S[1:], S[:-1], training[:-1]]).T
b = training[1:]
b0, b1, a1 = linalg.lstsq(A, b)[0]
return Filter(b=[b0, b1], a=[1, -a1])
class QAM(object):
def __init__(self, symbols):
self._enc = {}
symbols = np.array(list(symbols))
bits_per_symbol = np.log2(len(symbols))
bits_per_symbol = np.round(bits_per_symbol)
N = (2 ** bits_per_symbol)
assert N == len(symbols)
bits_per_symbol = int(bits_per_symbol)
for i, v in enumerate(symbols):
bits = tuple(int(i & (1 << j) != 0) for j in range(bits_per_symbol))
self._enc[bits] = v
self._dec = {v: k for k, v in self._enc.items()}
self.symbols = symbols
self.bits_per_symbol = bits_per_symbol
def encode(self, bits):
for _, bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple):
yield self._enc[bits_tuple]
def decode(self, symbols, error_handler=None):
for s in symbols:
index = np.argmin(np.abs(s - self.symbols))
S = self.symbols[index]
if error_handler:
error_handler(received=s, decoded=S)
yield self._dec[S]
class MODEM(object):
def __init__(self, config):
self.qam = QAM(config.symbols)
self.baud = config.baud
self.freqs = config.frequencies
self.bits_per_baud = self.qam.bits_per_symbol * len(self.freqs)
self.modem_bps = self.baud * self.bits_per_baud
def clip(x, lims):
return min(max(x, lims[0]), lims[1])
def power(x):
return np.dot(x.conj(), x).real / len(x)
def exp_iwt(freq, n):
iwt = 2j * np.pi * freq * np.arange(n) * Ts
return np.exp(iwt)
def norm(x):
return np.sqrt(np.dot(x.conj(), x).real)
def coherence(x, freq):
n = len(x)
Hc = exp_iwt(-freq, n) / np.sqrt(0.5*n)
norm_x = norm(x)
if norm_x:
return np.dot(Hc, x) / norm_x
else:
return 0.0
def extract_symbols(x, freq, offset=0):
Hc = exp_iwt(-freq, Nsym) / (0.5*Nsym)
for _, symbol in common.iterate(x, Nsym):
yield np.dot(Hc, symbol)
def linear_regression(x, y):
''' Find (a,b) such that y = a*x + b. '''
x = np.array(x)
y = np.array(y)
ones = np.ones(len(x))
M = np.array([x, ones]).T
a, b = linalg.lstsq(M, y)[0]
return a, b