mirror of
https://github.com/romanz/amodem.git
synced 2026-03-06 23:05:57 +08:00
add test_calib.py
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
import sys
|
||||
import numpy as np
|
||||
|
||||
from . import common
|
||||
@@ -10,6 +11,7 @@ Tsample = 1
|
||||
t = np.arange(int(Tsample * config.Fs)) * config.Ts
|
||||
sig = np.exp(2j * np.pi * config.Fc * t)
|
||||
sig_dump = common.dumps(sig)
|
||||
fmt = 'coherence={:.3f} amplitude={:.3f} phase={:+.1f} peak={:.3f}\n'
|
||||
|
||||
|
||||
def send(wave_play=wave.play):
|
||||
@@ -24,7 +26,7 @@ def send(wave_play=wave.play):
|
||||
p.kill()
|
||||
|
||||
|
||||
def recv(wave_record=wave.record):
|
||||
def recv(wave_record=wave.record, reporter=sys.stdout.write):
|
||||
p = wave_record('-', stdout=wave.sp.PIPE)
|
||||
try:
|
||||
while True:
|
||||
@@ -45,11 +47,11 @@ def recv(wave_record=wave.record):
|
||||
phase = np.angle(z)
|
||||
peak = np.max(np.abs(x))
|
||||
|
||||
fmt = 'coherence={:.3f} amplitude={:.3f} phase={:+.1f} peak={:.3f}'
|
||||
print(fmt.format(coherence, amplitude, phase * 180 / np.pi, peak))
|
||||
reporter(fmt.format(coherence, amplitude, phase * 180/np.pi, peak))
|
||||
except KeyboardInterrupt:
|
||||
p.kill()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
p = argparse.ArgumentParser()
|
||||
|
||||
35
tests/test_calib.py
Normal file
35
tests/test_calib.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from amodem import calib
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
|
||||
class ProcessMock(object):
|
||||
def __init__(self):
|
||||
self.buf = BytesIO()
|
||||
self.stdin = self
|
||||
self.stdout = self
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def kill(self):
|
||||
pass
|
||||
|
||||
def write(self, data):
|
||||
self.buf.write(data)
|
||||
if self.buf.tell() > 1e6:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
def read(self, n):
|
||||
return self.buf.read(n)
|
||||
|
||||
|
||||
def verify(msg):
|
||||
assert msg == calib.fmt.format(1, 1, 0, 1)
|
||||
|
||||
|
||||
def test():
|
||||
p = ProcessMock()
|
||||
calib.send(p)
|
||||
p.buf.seek(0)
|
||||
calib.recv(p, reporter=verify)
|
||||
Reference in New Issue
Block a user