mirror of
https://github.com/romanz/amodem.git
synced 2026-02-07 01:18:02 +08:00
218 lines
6.8 KiB
Python
Executable File
218 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# PYTHON_ARGCOMPLETE_OK
|
|
|
|
import os
|
|
import sys
|
|
import zlib
|
|
import logging
|
|
import argparse
|
|
|
|
# Python 3 has a `buffer` attribute on the standard streams for byte-based
# I/O; when the attribute is missing the stream object itself is used.
_stdin = getattr(sys.stdin, 'buffer', sys.stdin)
_stdout = getattr(sys.stdout, 'buffer', sys.stdout)

# argcomplete is optional: shell completion is only enabled when the package
# can be imported (see the `if argcomplete:` check in _main()).
try:
    import argcomplete
except ImportError:
    argcomplete = None

# Name the logger after this module. The original passed the literal string
# '__name__', which created a logger that is literally called "__name__".
log = logging.getLogger(__name__)
|
|
|
|
import importlib

from amodem import main, calib, audio
from amodem.config import bitrates

# `async` is a reserved keyword since Python 3.7, so the statement
# `from amodem import async` is a SyntaxError on current interpreters.
# Load the module by its dotted name instead and bind it in the module
# globals under its historical name, so code that still spells `async.X`
# on an older interpreter keeps resolving it.
# NOTE(review): assumes `async` is a submodule of the amodem package - confirm.
globals()['async'] = importlib.import_module('amodem.async')

# The bitrate key is read from the BITRATE environment variable (default 1)
# and used to look up the modem configuration. `bitrates.get` returns None
# for an unknown key, so `config` may be None for an unsupported BITRATE.
bitrate = os.environ.get('BITRATE', 1)
config = bitrates.get(int(bitrate))
|
|
|
|
|
|
class Compressor(object):
    """Readable wrapper that zlib-compresses bytes pulled from a stream.

    Attributes:
        obj: the zlib compression object; set to None once it has been
            flushed after the source reached EOF.
        stream: the wrapped source; only its ``read(size)`` method is used.
    """

    def __init__(self, stream):
        """Wrap `stream`, whose ``read(size)`` must return bytes."""
        self.obj = zlib.compressobj()
        self.stream = stream

    def read(self, size):
        """Return the next non-empty chunk of compressed bytes.

        Up to `size` bytes are read from the source per iteration. Because
        the compressor may buffer its input and emit nothing, reading is
        repeated until some output is produced: an empty result would be
        mistaken for EOF by the caller.

        Returns:
            Compressed bytes, or ``b''`` once the source is exhausted and
            the compressor has been flushed.
        """
        while True:
            data = self.stream.read(size)
            if data:
                result = self.obj.compress(data)
                if not result:  # compression is too good :)
                    continue  # try again (since falsy data = EOF)
            elif self.obj is not None:
                # Source is exhausted: emit whatever the compressor still
                # holds, exactly once.
                result = self.obj.flush()
                self.obj = None
            else:
                # EOF marker. It must be bytes, like every other value
                # returned here; a text '' is a different type on Python 3.
                result = b''
            return result
|
|
|
|
|
|
class Decompressor(object):
    """Writable wrapper that zlib-decompresses data before forwarding it.

    Attributes:
        obj: the zlib decompression object.
        stream: the wrapped sink; only its ``write(data)`` method is used.
    """

    def __init__(self, stream):
        """Wrap `stream`, which receives the decompressed bytes."""
        self.stream = stream
        self.obj = zlib.decompressobj()

    def write(self, data):
        """Decompress `data` and write the result to the wrapped stream.

        `data` is converted with ``bytes()`` first, so bytes-like inputs
        such as ``bytearray`` are accepted.
        """
        compressed = bytes(data)
        plain = self.obj.decompress(compressed)
        self.stream.write(plain)

    def flush(self):
        """Write any output still held by the decompressor to the stream.

        Only the decompression object is flushed; the wrapped stream's own
        ``flush`` is not called.
        """
        remainder = self.obj.flush()
        self.stream.write(remainder)
