From 88838fc72e6a8b17b45ea64a81b741ecb850f7cf Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Mon, 21 Jul 2014 11:18:17 +0300 Subject: [PATCH] add file streaming support --- stream.py | 33 +++++++++++++++++++++++++++++++++ test_stream.py | 22 ++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 stream.py create mode 100644 test_stream.py diff --git a/stream.py b/stream.py new file mode 100644 index 0000000..c8074e0 --- /dev/null +++ b/stream.py @@ -0,0 +1,33 @@ +import time +import common +import wave + + +class FileBuffer(object): + + SAMPLES = 4096 + BUFSIZE = int(SAMPLES * wave.bytes_per_sample) + WAIT = 0.1 + TIMEOUT = 2.0 + + def __init__(self, fd): + self.fd = fd + + def __iter__(self): + return self + + def next(self): + block = bytearray() + finish_time = time.time() + self.TIMEOUT + while time.time() <= finish_time: + left = self.BUFSIZE - len(block) + data = self.fd.read(left) + if data: + block.extend(data) + if len(block) == self.BUFSIZE: + _, values = common.loads(str(block)) + return values + + time.sleep(self.WAIT) + + raise IOError('timeout') diff --git a/test_stream.py b/test_stream.py new file mode 100644 index 0000000..0ecf0bd --- /dev/null +++ b/test_stream.py @@ -0,0 +1,22 @@ +import stream +import wave + + +def test(): + p = wave.record('-', stdout=wave.sp.PIPE) + f = stream.FileBuffer(p.stdout) + + result = zip(range(10), f) + p.stop() + + j = 0 + for i, buf in result: + assert i == j + assert len(buf) == f.SAMPLES + j += 1 + + try: + for buf in f: + pass + except IOError as e: + assert str(e) == 'timeout'