Files
amodem/tests/test_stream.py
2014-08-07 17:02:29 +03:00

36 lines
630 B
Python

from amodem import stream
import subprocess as sp
# Source of the child program that test_read() feeds to an interpreter on
# stdin. It loops forever; every 0.1 seconds it writes 6400 NUL characters to
# stdout and a single '.' to stderr.
# The literal is raw bytes: raw so that '\x00' reaches the child as an escape
# sequence rather than as an embedded NUL, and bytes so that it can be written
# directly to the child's binary stdin pipe.
# The loop body must stay indented, otherwise the child exits immediately with
# an IndentationError and never produces any output.
script = br"""
import sys
import time
import os
while True:
    time.sleep(0.1)
    sys.stdout.write('\x00' * 6400)
    sys.stderr.write('.')
"""
def test_read():
    """Read fixed-size buffers from a child process through stream.Reader.

    A child interpreter is started and given ``script`` on its stdin, so that
    it keeps writing to its stdout. Ten buffers are pulled from a
    ``stream.Reader`` wrapping that stdout while the child is still alive,
    then the child is killed. Each buffer must be exactly ``f.bufsize`` long.
    Finally the reader is drained; if draining raises ``IOError``, its message
    must be ``'timeout'``.
    """
    # Local import: use the interpreter running the tests instead of whatever
    # 'python' happens to resolve to on PATH (it may be absent or different).
    import sys

    p = sp.Popen(args=[sys.executable, '-'], stdin=sp.PIPE, stdout=sp.PIPE)
    try:
        p.stdin.write(script)
        p.stdin.close()  # EOF on stdin lets the child start executing

        f = stream.Reader(p.stdout)

        # Materialize eagerly. On Python 3 zip() is lazy, so without list()
        # nothing would be read until after the child has been killed.
        # zip() stops as soon as range(10) is exhausted, so no extra buffer
        # is consumed from the reader.
        result = list(zip(range(10), f))
        p.kill()

        # Guard against a vacuous pass when the reader stops early.
        assert len(result) == 10
        for j, (i, buf) in enumerate(result):
            assert i == j
            assert len(buf) == f.bufsize

        # Drain whatever is left after the child is gone. An IOError is
        # tolerated only if it is the reader's timeout.
        try:
            for buf in f:
                pass
        except IOError as e:
            assert str(e) == 'timeout'
    finally:
        # Always reap the child and release the pipes, even when an
        # assertion above fails.
        if p.poll() is None:
            p.kill()
        if not p.stdin.closed:
            p.stdin.close()
        p.stdout.close()
        p.wait()