|
|
|
|
|
|
def FileType(mode, audio_interface=None):
    """Build an opener that maps a file name to a readable/writable stream.

    Args:
        mode: mode string passed to ``open``; it must contain 'r' or 'w'
            (checked when the opener is called).
        audio_interface: optional object providing ``recorder()`` and
            ``player()``; used when no file name is given.

    Returns:
        A function ``opener(fname)`` that returns:
          * an asynchronous reader/writer around the audio recorder/player
            when `fname` is None and `audio_interface` was supplied;
          * the binary stdin/stdout stream when `fname` is '-' (or when
            `fname` is None and there is no `audio_interface`);
          * ``open(fname, mode)`` otherwise.
    """
    def opener(fname):
        assert 'r' in mode or 'w' in mode
        if audio_interface is None and fname is None:
            fname = '-'

        if fname is None:
            assert audio_interface is not None
            # `async` is a reserved keyword since Python 3.7, so the module
            # cannot be referenced as `async.AsyncReader` any more; resolve
            # it by its dotted name instead. The import is local so that
            # plain file / stdio use does not need it.
            # NOTE(review): assumes `async` is a submodule of amodem - confirm.
            import importlib
            async_io = importlib.import_module('amodem.async')
            if 'r' in mode:
                s = audio_interface.recorder()
                return async_io.AsyncReader(stream=s, bufsize=s.bufsize)
            if 'w' in mode:
                s = audio_interface.player()
                return async_io.AsyncWriter(stream=s)

        if fname == '-':
            if 'r' in mode:
                return _stdin
            if 'w' in mode:
                return _stdout

        return open(fname, mode)

    return opener
|
|
|
|
|
|
def get_volume_cmd(args):
    """Return the shell command prefix used to adjust the audio volume.

    Args:
        args: parsed arguments; `args.calibrate` is inspected, and
            `args.command` ('send' or 'recv') selects the command once a
            working controller is found.

    Returns:
        The command string of the first controller whose test command
        exits with status 0, or None when `args.calibrate` is not 'auto'
        or no controller is available.
    """
    if args.calibrate != 'auto':
        return None

    pactl = {
        'test': 'pactl --version',
        'send': 'pactl set-sink-volume @DEFAULT_SINK@',
        'recv': 'pactl set-source-volume @DEFAULT_SOURCE@',
    }
    for controller in (pactl,):
        # A zero exit status from the probe means the tool can be run.
        probe_status = os.system(controller['test'])
        if probe_status == 0:
            return controller[args.command]
    return None
