simplify common.iterate()

This commit is contained in:
Roman Zeyde
2014-07-12 16:18:06 +03:00
parent cb28d33908
commit b261f1b102
4 changed files with 31 additions and 29 deletions

View File

@@ -1,4 +1,5 @@
import functools
import itertools
import numpy as np
import logging
@@ -59,25 +60,19 @@ def dumps(sym, n=1):
return data * n
def iterate(data, bufsize, offset=0, advance=1, func=None):
assert bufsize > 0
assert offset >= 0
assert advance > 0
buf = np.zeros(bufsize)
buf_index = 0
for data_index, value in enumerate(data):
if data_index < offset:
continue
def iterate(data, size, func=None):
offset = 0
data = iter(data)
buf[buf_index] = value
buf_index += 1
while True:
buf = list(itertools.islice(data, size))
if len(buf) < size:
return
if buf_index == bufsize:
result = func(buf) if func else buf
yield offset, result
buf[:-advance] = buf[advance:]
buf_index = max(0, buf_index - advance)
offset += advance
buf = np.array(buf)
result = func(buf) if func else buf
yield offset, result
offset += size
class Splitter(object):

View File

@@ -28,7 +28,7 @@ CARRIER_THRESHOLD = int(0.95 * CARRIER_DURATION)
def detect(x, freq):
counter = 0
for offset, buf in iterate(x, Nsym, advance=Nsym):
for offset, buf in iterate(x, Nsym):
coeff = sigproc.coherence(buf, Fc)
if abs(coeff) > COHERENCE_THRESHOLD:
counter += 1
@@ -185,7 +185,7 @@ def main(fname):
if data_bits is None:
log.warning('Training failed!')
else:
data = iterate(data_bits, bufsize=8, advance=8, func=to_byte)
data = iterate(data_bits, 8, func=to_byte)
data = ''.join(c for _, c in data)
import ecc
data = ecc.decode(data)

View File

@@ -91,7 +91,7 @@ def extract_symbols(x, freq, offset=0):
Hc = exp_iwt(-freq, common.Nsym) / (0.5*common.Nsym)
func = lambda y: np.dot(Hc, y)
iterator = common.iterate(x, common.Nsym, advance=common.Nsym, func=func)
iterator = common.iterate(x, common.Nsym, func=func)
for _, symbol in iterator:
yield symbol

View File

@@ -1,20 +1,26 @@
import common
import numpy as np
def iterlist(x, *args, **kwargs):
x = np.array(x)
return [(offset, list(buf)) for offset, buf in common.iterate(x, *args, **kwargs)]
return list((i, list(x)) for i, x in common.iterate(x, *args, **kwargs))
def test_iterate():
N = 10
assert iterlist(range(N), 1) == [(i, [i]) for i in range(N)]
assert iterlist(range(N), 1) == [(i, [i]) for i in range(N)]
assert iterlist(range(N), 2) == [(i, [i, i+1]) for i in range(N-1)]
assert iterlist(range(N), 3) == [(i, [i, i+1, i+2]) for i in range(N-2)]
assert iterlist(range(N), 3, advance=2) == [(i, [i, i+1, i+2]) for i in range(0, N-2, 2)]
assert iterlist(range(N), 3, advance=3) == [(i, [i, i+1, i+2]) for i in range(0, N-2, 3)]
assert iterlist(range(N), 2, offset=5) == [(i, [i, i+1]) for i in range(5, N-1)]
assert iterlist(range(N), 1, func=lambda b: -b) == [(i, [-i]) for i in range(N)]
assert iterlist(range(N), 1) == [
(i, [i]) for i in range(N)]
assert iterlist(range(N), 2) == [
(i, [i, i+1]) for i in range(0, N-1, 2)]
assert iterlist(range(N), 3) == [
(i, [i, i+1, i+2]) for i in range(0, N-2, 3)]
assert iterlist(range(N), 1, func=lambda b: -b) == [
(i, [-i]) for i in range(N)]
def test_split():
L = [(i*2, i*2+1) for i in range(10)]
@@ -30,6 +36,7 @@ def test_split():
except IndexError as e:
assert e.args == (i,)
def test_icapture():
x = range(100)
y = []