mirror of
https://github.com/romanz/amodem.git
synced 2026-03-17 07:05:59 +08:00
move DSP stuff to sigproc
This commit is contained in:
@@ -48,10 +48,6 @@ def dumps(sym, n=1):
|
||||
data = sym.tostring()
|
||||
return data * n
|
||||
|
||||
def norm(x):
|
||||
return np.sqrt(np.dot(x.conj(), x).real)
|
||||
|
||||
|
||||
def iterate(data, bufsize, offset=0, advance=1, func=None):
|
||||
assert bufsize > 0
|
||||
assert offset >= 0
|
||||
|
||||
25
recv.py
25
recv.py
@@ -16,23 +16,10 @@ COHERENCE_THRESHOLD = 0.95
|
||||
CARRIER_DURATION = 300
|
||||
CARRIER_THRESHOLD = int(0.95 * CARRIER_DURATION)
|
||||
|
||||
def power(x):
|
||||
return np.dot(x.conj(), x).real / len(x)
|
||||
|
||||
def exp_iwt(freq, n):
|
||||
iw = 2j * np.pi * freq
|
||||
t = np.arange(n) * Ts
|
||||
return np.exp(iw * t)
|
||||
|
||||
def coherence(x, freq):
|
||||
n = len(x)
|
||||
Hc = exp_iwt(-freq, n) / np.sqrt(0.5*n)
|
||||
return np.dot(Hc, x) / norm(x)
|
||||
|
||||
def detect(x, freq):
|
||||
counter = 0
|
||||
for offset, buf in iterate(x, Nsym, advance=Nsym):
|
||||
coeff = coherence(buf, Fc)
|
||||
coeff = sigproc.coherence(buf, Fc)
|
||||
if abs(coeff) > COHERENCE_THRESHOLD:
|
||||
counter += 1
|
||||
else:
|
||||
@@ -48,7 +35,7 @@ def find_start(x, start):
|
||||
begin, end = start - WINDOW, start + length + WINDOW
|
||||
x_ = x[begin:end]
|
||||
|
||||
Hc = exp_iwt(Fc, len(x_))
|
||||
Hc = sigproc.exp_iwt(Fc, len(x_))
|
||||
P = np.abs(Hc.conj() * x_) ** 2
|
||||
cumsumP = P.cumsum()
|
||||
start = begin + np.argmax(cumsumP[length:] - cumsumP[:-length])
|
||||
@@ -56,7 +43,7 @@ def find_start(x, start):
|
||||
return start
|
||||
|
||||
def extract_symbols(x, freq, offset=0):
|
||||
Hc = exp_iwt(-freq, Nsym) / (0.5*Nsym)
|
||||
Hc = sigproc.exp_iwt(-freq, Nsym) / (0.5*Nsym)
|
||||
func = lambda y: np.dot(Hc, y)
|
||||
for _, symbol in iterate(x, Nsym, advance=Nsym, func=func):
|
||||
yield symbol
|
||||
@@ -97,7 +84,7 @@ def receive(x, freqs):
|
||||
return None
|
||||
|
||||
noise = y - train_result
|
||||
Pnoise = power(noise)
|
||||
Pnoise = sigproc.power(noise)
|
||||
log.debug('{:10.1f}Hz: Noise sigma={:.4f}, SNR={:.1f} dB'.format( freq, Pnoise**0.5, 10*np.log10(1/Pnoise) ))
|
||||
|
||||
x = x[len(training)*Nsym:]
|
||||
@@ -126,11 +113,11 @@ def main(fname):
|
||||
|
||||
begin, end = result
|
||||
x_ = x[begin:end]
|
||||
Hc = exp_iwt(-Fc, len(x_))
|
||||
Hc = sigproc.exp_iwt(-Fc, len(x_))
|
||||
Zc = np.dot(Hc, x_) / (0.5*len(x_))
|
||||
amp = abs(Zc)
|
||||
log.info('Carrier detected at ~{:.1f} ms @ {:.1f} kHz: coherence={:.3f}%, amplitude={:.3f}'.format(
|
||||
begin * Tsym * 1e3 / Nsym, Fc / 1e3, abs(coherence(x_, Fc)) * 100, amp
|
||||
begin * Tsym * 1e3 / Nsym, Fc / 1e3, abs(sigproc.coherence(x_, Fc)) * 100, amp
|
||||
))
|
||||
|
||||
start = find_start(x, begin)
|
||||
|
||||
17
sigproc.py
17
sigproc.py
@@ -1,6 +1,8 @@
|
||||
import numpy as np
|
||||
from numpy import linalg
|
||||
|
||||
import common
|
||||
|
||||
def lfilter(b, a, x):
|
||||
b = np.array(b) / a[0]
|
||||
a = np.array(a[1:]) / a[0]
|
||||
@@ -54,3 +56,18 @@ modulator = QAM(bits_per_symbol=2, radii=[1.0])
|
||||
|
||||
def clip(x, lims):
|
||||
return min(max(x, lims[0]), lims[1])
|
||||
|
||||
def power(x):
|
||||
return np.dot(x.conj(), x).real / len(x)
|
||||
|
||||
def exp_iwt(freq, n):
|
||||
iwt = 2j * np.pi * freq * np.arange(n) * common.Ts
|
||||
return np.exp(iwt)
|
||||
|
||||
def norm(x):
|
||||
return np.sqrt(np.dot(x.conj(), x).real)
|
||||
|
||||
def coherence(x, freq):
|
||||
n = len(x)
|
||||
Hc = exp_iwt(-freq, n) / np.sqrt(0.5*n)
|
||||
return np.dot(Hc, x) / norm(x)
|
||||
|
||||
Reference in New Issue
Block a user