|
|
|
|
|
|
def _main():
    """Parse the command line and run the selected modem action.

    Builds the `send` and `recv` sub-commands, configures logging from the
    -v/-q flags, opens the source and destination streams and then either
    runs the transfer or, when -c/--calibrate was given, the calibration
    routine. Both streams are closed on the way out.

    Returns:
        The value returned by ``main.send`` / ``main.recv`` for a transfer.
        The calibration branch has no return statement, so it yields None,
        which the script entry point maps to exit status 1.
        NOTE(review): confirm that a non-zero status after calibration is
        intended.
    """
    fmt = ('Audio OFDM MODEM: {0:.1f} kb/s ({1:d}-QAM x {2:d} carriers) '
           'Fs={3:.1f} kHz')
    description = fmt.format(config.modem_bps / 1e3, len(config.symbols),
                             config.Nfreq, config.Fs / 1e3)
    interface = audio.Interface(config=config)

    p = argparse.ArgumentParser(description=description)
    subparsers = p.add_subparsers()

    def wrap(cls, stream, enable):
        # Wrap the stream with `cls` only when the feature is enabled.
        return cls(stream) if enable else stream

    # Modulator
    sender = subparsers.add_parser(
        'send', help='modulate binary data into audio signal.')
    sender.add_argument(
        '-i', '--input', help='input file (use "-" for stdin).')
    sender.add_argument(
        '-o', '--output', help='output file (use "-" for stdout).'
        ' if not specified, `aplay` tool will be used.')
    sender.set_defaults(
        main=lambda config, args: main.send(
            config, src=wrap(Compressor, args.src, args.zip), dst=args.dst
        ),
        calib=lambda config, args: calib.send(
            config=config, dst=args.dst,
            volume_cmd=get_volume_cmd(args)
        ),
        input_type=FileType('rb'),
        output_type=FileType('wb', interface),
        command='send'
    )

    # Demodulator
    receiver = subparsers.add_parser(
        'recv', help='demodulate audio signal into binary data.')
    receiver.add_argument(
        '-i', '--input', help='input file (use "-" for stdin).'
        ' if not specified, `arecord` tool will be used.')
    receiver.add_argument(
        '-o', '--output', help='output file (use "-" for stdout).')
    receiver.add_argument(
        '-d', '--dump', type=FileType('wb'),
        help='Filename to save recorded audio')
    receiver.add_argument(
        '--plot', action='store_true', default=False,
        help='plot results using pylab module')
    receiver.set_defaults(
        main=lambda config, args: main.recv(
            config, src=args.src, dst=wrap(Decompressor, args.dst, args.zip),
            pylab=args.pylab, dump_audio=args.dump
        ),
        calib=lambda config, args: calib.recv(
            config=config, src=args.src, verbose=args.verbose,
            volume_cmd=get_volume_cmd(args)
        ),
        input_type=FileType('rb', interface),
        output_type=FileType('wb'),
        command='recv'
    )

    calibration_help = ('Run calibration '
                        '(specify "auto" for automatic gain control)')

    # Options shared by both sub-commands.
    for sub in subparsers.choices.values():
        sub.add_argument('-c', '--calibrate', nargs='?', default=False,
                         metavar='SYSTEM', help=calibration_help)
        sub.add_argument('-l', '--audio-library', default='libportaudio.so',
                         help='File name of PortAudio shared library.')
        sub.add_argument('-z', '--zip', default=False, action='store_true',
                         help='Use ZIP to compress data.')
        g = sub.add_mutually_exclusive_group()
        g.add_argument('-v', '--verbose', default=0, action='count')
        g.add_argument('-q', '--quiet', default=False, action='store_true')

    if argcomplete:
        argcomplete.autocomplete(p)

    args = p.parse_args()
    if not hasattr(args, 'command'):
        # On Python 3 a sub-command is optional by default, so running the
        # script without one yields a namespace that lacks `verbose`,
        # `command`, ... and the code below would die with AttributeError.
        # Report a regular usage error instead.
        p.error('a sub-command is required (send or recv)')

    if args.verbose == 0:
        level, fmt = 'INFO', '%(message)s'
    elif args.verbose == 1:
        level, fmt = 'DEBUG', '%(message)s'
    elif args.verbose >= 2:
        level, fmt = ('DEBUG', '%(asctime)s %(levelname)-10s '
                               '%(message)-100s '
                               '%(filename)s:%(lineno)d')
    if args.quiet:
        level, fmt = 'WARNING', '%(message)s'
    logging.basicConfig(level=level, format=fmt)

    # Parsing and execution
    log.debug(description)

    # pylab is imported only when plotting was requested (`recv --plot`);
    # the `send` sub-command has no such option, hence the getattr default.
    args.pylab = None
    if getattr(args, 'plot', False):
        import pylab
        args.pylab = pylab

    with interface.load(args.audio_library):
        args.src = args.input_type(args.input)
        args.dst = args.output_type(args.output)
        try:
            # `calibrate` stays False unless -c/--calibrate was given; with
            # the flag alone it is None, with a value it is that string.
            if args.calibrate is False:
                return args.main(config=config, args=args)
            else:
                try:
                    args.calib(config=config, args=args)
                except KeyboardInterrupt:
                    # Ctrl-C ends the calibration run without a traceback.
                    pass
        finally:
            args.src.close()
            args.dst.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Exit with status 0 when _main() returns a truthy value, 1 otherwise.
    sys.exit(0 if _main() else 1)
|