audio: add simple ALSA interface

This commit is contained in:
Roman Zeyde
2015-07-30 21:53:11 +03:00
parent 8d72621b9b
commit f82a4f4a39
3 changed files with 122 additions and 18 deletions

View File

@@ -60,8 +60,10 @@ class Decompressor(object):
self.stream.write(self.obj.flush())
def FileType(mode, audio_interface=None):
def FileType(mode, interface_factory=None):
def opener(fname):
audio_interface = interface_factory() if interface_factory else None
assert 'r' in mode or 'w' in mode
if audio_interface is None and fname is None:
fname = '-'
@@ -97,20 +99,11 @@ def get_volume_cmd(args):
return c[args.command]
class _DummyContextManager(object):
def __enter__(self):
pass
def __exit__(self, *args):
pass
def wrap(cls, stream, enable):
return cls(stream) if enable else stream
def create_parser(description, interface):
def create_parser(description, interface_factory):
p = argparse.ArgumentParser(description=description)
subparsers = p.add_subparsers()
@@ -135,7 +128,7 @@ def create_parser(description, interface):
volume_cmd=get_volume_cmd(args)
),
input_type=FileType('rb'),
output_type=FileType('wb', interface),
output_type=FileType('wb', interface_factory),
command='send'
)
@@ -162,7 +155,7 @@ def create_parser(description, interface):
config=config, src=args.src, verbose=args.verbose,
volume_cmd=get_volume_cmd(args)
),
input_type=FileType('rb', interface),
input_type=FileType('rb', interface_factory),
output_type=FileType('wb'),
command='recv'
)
@@ -194,8 +187,12 @@ def _main():
description = fmt.format(version.__doc__,
config.modem_bps / 1e3, len(config.symbols),
config.Nfreq, config.Fs / 1e3)
interface = audio.Interface(config=config)
p = create_parser(description, interface)
interface = None
def interface_factory():
return interface
p = create_parser(description, interface_factory)
args = p.parse_args()
if args.verbose == 0:
@@ -218,9 +215,11 @@ def _main():
import pylab # pylint: disable=import-error
args.pylab = pylab
if args.audio_library == '-':
interface = _DummyContextManager()
if args.audio_library == 'ALSA':
from . import alsa
interface = alsa.Interface(config)
else:
interface = audio.Interface(config)
interface.load(args.audio_library)
with interface:
@@ -232,9 +231,9 @@ def _main():
else:
args.calib(config=config, args=args)
finally:
log.debug('Closing input and output')
args.src.close()
args.dst.close()
log.debug('Finished I/O')
if __name__ == '__main__':

65
amodem/alsa.py Normal file
View File

@@ -0,0 +1,65 @@
import subprocess
import logging
log = logging.getLogger(__name__)
class Interface(object):
RECORDER = 'arecord'
PLAYER = 'aplay'
def __init__(self, config):
self.config = config
rate = int(config.Fs)
bits_per_sample = config.bits_per_sample
assert bits_per_sample == 16
args = '-f S{:d}_LE -c 1 -r {:d} -T 100 -q -'
args = args.format(bits_per_sample, rate).split()
self.record_cmd = [self.RECORDER] + args
self.play_cmd = [self.PLAYER] + args
self.processes = []
def __enter__(self):
return self
def __exit__(self, *args):
for p in self.processes:
try:
p.wait()
except OSError:
log.warning('%s failed', p)
def launch(self, **kwargs):
log.debug('Launching subprocess: %s', kwargs)
p = subprocess.Popen(**kwargs)
self.processes.append(p)
return p
def recorder(self):
return Recorder(self)
def player(self):
return Player(self)
class Recorder(object):
def __init__(self, lib):
self.p = lib.launch(args=lib.record_cmd, stdout=subprocess.PIPE)
self.read = self.p.stdout.read
self.bufsize = 4096
def close(self):
self.p.kill()
class Player(object):
def __init__(self, lib):
self.p = lib.launch(args=lib.play_cmd, stdin=subprocess.PIPE)
self.write = self.p.stdin.write
def close(self):
self.p.stdin.close()
self.p.wait()

40
tests/test_alsa.py Normal file
View File

@@ -0,0 +1,40 @@
from amodem import alsa, config
import mock
def test_alsa():
interface = alsa.Interface(config=config.fastest())
interface.launch = mock.Mock()
with interface:
r = interface.recorder()
r.read(2)
r.close()
p = mock.call(
args='arecord -f S16_LE -c 1 -r 32000 -T 100 -q -'.split(),
stdout=-1)
assert interface.launch.mock_calls == [p, p.stdout.read(2), p.kill()]
interface.launch = mock.Mock()
with interface:
p = interface.player()
p.write('\x00\x00')
p.close()
p = mock.call(
args='aplay -f S16_LE -c 1 -r 32000 -T 100 -q -'.split(),
stdin=-1)
assert interface.launch.mock_calls == [
p, p.stdin.write('\x00\x00'), p.stdin.close(), p.wait()
]
def test_alsa_subprocess():
interface = alsa.Interface(config=config.fastest())
with mock.patch('subprocess.Popen') as popen:
with interface:
p = interface.launch(args=['foobar'])
p.wait.side_effect = OSError('invalid command')
assert interface.processes == [p]
assert popen.mock_calls == [mock.call(args=['foobar'])]