From 1d5d564f4df3a0d8e200f3e2918f5a5904b14233 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Mon, 2 Feb 2015 21:46:53 +0200 Subject: [PATCH] stream: use async I/O to avoid real-time problems. --- amodem-cli | 26 +++++++----- amodem/async.py | 92 ++++++++++++++++++++++++++++++++++++++++++ amodem/audio.py | 6 ++- amodem/config.py | 2 +- amodem/send.py | 2 +- amodem/stream.py | 2 +- requirements.txt | 1 + scripts/record | 2 +- setup.py | 2 +- tests/test_async.py | 70 ++++++++++++++++++++++++++++++++ tests/test_transfer.py | 24 ++++++++--- tox.ini | 1 + 12 files changed, 207 insertions(+), 23 deletions(-) create mode 100644 amodem/async.py create mode 100644 tests/test_async.py diff --git a/amodem-cli b/amodem-cli index a023126..feb0bcb 100755 --- a/amodem-cli +++ b/amodem-cli @@ -19,7 +19,7 @@ except ImportError: log = logging.getLogger('__name__') -from amodem import recv, send, calib, audio +from amodem import recv, send, calib, audio, async from amodem.config import bitrates bitrate = os.environ.get('BITRATE', 1) @@ -67,9 +67,11 @@ def FileType(mode, audio_interface=None): if fname is None: assert audio_interface is not None if 'r' in mode: - return audio_interface.recorder() + s = audio_interface.recorder() + return async.AsyncReader(stream=s, bufsize=s.bufsize) if 'w' in mode: - return audio_interface.player() + s = audio_interface.player() + return async.AsyncWriter(stream=s) if fname == '-': if 'r' in mode: @@ -197,13 +199,17 @@ def main(): with interface.load(args.audio_library): args.src = args.input_type(args.input) args.dst = args.output_type(args.output) - if args.calibrate is False: - return args.main(config=config, args=args) - else: - try: - args.calib(config=config, args=args) - except KeyboardInterrupt: - pass + try: + if args.calibrate is False: + return args.main(config=config, args=args) + else: + try: + args.calib(config=config, args=args) + except KeyboardInterrupt: + pass + finally: + args.src.close() + args.dst.close() if __name__ == '__main__': diff --git a/amodem/async.py b/amodem/async.py new file mode 100644 index 0000000..0699c2e --- /dev/null +++ b/amodem/async.py @@ -0,0 +1,92 @@ +import threading +import six # since `Queue` module was renamed to `queue` (in Python 3) +import logging + +log = logging.getLogger() + + +class AsyncReader(object): + def __init__(self, stream, bufsize): + self.stream = stream + self.queue = six.moves.queue.Queue() + self.stop = threading.Event() + args = (stream, bufsize, self.queue, self.stop) + self.thread = threading.Thread(target=AsyncReader._thread, + args=args, name='AsyncReader') + self.thread.start() + self.buf = b'' + + @staticmethod + def _thread(src, bufsize, queue, stop): + total = 0 + try: + log.debug('AsyncReader thread started') + while not stop.isSet(): + buf = src.read(bufsize) + queue.put(buf) + total += len(buf) + log.debug('AsyncReader thread stopped (read %d bytes)', total) + except: + log.exception('AsyncReader thread failed') + queue.put(None) + + def read(self, size): + while len(self.buf) < size: + buf = self.queue.get() + if buf is None: + raise IOError('cannot read from stream') + self.buf += buf + + result = self.buf[:size] + self.buf = self.buf[size:] + return result + + def close(self): + if self.stream is not None: + self.stop.set() + self.thread.join() + self.stream.close() + self.stream = None + + +class AsyncWriter(object): + def __init__(self, stream): + self.stream = stream + self.queue = six.moves.queue.Queue() + self.error = threading.Event() + self.stop = threading.Event() + self.args = (stream, self.queue, self.error) + self.thread = None + + @staticmethod + def _thread(dst, queue, error): + total = 0 + try: + log.debug('AsyncWriter thread started') + while True: + buf = queue.get(block=False) + if buf is None: + break + dst.write(buf) + total += len(buf) + log.debug('AsyncWriter thread stopped (written %d bytes)', total) + except: + log.exception('AsyncWriter thread failed') + error.set() + + def write(self, buf): + if self.error.isSet(): + raise IOError('cannot write to stream') + + self.queue.put(buf) + if self.thread is None: + self.thread = threading.Thread(target=AsyncWriter._thread, + args=self.args, name='AsyncWriter') + self.thread.start() # start only after there is data to write + + def close(self): + if self.stream is not None: + self.queue.put(None) + self.thread.join() + self.stream.close() + self.stream = None diff --git a/amodem/audio.py b/amodem/audio.py index b78b9ee..eafc728 100644 --- a/amodem/audio.py +++ b/amodem/audio.py @@ -70,11 +70,13 @@ class Stream(object): self.user_data = ctypes.c_void_p(None) self.stream_callback = ctypes.c_void_p(None) self.bytes_per_sample = config.sample_size + self.latency = float(config.latency) # in seconds + self.bufsize = int(self.latency * config.Fs * self.bytes_per_sample) assert config.bits_per_sample == 16 # just to make sure :) read = bool(read) write = bool(write) - assert read != write + assert read != write # don't support full duplex direction = 'Input' if read else 'Output' api_name = 'GetDefault{0}Device'.format(direction) @@ -83,7 +85,7 @@ class Stream(object): device=index, # choose default device channelCount=1, # mono audio sampleFormat=0x00000008, # 16-bit samples (paInt16) - suggestedLatency=0.1, # 100ms should be good enough + suggestedLatency=self.latency, hostApiSpecificStreamInfo=None) self.interface.call( diff --git a/amodem/config.py b/amodem/config.py index 5d82cc9..d87c281 100644 --- a/amodem/config.py +++ b/amodem/config.py @@ -11,7 +11,7 @@ class Configuration(object): # audio config bits_per_sample = 16 sample_size = bits_per_sample // 8 - samples_per_buffer = 8192 + latency = 0.1 # sender config silence_start = 1.0 diff --git a/amodem/send.py b/amodem/send.py index 0d8df8d..5e52d98 100644 --- a/amodem/send.py +++ b/amodem/send.py @@ -54,7 +54,7 @@ def main(config, src, dst): sender = Sender(dst, config=config) Fs = config.Fs - # pre-padding audio with silence + # pre-padding audio with silence (priming the audio sending queue) sender.write(np.zeros(int(Fs * config.silence_start))) sender.start() diff --git a/amodem/stream.py b/amodem/stream.py index f6e6d14..7cfb401 100644 --- a/amodem/stream.py +++ b/amodem/stream.py @@ -5,7 +5,7 @@ class Reader(object): wait = 0.2 timeout = 2.0 - bufsize = 4096 + bufsize = (8 << 10) def __init__(self, fd, data_type=None, eof=False): self.fd = fd diff --git a/requirements.txt b/requirements.txt index 85d2467..45c8388 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ numpy +six argcomplete diff --git a/scripts/record b/scripts/record index 56f2b0b..dbf5600 100755 --- a/scripts/record +++ b/scripts/record @@ -16,7 +16,7 @@ def main(): interface = audio.Interface(config=config) with interface.load(args.audio_library): src = interface.recorder() - size = config.sample_size * config.samples_per_buffer + size = int(config.sample_size * config.Fs) # one second of audio while True: dst.write(src.read(size)) diff --git a/setup.py b/setup.py index 6c724a2..ed8279f 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ setup( packages=['amodem'], tests_require=['pytest'], cmdclass={'test': PyTest}, - install_requires=['numpy'], + install_requires=['numpy', 'six'], platforms=['POSIX'], classifiers=[ "Development Status :: 4 - Beta", diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..380ea17 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,70 @@ +import mock +import time +import pytest +from amodem import async +import logging + +logging.basicConfig(format='%(message)s') + + +def test_async_reader(): + def _read(n): + time.sleep(n * 0.1) + return b'\x00' * n + s = mock.Mock() + s.read = _read + r = async.AsyncReader(s, 1) + + n = 5 + assert r.read(n) == b'\x00' * n + r.close() + assert r.stream is None + r.close() + + +def test_async_write(): + result = [] + + def _write(buf): + time.sleep(len(buf) * 0.1) + result.append(buf) + s = mock.Mock() + s.write = _write + w = async.AsyncWriter(s) + + w.write('foo') + w.write(' ') + w.write('bar') + w.close() + assert w.stream is None + w.close() + assert result == ['foo', ' ', 'bar'] + + +def test_async_reader_error(): + s = mock.Mock() + s.read.side_effect = IOError() + r = async.AsyncReader(s, 1) + with pytest.raises(IOError): + r.read(3) + + +def test_async_writer_error(): + s = mock.Mock() + s.write.side_effect = IOError() + w = async.AsyncWriter(s) + w.write('123') + w.thread.join() + with pytest.raises(IOError): + w.write('456') + assert s.write.mock_calls == [mock.call('123')] + + +def test_underflow(): + s = mock.Mock() + w = async.AsyncWriter(s) + w.write('blah') + w.thread.join() + assert s.write.mock_calls == [mock.call('blah')] + with pytest.raises(IOError): + w.write('xyzw') diff --git a/tests/test_transfer.py b/tests/test_transfer.py index d7eab2f..c57aa93 100644 --- a/tests/test_transfer.py +++ b/tests/test_transfer.py @@ -9,6 +9,7 @@ from amodem import common from amodem import dsp from amodem import sampling from amodem import config +from amodem import async config = config.fastest() import logging @@ -26,7 +27,7 @@ class Args(object): return None -def run(size, chan=None, df=0, success=True): +def run(size, chan=None, df=0, success=True, reader=None): tx_data = os.urandom(size) tx_audio = BytesIO() send.main(config=config, src=BytesIO(tx_data), dst=tx_audio) @@ -42,13 +43,19 @@ def run(size, chan=None, df=0, success=True): data = common.dumps(data) rx_audio = BytesIO(data) - rx_data = BytesIO() - d = BytesIO() - result = recv.main(config=config, src=rx_audio, dst=rx_data, - dump_audio=d) + dump = BytesIO() + + if reader: + rx_audio = reader(rx_audio) + try: + result = recv.main(config=config, src=rx_audio, dst=rx_data, + dump_audio=dump) + finally: + rx_audio.close() + rx_data = rx_data.getvalue() - assert data.startswith(d.getvalue()) + assert data.startswith(dump.getvalue()) assert result == success if success: @@ -64,6 +71,11 @@ def test_small(small_size): run(small_size, chan=lambda x: x) +def test_async(): + run(1024, chan=lambda x: x, + reader=lambda s: async.AsyncReader(s, 128)) + + def test_error(): skip = 32000 # remove trailing silence run(1024, chan=lambda x: x[:-skip], success=False) diff --git a/tox.ini b/tox.ini index 46206d3..0e74867 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ deps= pep8 coverage pylint + six commands= pep8 amodem/ scripts/ tests/ amodem-cli pylint --extension-pkg-whitelist=numpy --report=no amodem