Files
amodem/amodem-cli
2014-11-30 12:48:57 +02:00

175 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK
import sys
if sys.version_info.major == 2:
_stdin = sys.stdin
_stdout = sys.stdout
else:
_stdin = sys.stdin.buffer
_stdout = sys.stdout.buffer
import argparse
try:
import argcomplete
except ImportError:
argcomplete = None
import logging
log = logging.getLogger('__name__')
from amodem import config
from amodem import recv
from amodem import send
from amodem import audio
from amodem import calib
null = open('/dev/null', 'wb')
def FileType(mode, process=None):
def opener(fname):
assert 'r' in mode or 'w' in mode
if process is None and fname is None:
fname = '-'
if fname is None:
assert process is not None
if 'r' in mode:
return process.launch(stdout=audio.sp.PIPE, stderr=null).stdout
if 'w' in mode:
return process.launch(stdin=audio.sp.PIPE, stderr=null).stdin
if fname == '-':
if 'r' in mode:
return _stdin
if 'w' in mode:
return _stdout
return open(fname, mode)
return opener
def main():
fmt = ('Audio OFDM MODEM: {:.1f} kb/s ({:d}-QAM x {:d} carriers) '
'Fs={:.1f} kHz')
description = fmt.format(config.modem_bps / 1e3, len(config.symbols),
config.Nfreq, config.Fs / 1e3)
p = argparse.ArgumentParser(description=description)
g = p.add_mutually_exclusive_group()
g.add_argument('-v', '--verbose', default=0, action='count')
g.add_argument('-q', '--quiet', default=False, action='store_true')
subparsers = p.add_subparsers()
# Modulator
sender = subparsers.add_parser(
'send', help='modulate binary data into audio signal.')
sender.add_argument(
'-i', '--input', help='input file (use "-" for stdin).')
sender.add_argument(
'-o', '--output', help='output file (use "-" for stdout).'
' if not specified, `aplay` tool will be used.')
sender.add_argument(
'-c', '--calibrate', default=False, action='store_true')
sender.add_argument(
'-w', '--wave', default=False, action='store_true')
sender.add_argument(
'--silence-start', type=float, default=1.0,
help='seconds of silence before transmission starts')
sender.add_argument(
'--silence-stop', type=float, default=1.0,
help='seconds of silence after transmission stops')
sender.set_defaults(
main=run_send,
input_type=FileType('rb'),
output_type=FileType('wb', audio.play(Fs=config.Fs))
)
# Demodulator
receiver = subparsers.add_parser(
'recv', help='demodulate audio signal into binary data.')
receiver.add_argument(
'-i', '--input', help='input file (use "-" for stdin).'
' if not specified, `arecord` tool will be used.')
receiver.add_argument(
'-o', '--output', help='output file (use "-" for stdout).')
receiver.add_argument(
'-c', '--calibrate', default=False, action='store_true')
receiver.add_argument(
'-w', '--wave', default=False, action='store_true')
receiver.add_argument(
'--skip', type=int, default=320,
help='skip initial N samples, due to spurious spikes')
receiver.add_argument(
'--plot', action='store_true', default=False,
help='plot results using pylab module')
receiver.set_defaults(
main=run_recv,
input_type=FileType('rb', audio.record(Fs=config.Fs)),
output_type=FileType('wb')
)
if argcomplete:
argcomplete.autocomplete(p)
args = p.parse_args()
if args.verbose == 0:
level, format = 'INFO', '%(message)s'
elif args.verbose == 1:
level, format = 'DEBUG', '%(message)s'
elif args.verbose >= 2:
level, format = ('DEBUG', '%(asctime)s %(levelname)-10s '
'%(message)-100s '
'%(filename)s:%(lineno)d')
if args.quiet:
level, format = 'WARNING', '%(message)s'
logging.basicConfig(level=level, format=format)
# Parsing and execution
log.debug(description)
if getattr(args, 'plot', False):
import pylab
args.plot = pylab
args.config = config
args.main(args)
def join_process(process):
exitcode = 0
try:
exitcode = process.wait()
except KeyboardInterrupt:
process.kill()
exitcode = process.wait()
sys.exit(exitcode)
def run_modem(args, func):
args.input = args.input_type(args.input)
args.output = args.output_type(args.output)
func(args)
def run_send(args):
if args.calibrate:
calib.send(config=config, verbose=args.verbose)
elif args.wave:
join_process(audio.play(Fs=config.Fs).launch(fname=args.input))
else:
run_modem(args, send.main)
def run_recv(args):
if args.calibrate:
calib.recv(config=config, verbose=args.verbose)
elif args.wave:
join_process(audio.record(Fs=config.Fs).launch(fname=args.output))
else:
run_modem(args, recv.main)
if __name__ == '__main__':
main()