audio: use ctypes to access PortAudio API directly

This commit is contained in:
Roman Zeyde
2015-01-06 12:41:59 +02:00
parent b3510c18b3
commit 75dd7d28c9
2 changed files with 122 additions and 47 deletions

View File

@@ -55,7 +55,8 @@ def main():
'Fs={3:.1f} kHz')
description = fmt.format(config.modem_bps / 1e3, len(config.symbols),
config.Nfreq, config.Fs / 1e3)
interface = audio.Interface(config)
interface = audio.Library('libportaudio.so')
p = argparse.ArgumentParser(description=description)
subparsers = p.add_subparsers()
@@ -102,32 +103,35 @@ def main():
if argcomplete:
argcomplete.autocomplete(p)
args = p.parse_args()
if args.verbose == 0:
level, format = 'INFO', '%(message)s'
elif args.verbose == 1:
level, format = 'DEBUG', '%(message)s'
elif args.verbose >= 2:
level, format = ('DEBUG', '%(asctime)s %(levelname)-10s '
'%(message)-100s '
'%(filename)s:%(lineno)d')
if args.quiet:
level, format = 'WARNING', '%(message)s'
logging.basicConfig(level=level, format=format)
# Parsing and execution
log.debug(description)
if getattr(args, 'plot', False):
import pylab
else:
pylab = None
with interface:
args = p.parse_args()
if args.verbose == 0:
level, format = 'INFO', '%(message)s'
elif args.verbose == 1:
level, format = 'DEBUG', '%(message)s'
elif args.verbose >= 2:
level, format = ('DEBUG', '%(asctime)s %(levelname)-10s '
'%(message)-100s '
'%(filename)s:%(lineno)d')
if args.quiet:
level, format = 'WARNING', '%(message)s'
logging.basicConfig(level=level, format=format)
src = args.input_type(args.input)
dst = args.output_type(args.output)
if args.calibrate:
args.calibration(config=config, src=src, dst=dst, verbose=args.verbose)
else:
return args.main(config=config, src=src, dst=dst, pylab=pylab)
# Parsing and execution
log.debug(description)
if getattr(args, 'plot', False):
import pylab
else:
pylab = None
src = args.input_type(args.input)
dst = args.output_type(args.output)
if args.calibrate:
args.calibration(config=config, src=src, dst=dst,
verbose=args.verbose)
else:
return args.main(config=config, src=src, dst=dst, pylab=pylab)
if __name__ == '__main__':

View File

@@ -1,35 +1,106 @@
import pyaudio
import ctypes
import logging
log = logging.getLogger(__name__)
class Interface(object):
def __init__(self, config, library=pyaudio):
self.p = library.PyAudio()
fmt = getattr(library, 'paInt{0}'.format(config.bits_per_sample))
self.sample_size = config.sample_size
self.kwargs = dict(
channels=1, rate=int(config.Fs), format=fmt,
frames_per_buffer=config.samples_per_buffer
)
class Library(object):
def __init__(self, name):
self.lib = ctypes.CDLL(name)
self.lib.Pa_GetVersionText.restype = ctypes.c_char_p
log.debug('Library version: "%s"', self.lib.Pa_GetVersionText())
self.lib.Pa_GetErrorText.restype = ctypes.c_char_p
assert self.lib.Pa_GetErrorText(0) == 'Success'
self.streams = []
def __del__(self):
self.p.terminate()
def call(self, name, *args, **kwargs):
func = getattr(self.lib, 'Pa_{0}'.format(name))
func.restype = kwargs.get('restype', self._error_check)
return func(*args)
def player(self):
return self.p.open(output=True, **self.kwargs)
def _error_check(self, res):
if res != 0:
raise Exception(res, self.lib.Pa_GetErrorText(res))
def __enter__(self):
self.call('Initialize')
return self
def __exit__(self, *args):
for s in self.streams:
s.close()
self.call('Terminate')
def recorder(self):
stream = self.p.open(input=True, **self.kwargs)
return _Recorder(stream, sample_size=self.sample_size)
return Stream(self, read=True)
def player(self):
return Stream(self, write=True)
class _Recorder(object):
def __init__(self, stream, sample_size):
self.stream = stream
self.sample_size = sample_size
class Stream(object):
class Parameters(ctypes.Structure):
_fields_ = [
('device', ctypes.c_int),
('channelCount', ctypes.c_int),
('sampleFormat', ctypes.c_ulong),
('suggestedLatency', ctypes.c_double),
('hostApiSpecificStreamInfo', ctypes.POINTER(None)),
]
sample_rate = 32000.0
frames_per_buffer = 4096
suggested_latency = 0.1
channel_count = 1
sample_format = 0x00000008 # 16-bit samples (paInt16)
bytes_per_sample = 2
flags = 0 # no flags (paNoFlag)
def __init__(self, lib, read=False, write=False):
self.lib = lib
self.stream = ctypes.POINTER(ctypes.c_void_p)()
self.user_data = ctypes.c_void_p(None)
self.stream_callback = ctypes.c_void_p(None)
index = lib.call('GetDefaultInputDevice', restype=ctypes.c_int)
self.params = Stream.Parameters(
device=index,
channelCount=self.channel_count,
sampleFormat=self.sample_format,
suggestedLatency=self.suggested_latency,
hostApiSpecificStreamInfo=None)
self.lib.call(
'OpenStream',
ctypes.byref(self.stream),
ctypes.byref(self.params) if read else None,
ctypes.byref(self.params) if write else None,
ctypes.c_double(self.sample_rate),
ctypes.c_ulong(self.frames_per_buffer),
ctypes.c_ulong(self.flags),
self.stream_callback,
self.user_data)
self.lib.streams.append(self)
self.lib.call('StartStream', self.stream)
def close(self):
if self.stream:
self.lib.call('StopStream', self.stream)
self.lib.call('CloseStream', self.stream)
self.stream = None
def read(self, size):
assert size % self.sample_size == 0
return self.stream.read(size // self.sample_size)
assert size % self.bytes_per_sample == 0
buf = ctypes.create_string_buffer(size)
frames = ctypes.c_ulong(size // self.bytes_per_sample)
self.lib.call('ReadStream', self.stream, buf, frames)
return buf.raw
def write(self, data):
data = bytes(data)
assert len(data) % self.bytes_per_sample == 0
buf = ctypes.c_char_p(data)
frames = ctypes.c_ulong(len(data) // self.bytes_per_sample)
self.lib.call('WriteStream', self.stream, buf, frames)