mirror of
https://github.com/romanz/amodem.git
synced 2026-02-06 16:48:06 +08:00
fix wave tools
This commit is contained in:
@@ -2,6 +2,7 @@ import os
|
||||
import signal
|
||||
import subprocess as sp
|
||||
import logging
|
||||
import functools
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -15,19 +16,14 @@ bytes_per_second = bytes_per_sample * Fs
|
||||
audio_format = 'S{}_LE'.format(bits_per_sample) # PCM signed little endian
|
||||
|
||||
|
||||
def play(fname, **kwargs):
|
||||
args = ['aplay', fname, '-q', '-f', audio_format, '-c', '1', '-r', Fs]
|
||||
return launch(*args, **kwargs)
|
||||
|
||||
|
||||
def record(fname, **kwargs):
|
||||
args = ['arecord', fname, '-q', '-f', audio_format, '-c', '1', '-r', Fs]
|
||||
return launch(*args, **kwargs)
|
||||
|
||||
|
||||
def launch(*args, **kwargs):
|
||||
args = list(map(str, args))
|
||||
log.debug('$ %s', ' '.join(args))
|
||||
def launch(tool, fname, **kwargs):
|
||||
args = [tool, fname, '-q', '-f', audio_format, '-c', '1', '-r', str(Fs)]
|
||||
log.debug('running: %r', args)
|
||||
p = sp.Popen(args=args, **kwargs)
|
||||
p.stop = lambda: os.kill(p.pid, signal.SIGKILL)
|
||||
return p
|
||||
|
||||
|
||||
# Use ALSA tools for audio playing/recording
|
||||
play = functools.partial(launch, tool='aplay')
|
||||
record = functools.partial(launch, tool='arecord')
|
||||
|
||||
@@ -4,19 +4,19 @@ import signal
|
||||
|
||||
|
||||
def test_launch():
|
||||
p = wave.launch('cat', stdin=sp.PIPE)
|
||||
p.stdin.close()
|
||||
p = wave.launch(tool='true', fname='fname')
|
||||
assert p.wait() == 0
|
||||
|
||||
p = wave.launch('bash', stdin=sp.PIPE)
|
||||
p.stdin.write(b'exit 42')
|
||||
p = wave.launch(tool='python', fname='-', stdin=sp.PIPE)
|
||||
s = b'import sys; sys.exit(42)'
|
||||
p.stdin.write(s)
|
||||
p.stdin.close()
|
||||
assert p.wait() == 42
|
||||
|
||||
p = wave.launch('cat', stdin=sp.PIPE, stdout=sp.PIPE)
|
||||
p = wave.launch(tool='python', fname='-', stdin=sp.PIPE, stdout=sp.PIPE)
|
||||
s = b'Hello World!'
|
||||
p.stdin.write(s)
|
||||
p.stdin.flush()
|
||||
p.stdin.write(b'print("' + s + b'")\n')
|
||||
p.stdin.close()
|
||||
assert p.stdout.read(len(s)) == s
|
||||
|
||||
p.kill()
|
||||
|
||||
Reference in New Issue
Block a user