#!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK
'''
Command-line front end of the audio modem.

The `send` sub-command modulates binary data into an audio signal and the
`recv` sub-command demodulates an audio signal back into binary data.
Both sub-commands can alternatively run a calibration session (`-c`).
'''
import argparse
import importlib
import logging
import os
import sys
import zlib

from amodem import audio, calib, main
from amodem.config import bitrates

# `async` is a reserved keyword since Python 3.7, so the statement
# `from amodem import async` is a SyntaxError there. Importing the module
# by its dotted name works on every Python version.
async_reader = importlib.import_module('amodem.async')

# Python 3 has `buffer` attribute for byte-based I/O
_stdin = getattr(sys.stdin, 'buffer', sys.stdin)
_stdout = getattr(sys.stdout, 'buffer', sys.stdout)

try:
    import argcomplete
except ImportError:
    argcomplete = None

log = logging.getLogger(__name__)

# The modem configuration is selected by the BITRATE environment variable.
bitrate = os.environ.get('BITRATE', 1)
config = bitrates.get(int(bitrate))


class Compressor(object):
    ''' Readable wrapper that zlib-compresses the data of `stream`. '''

    def __init__(self, stream):
        self.obj = zlib.compressobj()
        self.stream = stream

    def read(self, size):
        '''
        Read up to `size` raw bytes from the wrapped stream and return
        their compressed form.

        Once the wrapped stream is exhausted, the first call returns the
        flushed tail of the compressor and every later call returns an
        empty byte string (EOF marker).
        '''
        while True:
            data = self.stream.read(size)
            if data:
                result = self.obj.compress(data)
                if not result:  # compression is too good :)
                    continue  # try again (since falsy data = EOF)
            elif self.obj:
                result = self.obj.flush()
                self.obj = None  # flush only once
            else:
                # Must be bytes (not text), like every other value
                # returned from this binary reader.
                result = b''  # EOF marker
            return result


class Decompressor(object):
    ''' Writable wrapper that zlib-decompresses data into `stream`. '''

    def __init__(self, stream):
        self.obj = zlib.decompressobj()
        self.stream = stream

    def write(self, data):
        ''' Decompress `data` and write the result to the stream. '''
        self.stream.write(self.obj.decompress(bytes(data)))

    def flush(self):
        ''' Write whatever is still buffered inside the decompressor. '''
        self.stream.write(self.obj.flush())


def FileType(mode, audio_interface=None):
    '''
    Return an `opener(fname)` callable for the given file `mode`.

    The opener resolves `fname` as follows:
      * None, with an audio interface: an asynchronous reader around the
        interface's recorder (read modes) or the interface's player
        (write modes).
      * None, without an audio interface: treated as '-'.
      * '-': binary standard input (read modes) or output (write modes).
      * anything else: `open(fname, mode)`.
    '''
    def opener(fname):
        assert 'r' in mode or 'w' in mode
        if audio_interface is None and fname is None:
            fname = '-'

        if fname is None:
            assert audio_interface is not None
            if 'r' in mode:
                s = audio_interface.recorder()
                return async_reader.AsyncReader(stream=s, bufsize=s.bufsize)
            if 'w' in mode:
                return audio_interface.player()

        if fname == '-':
            if 'r' in mode:
                return _stdin
            if 'w' in mode:
                return _stdout

        return open(fname, mode)

    return opener


def get_volume_cmd(args):
    '''
    Return the volume-setting command for `args.command` ('send'/'recv').

    A command is returned only when `args.calibrate` is 'auto' and the
    test command of a known volume controller exits with status 0;
    otherwise None is returned.
    '''
    volume_controllers = [
        dict(test='pactl --version',
             send='pactl set-sink-volume @DEFAULT_SINK@',
             recv='pactl set-source-volume @DEFAULT_SOURCE@')
    ]
    if args.calibrate == 'auto':
        for c in volume_controllers:
            if os.system(c['test']) == 0:
                return c[args.command]
    return None


class _DummyInterface(object):
    ''' Audio interface mock, to skip shared library loading. '''

    def __enter__(self):
        pass

    def __exit__(self, *args):
        pass


def _config_log(args):
    ''' Configure the logging level and format from -v/-q options. '''
    if args.verbose == 0:
        level, fmt = 'INFO', '%(message)s'
    elif args.verbose == 1:
        level, fmt = 'DEBUG', '%(message)s'
    else:
        level, fmt = ('DEBUG', '%(asctime)s %(levelname)-10s '
                               '%(message)-100s '
                               '%(filename)s:%(lineno)d')
    if args.quiet:
        level, fmt = 'WARNING', '%(message)s'
    logging.basicConfig(level=level, format=fmt)


