stream: use async I/O to avoid real-time problems.

This commit is contained in:
Roman Zeyde
2015-02-02 21:46:53 +02:00
parent 5b6d1881ab
commit 1d5d564f4d
12 changed files with 207 additions and 23 deletions

View File

@@ -19,7 +19,7 @@ except ImportError:
log = logging.getLogger('__name__')
from amodem import recv, send, calib, audio
from amodem import recv, send, calib, audio, async
from amodem.config import bitrates
bitrate = os.environ.get('BITRATE', 1)
@@ -67,9 +67,11 @@ def FileType(mode, audio_interface=None):
if fname is None:
assert audio_interface is not None
if 'r' in mode:
return audio_interface.recorder()
s = audio_interface.recorder()
return async.AsyncReader(stream=s, bufsize=s.bufsize)
if 'w' in mode:
return audio_interface.player()
s = audio_interface.player()
return async.AsyncWriter(stream=s)
if fname == '-':
if 'r' in mode:
@@ -197,13 +199,17 @@ def main():
with interface.load(args.audio_library):
args.src = args.input_type(args.input)
args.dst = args.output_type(args.output)
if args.calibrate is False:
return args.main(config=config, args=args)
else:
try:
args.calib(config=config, args=args)
except KeyboardInterrupt:
pass
try:
if args.calibrate is False:
return args.main(config=config, args=args)
else:
try:
args.calib(config=config, args=args)
except KeyboardInterrupt:
pass
finally:
args.src.close()
args.dst.close()
if __name__ == '__main__':

92
amodem/async.py Normal file
View File

@@ -0,0 +1,92 @@
import threading
import six # since `Queue` module was renamed to `queue` (in Python 3)
import logging
log = logging.getLogger()
class AsyncReader(object):
def __init__(self, stream, bufsize):
self.stream = stream
self.queue = six.moves.queue.Queue()
self.stop = threading.Event()
args = (stream, bufsize, self.queue, self.stop)
self.thread = threading.Thread(target=AsyncReader._thread,
args=args, name='AsyncReader')
self.thread.start()
self.buf = b''
@staticmethod
def _thread(src, bufsize, queue, stop):
total = 0
try:
log.debug('AsyncReader thread started')
while not stop.isSet():
buf = src.read(bufsize)
queue.put(buf)
total += len(buf)
log.debug('AsyncReader thread stopped (read %d bytes)', total)
except:
log.exception('AsyncReader thread failed')
queue.put(None)
def read(self, size):
while len(self.buf) < size:
buf = self.queue.get()
if buf is None:
raise IOError('cannot read from stream')
self.buf += buf
result = self.buf[:size]
self.buf = self.buf[size:]
return result
def close(self):
if self.stream is not None:
self.stop.set()
self.thread.join()
self.stream.close()
self.stream = None
class AsyncWriter(object):
def __init__(self, stream):
self.stream = stream
self.queue = six.moves.queue.Queue()
self.error = threading.Event()
self.stop = threading.Event()
self.args = (stream, self.queue, self.error)
self.thread = None
@staticmethod
def _thread(dst, queue, error):
total = 0
try:
log.debug('AsyncWriter thread started')
while True:
buf = queue.get(block=False)
if buf is None:
break
dst.write(buf)
total += len(buf)
log.debug('AsyncWriter thread stopped (written %d bytes)', total)
except:
log.exception('AsyncWriter thread failed')
error.set()
def write(self, buf):
if self.error.isSet():
raise IOError('cannot write to stream')
self.queue.put(buf)
if self.thread is None:
self.thread = threading.Thread(target=AsyncWriter._thread,
args=self.args, name='AsyncWriter')
self.thread.start() # start only after there is data to write
def close(self):
if self.stream is not None:
self.queue.put(None)
self.thread.join()
self.stream.close()
self.stream = None

View File

