mirror of
https://github.com/romanz/amodem.git
synced 2026-03-30 23:57:54 +08:00
async: remove AsyncWriter
we are not expecting real-time problems on the sender's side.
This commit is contained in:
@@ -70,8 +70,7 @@ def FileType(mode, audio_interface=None):
|
||||
s = audio_interface.recorder()
|
||||
return async.AsyncReader(stream=s, bufsize=s.bufsize)
|
||||
if 'w' in mode:
|
||||
s = audio_interface.player()
|
||||
return async.AsyncWriter(stream=s)
|
||||
return audio_interface.player()
|
||||
|
||||
if fname == '-':
|
||||
if 'r' in mode:
|
||||
@@ -208,6 +207,7 @@ def _main():
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
log.debug('Closing input and output')
|
||||
args.src.close()
|
||||
args.dst.close()
|
||||
|
||||
|
||||
@@ -47,46 +47,3 @@ class AsyncReader(object):
|
||||
self.thread.join()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
|
||||
|
||||
class AsyncWriter(object):
|
||||
def __init__(self, stream):
|
||||
self.stream = stream
|
||||
self.queue = six.moves.queue.Queue()
|
||||
self.error = threading.Event()
|
||||
self.stop = threading.Event()
|
||||
self.args = (stream, self.queue, self.error)
|
||||
self.thread = None
|
||||
|
||||
@staticmethod
|
||||
def _thread(dst, queue, error):
|
||||
total = 0
|
||||
try:
|
||||
log.debug('AsyncWriter thread started')
|
||||
while True:
|
||||
buf = queue.get(block=False)
|
||||
if buf is None:
|
||||
break
|
||||
dst.write(buf)
|
||||
total += len(buf)
|
||||
log.debug('AsyncWriter thread stopped (written %d bytes)', total)
|
||||
except BaseException:
|
||||
log.exception('AsyncWriter thread failed')
|
||||
error.set()
|
||||
|
||||
def write(self, buf):
|
||||
if self.error.isSet():
|
||||
raise IOError('cannot write to stream')
|
||||
|
||||
self.queue.put(buf)
|
||||
if self.thread is None:
|
||||
self.thread = threading.Thread(target=AsyncWriter._thread,
|
||||
args=self.args, name='AsyncWriter')
|
||||
self.thread.start() # start only after there is data to write
|
||||
|
||||
def close(self):
|
||||
if self.stream is not None:
|
||||
self.queue.put(None)
|
||||
self.thread.join()
|
||||
self.stream.close()
|
||||
self.stream = None
|
||||
|
||||
@@ -22,49 +22,9 @@ def test_async_reader():
|
||||
r.close()
|
||||
|
||||
|
||||
def test_async_write():
|
||||
result = []
|
||||
|
||||
def _write(buf):
|
||||
time.sleep(len(buf) * 0.1)
|
||||
result.append(buf)
|
||||
s = mock.Mock()
|
||||
s.write = _write
|
||||
w = async.AsyncWriter(s)
|
||||
|
||||
w.write('foo')
|
||||
w.write(' ')
|
||||
w.write('bar')
|
||||
w.close()
|
||||
assert w.stream is None
|
||||
w.close()
|
||||
assert result == ['foo', ' ', 'bar']
|
||||
|
||||
|
||||
def test_async_reader_error():
|
||||
s = mock.Mock()
|
||||
s.read.side_effect = IOError()
|
||||
r = async.AsyncReader(s, 1)
|
||||
with pytest.raises(IOError):
|
||||
r.read(3)
|
||||
|
||||
|
||||
def test_async_writer_error():
|
||||
s = mock.Mock()
|
||||
s.write.side_effect = IOError()
|
||||
w = async.AsyncWriter(s)
|
||||
w.write('123')
|
||||
w.thread.join()
|
||||
with pytest.raises(IOError):
|
||||
w.write('456')
|
||||
assert s.write.mock_calls == [mock.call('123')]
|
||||
|
||||
|
||||
def test_underflow():
|
||||
s = mock.Mock()
|
||||
w = async.AsyncWriter(s)
|
||||
w.write('blah')
|
||||
w.thread.join()
|
||||
assert s.write.mock_calls == [mock.call('blah')]
|
||||
with pytest.raises(IOError):
|
||||
w.write('xyzw')
|
||||
|
||||
@@ -70,11 +70,6 @@ def test_small(small_size):
|
||||
run(small_size, chan=lambda x: x)
|
||||
|
||||
|
||||
def test_async():
|
||||
run(1024, chan=lambda x: x,
|
||||
reader=lambda s: async.AsyncReader(s, 128))
|
||||
|
||||
|
||||
def test_error():
|
||||
skip = 32000 # remove trailing silence
|
||||
run(1024, chan=lambda x: x[:-skip], success=False)
|
||||
|
||||
Reference in New Issue
Block a user