def _main():
    '''
    Parse the command line and run the selected sub-command.

    Returns the result of the sub-command's `main` callable, or True
    after a calibration session has ended.
    '''
    fmt = ('Audio OFDM MODEM: {0:.1f} kb/s ({1:d}-QAM x {2:d} carriers) '
           'Fs={3:.1f} kHz')
    description = fmt.format(config.modem_bps / 1e3, len(config.symbols),
                             config.Nfreq, config.Fs / 1e3)
    interface = audio.Interface(config=config)

    p = argparse.ArgumentParser(description=description)
    subparsers = p.add_subparsers()

    def wrap(cls, stream, enable):
        return cls(stream) if enable else stream

    # Modulator
    sender = subparsers.add_parser(
        'send', help='modulate binary data into audio signal.')
    sender.add_argument(
        '-i', '--input', help='input file (use "-" for stdin).')
    sender.add_argument(
        '-o', '--output', help='output file (use "-" for stdout).'
        ' if not specified, `aplay` tool will be used.')

    sender.set_defaults(
        main=lambda config, args: main.send(
            config, src=wrap(Compressor, args.src, args.zip), dst=args.dst
        ),
        calib=lambda config, args: calib.send(
            config=config, dst=args.dst,
            volume_cmd=get_volume_cmd(args)
        ),
        input_type=FileType('rb'),
        output_type=FileType('wb', interface),
        command='send'
    )

    # Demodulator
    receiver = subparsers.add_parser(
        'recv', help='demodulate audio signal into binary data.')
    receiver.add_argument(
        '-i', '--input', help='input file (use "-" for stdin).'
        ' if not specified, `arecord` tool will be used.')
    receiver.add_argument(
        '-o', '--output', help='output file (use "-" for stdout).')
    receiver.add_argument(
        '-d', '--dump', type=FileType('wb'),
        help='Filename to save recorded audio')
    receiver.add_argument(
        '--plot', action='store_true', default=False,
        help='plot results using pylab module')

    receiver.set_defaults(
        main=lambda config, args: main.recv(
            config, src=args.src, dst=wrap(Decompressor, args.dst, args.zip),
            pylab=args.pylab, dump_audio=args.dump
        ),
        calib=lambda config, args: calib.recv(
            config=config, src=args.src, verbose=args.verbose,
            volume_cmd=get_volume_cmd(args)
        ),
        input_type=FileType('rb', interface),
        output_type=FileType('wb'),
        command='recv'
    )

    calibration_help = ('Run calibration '
                        '(specify "auto" for automatic gain control)')

    # Options shared by all sub-commands.
    for sub in subparsers.choices.values():
        sub.add_argument('-c', '--calibrate', nargs='?', default=False,
                         metavar='SYSTEM', help=calibration_help)
        sub.add_argument('-l', '--audio-library', default='libportaudio.so',
                         help='File name of PortAudio shared library.')
        sub.add_argument('-z', '--zip', default=False, action='store_true',
                         help='Use ZIP to compress data.')
        g = sub.add_mutually_exclusive_group()
        g.add_argument('-v', '--verbose', default=0, action='count')
        g.add_argument('-q', '--quiet', default=False, action='store_true')

    if argcomplete:
        argcomplete.autocomplete(p)

    args = p.parse_args()
    if not hasattr(args, 'command'):
        # Python 3's argparse does not require a sub-command by default,
        # which would leave `args` without any of the options used below.
        p.error('a sub-command is required (send or recv)')

    _config_log(args)

    # Parsing and execution
    log.debug(description)

    args.pylab = None
    if getattr(args, 'plot', False):
        import pylab
        args.pylab = pylab

    if args.audio_library == '-':
        interface = _DummyInterface()
    else:
        interface.load(args.audio_library)

    with interface:
        # Open the streams inside the `try`, so the input is closed even
        # when opening the output fails.
        args.src = args.dst = None
        try:
            args.src = args.input_type(args.input)
            args.dst = args.output_type(args.output)
            if args.calibrate is False:
                return args.main(config=config, args=args)
            try:
                args.calib(config=config, args=args)
            except KeyboardInterrupt:
                pass  # the user ends the calibration session with Ctrl+C
            # Report success, so the process does not exit with status 1
            # after a calibration session that ended without an error.
            return True
        finally:
            log.debug('Closing input and output')
            for stream in (args.src, args.dst):
                if stream is not None:
                    stream.close()


if __name__ == '__main__':
    success = _main()
    sys.exit(0 if success else 1)