@@ -70,11 +70,13 @@ class Stream(object):
self.user_data = ctypes.c_void_p(None)
self.stream_callback = ctypes.c_void_p(None)
self.bytes_per_sample = config.sample_size
self.latency = float(config.latency) # in seconds
self.bufsize = int(self.latency * config.Fs * self.bytes_per_sample)
assert config.bits_per_sample == 16 # just to make sure :)
read = bool(read)
write = bool(write)
assert read != write
assert read != write # don't support full duplex
direction = 'Input' if read else 'Output'
api_name = 'GetDefault{0}Device'.format(direction)
@@ -83,7 +85,7 @@ class Stream(object):
device=index, # choose default device
channelCount=1, # mono audio
sampleFormat=0x00000008, # 16-bit samples (paInt16)
suggestedLatency=0.1, # 100ms should be good enough
suggestedLatency=self.latency,
hostApiSpecificStreamInfo=None)
self.interface.call(

View File

@@ -11,7 +11,7 @@ class Configuration(object):
# audio config
bits_per_sample = 16
sample_size = bits_per_sample // 8
samples_per_buffer = 8192
latency = 0.1
# sender config
silence_start = 1.0

View File

@@ -54,7 +54,7 @@ def main(config, src, dst):
sender = Sender(dst, config=config)
Fs = config.Fs
# pre-padding audio with silence
# pre-padding audio with silence (priming the audio sending queue)
sender.write(np.zeros(int(Fs * config.silence_start)))
sender.start()

View File

@@ -5,7 +5,7 @@ class Reader(object):
wait = 0.2
timeout = 2.0
bufsize = 4096
bufsize = (8 << 10)
def __init__(self, fd, data_type=None, eof=False):
self.fd = fd

View File

@@ -1,2 +1,3 @@
numpy
six
argcomplete

View File

@@ -16,7 +16,7 @@ def main():
interface = audio.Interface(config=config)
with interface.load(args.audio_library):
src = interface.recorder()
size = config.sample_size * config.samples_per_buffer
size = int(config.sample_size * config.Fs) # one second of audio
while True:
dst.write(src.read(size))

View File

@@ -25,7 +25,7 @@ setup(
packages=['amodem'],
tests_require=['pytest'],
cmdclass={'test': PyTest},
install_requires=['numpy'],
install_requires=['numpy', 'six'],
platforms=['POSIX'],
classifiers=[
"Development Status :: 4 - Beta",

70
tests/test_async.py Normal file
View File

@@ -0,0 +1,70 @@
import mock
import time
import pytest
from amodem import async
import logging
logging.basicConfig(format='%(message)s')
def test_async_reader():
def _read(n):
time.sleep(n * 0.1)
return b'\x00' * n
s = mock.Mock()
s.read = _read
r = async.AsyncReader(s, 1)
n = 5
assert r.read(n) == b'\x00' * n
r.close()
assert r.stream is None
r.close()
def test_async_write():
result = []
def _write(buf):
time.sleep(len(buf) * 0.1)
result.append(buf)
s = mock.Mock()
s.write = _write
w = async.AsyncWriter(s)
w.write('foo')
w.write(' ')
w.write('bar')
w.close()
assert w.stream is None
w.close()
assert result == ['foo', ' ', 'bar']
def test_async_reader_error():
s = mock.Mock()
s.read.side_effect = IOError()
r = async.AsyncReader(s, 1)
with pytest.raises(IOError):
r.read(3)
def test_async_writer_error():
s = mock.Mock()
s.write.side_effect = IOError()
w = async.AsyncWriter(s)
w.write('123')
w.thread.join()
with pytest.raises(IOError):
w.write('456')
assert s.write.mock_calls == [mock.call('123')]
def test_underflow():
s = mock.Mock()
w = async.AsyncWriter(s)
w.write('blah')
w.thread.join()
assert s.write.mock_calls == [mock.call('blah')]
with pytest.raises(IOError):
w.write('xyzw')

View File

@@ -9,6 +9,7 @@ from amodem import common
from amodem import dsp
from amodem import sampling
from amodem import config
from amodem import async
config = config.fastest()
import logging
@@ -26,7 +27,7 @@ class Args(object):
return None
def run(size, chan=None, df=0, success=True):
def run(size, chan=None, df=0, success=True, reader=None):
tx_data = os.urandom(size)
tx_audio = BytesIO()
send.main(config=config, src=BytesIO(tx_data), dst=tx_audio)
@@ -42,13 +43,19 @@ def run(size, chan=None, df=0, success=True):
data = common.dumps(data)
rx_audio = BytesIO(data)
rx_data = BytesIO()
d = BytesIO()
result = recv.main(config=config, src=rx_audio, dst=rx_data,
dump_audio=d)
dump = BytesIO()
if reader:
rx_audio = reader(rx_audio)
try:
result = recv.main(config=config, src=rx_audio, dst=rx_data,
dump_audio=dump)
finally:
rx_audio.close()
rx_data = rx_data.getvalue()
assert data.startswith(d.getvalue())
assert data.startswith(dump.getvalue())
assert result == success
if success:
@@ -64,6 +71,11 @@ def test_small(small_size):
run(small_size, chan=lambda x: x)
def test_async():
run(1024, chan=lambda x: x,
reader=lambda s: async.AsyncReader(s, 128))
def test_error():
skip = 32000 # remove trailing silence
run(1024, chan=lambda x: x[:-skip], success=False)

View File

@@ -7,6 +7,7 @@ deps=
pep8
coverage
pylint
six
commands=
pep8 amodem/ scripts/ tests/ amodem-cli
pylint --extension-pkg-whitelist=numpy --report=no amodem