fix stream implementation

This commit is contained in:
Roman Zeyde
2014-08-07 17:04:27 +03:00
parent 2030b0942f
commit 0b91ef76c5
2 changed files with 22 additions and 18 deletions

View File

@@ -1,21 +1,25 @@
import time import time
import logging
log = logging.getLogger(__name__)
class Timeout(Exception):
pass
class Reader(object): class Reader(object):
SAMPLES = 4096 def __init__(self, fd, data_type=None, bufsize=4096,
BUFSIZE = int(SAMPLES * wave.bytes_per_sample) eof=False, timeout=2.0, wait=0.2):
WAIT = 0.1
TIMEOUT = 2.0
def __init__(self, fd, data_type=None, bufsize=None, eof=False):
self.fd = fd self.fd = fd
self.check = None
self.total = 0
self.bufsize = bufsize if (bufsize is not None) else self.BUFSIZE
self.eof = eof
self.data_type = data_type if (data_type is not None) else lambda x: x self.data_type = data_type if (data_type is not None) else lambda x: x
self.bufsize = bufsize
self.eof = eof
self.timeout = timeout
self.wait = wait
self.total = 0
self.check = None
def __iter__(self): def __iter__(self):
return self return self
@@ -26,7 +30,7 @@ class Reader(object):
def next(self): def next(self):
block = bytearray() block = bytearray()
if self.eof: if self.eof:
data = self.fd.read(self.BUFSIZE) data = self.fd.read(self.bufsize)
if data: if data:
self.total += len(data) self.total += len(data)
block.extend(data) block.extend(data)
@@ -34,20 +38,20 @@ class Reader(object):
else: else:
raise StopIteration() raise StopIteration()
finish_time = time.time() + self.TIMEOUT finish_time = time.time() + self.timeout
while time.time() <= finish_time: while time.time() <= finish_time:
left = self.BUFSIZE - len(block) left = self.bufsize - len(block)
data = self.fd.read(left) data = self.fd.read(left)
if data: if data:
self.total += len(data) self.total += len(data)
block.extend(data) block.extend(data)
if len(block) == self.BUFSIZE: if len(block) == self.bufsize:
values = self.data_type(block) values = self.data_type(block)
if self.check: if self.check:
self.check(values) self.check(values)
return values return values
time.sleep(self.WAIT) time.sleep(self.wait)
raise IOError('timeout') raise Timeout(self.timeout)

View File

@@ -31,5 +31,5 @@ def test_read():
try: try:
for buf in f: for buf in f:
pass pass
except IOError as e: except stream.Timeout as e:
assert str(e) == 'timeout' assert e.args == (f.timeout,)