mirror of
https://github.com/romanz/amodem.git
synced 2026-04-18 20:25:59 +08:00
split tests from ecc
This commit is contained in:
20
ecc.py
20
ecc.py
@@ -31,7 +31,9 @@ def decode(data, nsym=DEFAULT_NSYM):
|
|||||||
chunk = data[i:i+BLOCK_SIZE]
|
chunk = data[i:i+BLOCK_SIZE]
|
||||||
try:
|
try:
|
||||||
dec.extend(rs_correct_msg(chunk, nsym))
|
dec.extend(rs_correct_msg(chunk, nsym))
|
||||||
except ReedSolomonError:
|
log.info('Decoded %d blocks = %d bytes', (i+1) / BLOCK_SIZE, len(dec))
|
||||||
|
except ReedSolomonError as e:
|
||||||
|
log.info('Decoding stopped: %s', e)
|
||||||
break
|
break
|
||||||
|
|
||||||
if i == 0:
|
if i == 0:
|
||||||
@@ -44,17 +46,9 @@ def decode(data, nsym=DEFAULT_NSYM):
|
|||||||
n = struct.calcsize(LEN_FMT)
|
n = struct.calcsize(LEN_FMT)
|
||||||
payload, length = dec[n:], dec[:n]
|
payload, length = dec[n:], dec[:n]
|
||||||
length, = struct.unpack(LEN_FMT, length)
|
length, = struct.unpack(LEN_FMT, length)
|
||||||
assert length <= len(payload)
|
if length > len(payload):
|
||||||
|
log.warning('%d bytes are missing!', length - len(payload))
|
||||||
|
return None
|
||||||
|
|
||||||
log.info('Decoded {} bytes'.format(length))
|
log.info('Decoded {} bytes'.format(length))
|
||||||
return payload[:length]
|
return payload[:length]
|
||||||
|
|
||||||
|
|
||||||
def test_codec():
|
|
||||||
import random
|
|
||||||
r = random.Random(0)
|
|
||||||
x = bytearray(r.randrange(0, 256) for i in range(16 * 1024))
|
|
||||||
y = encode(x)
|
|
||||||
assert len(y) % BLOCK_SIZE == 0
|
|
||||||
x_ = decode(y)
|
|
||||||
assert x_[:len(x)] == x
|
|
||||||
assert all(v == 0 for v in x_[len(x):])
|
|
||||||
|
|||||||
15
test_ecc.py
Normal file
15
test_ecc.py
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
import ecc
|
||||||
|
import random
|
||||||
|
|
||||||
|
def test_random():
|
||||||
|
r = random.Random(0)
|
||||||
|
x = bytearray(r.randrange(0, 256) for i in range(16 * 1024))
|
||||||
|
y = ecc.encode(x)
|
||||||
|
assert len(y) % ecc.BLOCK_SIZE == 0
|
||||||
|
x_ = ecc.decode(y)
|
||||||
|
assert x_[:len(x)] == x
|
||||||
|
assert all(v == 0 for v in x_[len(x):])
|
||||||
|
|
||||||
|
def test_file():
|
||||||
|
data = open('data.send').read()
|
||||||
|
assert ecc.decode(ecc.encode(data)) == data
|
||||||
Reference in New Issue
Block a user