diff --git a/amodem-cli b/amodem-cli index b25463c..940ceee 100755 --- a/amodem-cli +++ b/amodem-cli @@ -70,8 +70,7 @@ def FileType(mode, audio_interface=None): s = audio_interface.recorder() return async.AsyncReader(stream=s, bufsize=s.bufsize) if 'w' in mode: - s = audio_interface.player() - return async.AsyncWriter(stream=s) + return audio_interface.player() if fname == '-': if 'r' in mode: @@ -208,6 +207,7 @@ def _main(): except KeyboardInterrupt: pass finally: + log.debug('Closing input and output') args.src.close() args.dst.close() diff --git a/amodem/async.py b/amodem/async.py index df228e8..9391d5d 100644 --- a/amodem/async.py +++ b/amodem/async.py @@ -47,46 +47,3 @@ class AsyncReader(object): self.thread.join() self.stream.close() self.stream = None - - -class AsyncWriter(object): - def __init__(self, stream): - self.stream = stream - self.queue = six.moves.queue.Queue() - self.error = threading.Event() - self.stop = threading.Event() - self.args = (stream, self.queue, self.error) - self.thread = None - - @staticmethod - def _thread(dst, queue, error): - total = 0 - try: - log.debug('AsyncWriter thread started') - while True: - buf = queue.get(block=False) - if buf is None: - break - dst.write(buf) - total += len(buf) - log.debug('AsyncWriter thread stopped (written %d bytes)', total) - except BaseException: - log.exception('AsyncWriter thread failed') - error.set() - - def write(self, buf): - if self.error.isSet(): - raise IOError('cannot write to stream') - - self.queue.put(buf) - if self.thread is None: - self.thread = threading.Thread(target=AsyncWriter._thread, - args=self.args, name='AsyncWriter') - self.thread.start() # start only after there is data to write - - def close(self): - if self.stream is not None: - self.queue.put(None) - self.thread.join() - self.stream.close() - self.stream = None diff --git a/tests/test_async.py b/tests/test_async.py index 380ea17..1a5322d 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -22,49 +22,9 @@ def test_async_reader(): r.close() -def test_async_write(): - result = [] - - def _write(buf): - time.sleep(len(buf) * 0.1) - result.append(buf) - s = mock.Mock() - s.write = _write - w = async.AsyncWriter(s) - - w.write('foo') - w.write(' ') - w.write('bar') - w.close() - assert w.stream is None - w.close() - assert result == ['foo', ' ', 'bar'] - - def test_async_reader_error(): s = mock.Mock() s.read.side_effect = IOError() r = async.AsyncReader(s, 1) with pytest.raises(IOError): r.read(3) - - -def test_async_writer_error(): - s = mock.Mock() - s.write.side_effect = IOError() - w = async.AsyncWriter(s) - w.write('123') - w.thread.join() - with pytest.raises(IOError): - w.write('456') - assert s.write.mock_calls == [mock.call('123')] - - -def test_underflow(): - s = mock.Mock() - w = async.AsyncWriter(s) - w.write('blah') - w.thread.join() - assert s.write.mock_calls == [mock.call('blah')] - with pytest.raises(IOError): - w.write('xyzw') diff --git a/tests/test_transfer.py b/tests/test_transfer.py index 5c0a842..308d6bc 100644 --- a/tests/test_transfer.py +++ b/tests/test_transfer.py @@ -70,11 +70,6 @@ def test_small(small_size): run(small_size, chan=lambda x: x) -def test_async(): - run(1024, chan=lambda x: x, - reader=lambda s: async.AsyncReader(s, 128)) - - def test_error(): skip = 32000 # remove trailing silence run(1024, chan=lambda x: x[:-skip], success=False)