mirror of
https://github.com/romanz/amodem.git
synced 2026-02-06 16:48:06 +08:00
31 lines
587 B
Python
31 lines
587 B
Python
import mock
|
|
import time
|
|
import pytest
|
|
from amodem import async_reader
|
|
import logging
|
|
|
|
logging.basicConfig(format='%(message)s')
|
|
|
|
|
|
def test_async_reader():
|
|
def _read(n):
|
|
time.sleep(n * 0.1)
|
|
return b'\x00' * n
|
|
s = mock.Mock()
|
|
s.read = _read
|
|
r = async_reader.AsyncReader(s, 1)
|
|
|
|
n = 5
|
|
assert r.read(n) == b'\x00' * n
|
|
r.close()
|
|
assert r.stream is None
|
|
r.close()
|
|
|
|
|
|
def test_async_reader_error():
|
|
s = mock.Mock()
|
|
s.read.side_effect = IOError()
|
|
r = async_reader.AsyncReader(s, 1)
|
|
with pytest.raises(IOError):
|
|
r.read(3)
|