mirror of
https://github.com/romanz/amodem.git
synced 2026-04-21 22:06:27 +08:00
util: add docstrings
This commit is contained in:
@@ -38,7 +38,7 @@ class FakeSocket(object):
|
|||||||
def test_send_recv():
|
def test_send_recv():
|
||||||
s = FakeSocket()
|
s = FakeSocket()
|
||||||
util.send(s, b'123')
|
util.send(s, b'123')
|
||||||
util.send(s, data=[42], fmt='B')
|
util.send(s, b'*')
|
||||||
assert s.buf.getvalue() == b'123*'
|
assert s.buf.getvalue() == b'123*'
|
||||||
|
|
||||||
s.buf.seek(0)
|
s.buf.seek(0)
|
||||||
|
|||||||
@@ -1,14 +1,20 @@
|
|||||||
|
"""Various I/O and serialization utilities."""
|
||||||
import io
|
import io
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
|
|
||||||
def send(conn, data, fmt=None):
|
def send(conn, data):
|
||||||
if fmt:
|
"""Send data blob to connection socket."""
|
||||||
data = struct.pack(fmt, *data)
|
|
||||||
conn.sendall(data)
|
conn.sendall(data)
|
||||||
|
|
||||||
|
|
||||||
def recv(conn, size):
|
def recv(conn, size):
|
||||||
|
"""
|
||||||
|
Receive bytes from connection socket or stream.
|
||||||
|
|
||||||
|
If size is struct.calcsize()-compatible format, use it to unpack the data.
|
||||||
|
Otherwise, return the plain blob as bytes.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
fmt = size
|
fmt = size
|
||||||
size = struct.calcsize(fmt)
|
size = struct.calcsize(fmt)
|
||||||
@@ -34,11 +40,13 @@ def recv(conn, size):
|
|||||||
|
|
||||||
|
|
||||||
def read_frame(conn):
|
def read_frame(conn):
|
||||||
|
"""Read size-prefixed frame from connection."""
|
||||||
size, = recv(conn, '>L')
|
size, = recv(conn, '>L')
|
||||||
return recv(conn, size)
|
return recv(conn, size)
|
||||||
|
|
||||||
|
|
||||||
def bytes2num(s):
|
def bytes2num(s):
|
||||||
|
"""Convert MSB-first bytes to an unsigned integer."""
|
||||||
res = 0
|
res = 0
|
||||||
for i, c in enumerate(reversed(bytearray(s))):
|
for i, c in enumerate(reversed(bytearray(s))):
|
||||||
res += c << (i * 8)
|
res += c << (i * 8)
|
||||||
@@ -46,6 +54,7 @@ def bytes2num(s):
|
|||||||
|
|
||||||
|
|
||||||
def num2bytes(value, size):
|
def num2bytes(value, size):
|
||||||
|
"""Convert an unsigned integer to MSB-first bytes with specified size."""
|
||||||
res = []
|
res = []
|
||||||
for _ in range(size):
|
for _ in range(size):
|
||||||
res.append(value & 0xFF)
|
res.append(value & 0xFF)
|
||||||
@@ -55,10 +64,12 @@ def num2bytes(value, size):
|
|||||||
|
|
||||||
|
|
||||||
def pack(fmt, *args):
|
def pack(fmt, *args):
|
||||||
|
"""Serialize MSB-first message."""
|
||||||
return struct.pack('>' + fmt, *args)
|
return struct.pack('>' + fmt, *args)
|
||||||
|
|
||||||
|
|
||||||
def frame(*msgs):
|
def frame(*msgs):
|
||||||
|
"""Serialize MSB-first length-prefixed frame."""
|
||||||
res = io.BytesIO()
|
res = io.BytesIO()
|
||||||
for msg in msgs:
|
for msg in msgs:
|
||||||
res.write(msg)
|
res.write(msg)
|
||||||
|
|||||||
Reference in New Issue
Block a user