refactor sampling module

This commit is contained in:
Roman Zeyde
2014-08-09 15:10:35 +03:00
parent 67da70c9df
commit 8ad3fb4371
3 changed files with 40 additions and 29 deletions

47
amodem/sampling.py Executable file → Normal file
View File

@@ -43,34 +43,33 @@ class Sampler(object):
def take(self, size):
frame = np.zeros(size)
count = 0
try:
for frame_index in range(size):
offset = self.offset
# offset = k + (j / self.resolution)
k = int(offset) # integer part
j = int((offset - k) * self.resolution) # fractional part
coeffs = self.filt[j]
end = k + self.width
while self.index < end:
self.buff[:-1] = self.buff[1:]
self.buff[-1] = next(self.src) # throws StopIteration
self.index += 1
for frame_index in range(size):
offset = self.offset
# offset = k + (j / self.resolution)
k = int(offset) # integer part
j = int((offset - k) * self.resolution) # fractional part
coeffs = self.filt[j]
end = k + self.width
while self.index < end:
self.buff[:-1] = self.buff[1:]
self.buff[-1] = next(self.src) # throws StopIteration
self.index += 1
self.offset += self.freq
frame[frame_index] = np.dot(coeffs, self.buff) * self.gain
count = frame_index + 1
except StopIteration:
pass
self.offset += self.freq
frame[frame_index] = np.dot(coeffs, self.buff) * self.gain
return frame
return frame[:count]
if __name__ == '__main__':
def resample(src, dst, df=0.0):
import common
import sys
df, = sys.argv[1:]
df = float(df)
x = common.load(sys.stdin)
x = common.load(src)
sampler = Sampler(x, Interpolator())
sampler.freq += df
y = np.array(list(sampler))
y = common.dumps(y)
sys.stdout.write(y)
y = sampler.take(len(x))
dst.write(common.dumps(y))

4
scripts/resample.py Executable file
View File

@@ -0,0 +1,4 @@
from amodem.sampling import resample
import sys
resample(src=sys.stdin, dst=sys.stdout, df=float(sys.argv[1]))

View File

@@ -1,14 +1,22 @@
from amodem import sampling
from amodem import common
import numpy as np
from io import BytesIO
def test_resample():
x = np.arange(300)
s = sampling.Sampler(x, interp=sampling.Interpolator())
y = s.take(len(x) - s.interp.width - 1)
x = np.sin(2*np.pi * 10 * np.linspace(0, 1, 1001))
src = BytesIO(common.dumps(x))
dst = BytesIO()
sampling.resample(src=src, dst=dst, df=0.0)
y = common.loads(dst.getvalue())
err = x[1:len(y)+1] - y
assert np.max(np.abs(err)) < 1e-10
assert np.max(np.abs(err)) < 1e-4
dst = BytesIO()
sampling.resample(src=BytesIO('\x00\x00'), dst=dst, df=0.0)
assert dst.tell() == 0
def test_coeffs():