manage streams as bytes
This commit is contained in:
139
StreamManager.js
139
StreamManager.js
@@ -1,12 +1,19 @@
|
|||||||
import Dispatcher from "./Dispatcher";
|
import Dispatcher from "./Dispatcher";
|
||||||
import * as CRC from './CRC';
|
import * as CRC from './CRC';
|
||||||
|
import * as PacketUtils from './PacketUtils';
|
||||||
import {
|
import {
|
||||||
bitsToBytes,
|
bitsToBytes,
|
||||||
bitsToInt,
|
bitsToInt,
|
||||||
numberToBytes
|
bytesToBits,
|
||||||
|
numberToBytes,
|
||||||
|
numberToHex
|
||||||
} from "./converters";
|
} from "./converters";
|
||||||
|
|
||||||
const dispatcher = new Dispatcher('StreamManager', ['change']);
|
const dispatcher = new Dispatcher('StreamManager', ['change']);
|
||||||
|
let DATA = new Uint8ClampedArray();
|
||||||
|
const FAILED_SEQUENCES = [];
|
||||||
|
let SAMPLES_EXPECTED = 0;
|
||||||
|
let SAMPLES_RECEIVED = 0;
|
||||||
|
|
||||||
const BITS = [];
|
const BITS = [];
|
||||||
let BITS_PER_PACKET = 0;
|
let BITS_PER_PACKET = 0;
|
||||||
@@ -26,12 +33,72 @@ export const addEventListener = dispatcher.addListener;
|
|||||||
export const removeEventListener = dispatcher.removeListener;
|
export const removeEventListener = dispatcher.removeListener;
|
||||||
|
|
||||||
export const reset = () => {
|
export const reset = () => {
|
||||||
|
let changed = false;
|
||||||
|
SAMPLES_RECEIVED = 0;
|
||||||
|
SAMPLES_EXPECTED = 0;
|
||||||
|
if(FAILED_SEQUENCES.length !== 0) {
|
||||||
|
FAILED_SEQUENCES.length = 0;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
if(DATA.length !== 0) {
|
||||||
|
DATA = new Uint8ClampedArray();
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
if(BITS.length !== 0) {
|
if(BITS.length !== 0) {
|
||||||
BITS.length = 0;
|
BITS.length = 0;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
if(changed)
|
||||||
dispatcher.emit('change');
|
dispatcher.emit('change');
|
||||||
|
}
|
||||||
|
export const getDataBytes = () => {
|
||||||
|
const dataSize = getTransferByteCount();
|
||||||
|
const dataSizeTrusted = isTransferByteCountTrusted();
|
||||||
|
const headerByteCount = getStreamHeaderByteCount();
|
||||||
|
if(dataSizeTrusted) {
|
||||||
|
return DATA.subarray(headerByteCount, headerByteCount + dataSize);
|
||||||
|
} else {
|
||||||
|
return DATA.subarray(headerByteCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
export const applyPacket = ({
|
||||||
|
crc,
|
||||||
|
actualCrc,
|
||||||
|
packetIndex,
|
||||||
|
sequence,
|
||||||
|
bytes,
|
||||||
|
size
|
||||||
|
}) => {
|
||||||
|
console.log('apply packet %s (sequence %s) crc %s was %s', packetIndex, sequence, numberToHex(8)(crc), numberToHex(8)(actualCrc));
|
||||||
|
const dataSize = PacketUtils.getPacketDataByteCount();
|
||||||
|
const offset = sequence * dataSize;
|
||||||
|
const length = offset + dataSize;
|
||||||
|
if(crc === actualCrc) {
|
||||||
|
if(FAILED_SEQUENCES.includes(sequence)) {
|
||||||
|
FAILED_SEQUENCES.splice(FAILED_SEQUENCES.indexOf(sequence), 1);
|
||||||
|
}
|
||||||
|
if(DATA.length < length) {
|
||||||
|
const copy = new Uint8ClampedArray(length);
|
||||||
|
copy.set(DATA.subarray(0, DATA.length));
|
||||||
|
DATA = copy;
|
||||||
|
}
|
||||||
|
DATA.set(bytes, offset);
|
||||||
|
delete BITS[packetIndex];
|
||||||
|
dispatcher.emit('packetReceived');
|
||||||
|
} else {
|
||||||
|
if(!FAILED_SEQUENCES.includes(sequence))
|
||||||
|
FAILED_SEQUENCES.push(sequence);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
export const getPercentReceived = () => {
|
||||||
|
if(SAMPLES_EXPECTED === 0) return 0;
|
||||||
|
return SAMPLES_RECEIVED / SAMPLES_EXPECTED;
|
||||||
|
}
|
||||||
|
export const setPacketsExpected = packetCount => {
|
||||||
|
if(packetCount < 0 || packetCount === Infinity) packetCount = 0;
|
||||||
|
// used when requesting individual packets out of sequence
|
||||||
|
SAMPLES_EXPECTED = packetCount * PacketUtils.getPacketSegmentCount();
|
||||||
|
}
|
||||||
export const changeConfiguration = ({
|
export const changeConfiguration = ({
|
||||||
segmentsPerPacket,
|
segmentsPerPacket,
|
||||||
bitsPerPacket,
|
bitsPerPacket,
|
||||||
@@ -52,11 +119,12 @@ export const setPacketEncoding = ({ encode, decode } = {}) => {
|
|||||||
PACKET_ENCODING.encode = encode ?? noEncoding;
|
PACKET_ENCODING.encode = encode ?? noEncoding;
|
||||||
PACKET_ENCODING.decode = decode ?? noEncoding;
|
PACKET_ENCODING.decode = decode ?? noEncoding;
|
||||||
}
|
}
|
||||||
export const addBits = (
|
export const addSample = (
|
||||||
packetIndex,
|
packetIndex,
|
||||||
segmentIndex,
|
segmentIndex,
|
||||||
bits
|
bits
|
||||||
) => {
|
) => {
|
||||||
|
SAMPLES_RECEIVED++;
|
||||||
if(BITS[packetIndex] === undefined) {
|
if(BITS[packetIndex] === undefined) {
|
||||||
BITS[packetIndex] = [];
|
BITS[packetIndex] = [];
|
||||||
}
|
}
|
||||||
@@ -64,6 +132,20 @@ export const addBits = (
|
|||||||
BITS[packetIndex][segmentIndex] = bits;
|
BITS[packetIndex][segmentIndex] = bits;
|
||||||
if(hasNewBits(oldBits, bits))
|
if(hasNewBits(oldBits, bits))
|
||||||
dispatcher.emit('change');
|
dispatcher.emit('change');
|
||||||
|
|
||||||
|
// Last segment in a packet?
|
||||||
|
if(segmentIndex === PacketUtils.getPacketSegmentCount() -1) {
|
||||||
|
// Unpack!
|
||||||
|
const packetBits = getPacketBits(packetIndex);
|
||||||
|
const unpacked = PacketUtils.unpack().getPacketFromBits(packetBits, packetIndex);
|
||||||
|
applyPacket(unpacked);
|
||||||
|
}
|
||||||
|
if(SAMPLES_EXPECTED === 0) {
|
||||||
|
const dataLength = getTransferByteCount();
|
||||||
|
if(isTransferByteCountTrusted()) {
|
||||||
|
SAMPLES_EXPECTED = PacketUtils.packetStats(dataLength).sampleCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
const sumSegmentBits = (sum, segment) => sum + segment.length;
|
const sumSegmentBits = (sum, segment) => sum + segment.length;
|
||||||
const sumPacketBits = (sum, packet) => sum + packet.reduce(sumSegmentBits, 0);
|
const sumPacketBits = (sum, packet) => sum + packet.reduce(sumSegmentBits, 0);
|
||||||
@@ -83,47 +165,7 @@ export const getPacketReceivedCount = () => {
|
|||||||
if(BITS.length === 0) return 1;
|
if(BITS.length === 0) return 1;
|
||||||
return BITS.length;
|
return BITS.length;
|
||||||
}
|
}
|
||||||
export const getStreamBits = () => {
|
|
||||||
const bits = [];
|
|
||||||
const packetCount = getPacketReceivedCount();
|
|
||||||
for(let packetIndex = 0; packetIndex < packetCount; packetIndex++) {
|
|
||||||
const packet = BITS[packetIndex] ?? [];
|
|
||||||
for(let segmentIndex = 0; segmentIndex < SEGMENTS_PER_PACKET; segmentIndex++) {
|
|
||||||
let segment = packet[segmentIndex] ?? [];
|
|
||||||
for(let bitIndex = 0; bitIndex < BITS_PER_SEGMENT; bitIndex++) {
|
|
||||||
const bit = segment[bitIndex];
|
|
||||||
bits.push(bit ?? 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return bits;
|
|
||||||
}
|
|
||||||
export const getPacketSegmentBits = (packetIndex, segmentIndex) => BITS[packetIndex]?.[segmentIndex];
|
export const getPacketSegmentBits = (packetIndex, segmentIndex) => BITS[packetIndex]?.[segmentIndex];
|
||||||
export const getAllPacketBits = () => {
|
|
||||||
const packetCount = getPacketReceivedCount();
|
|
||||||
const bits = [];
|
|
||||||
for(let packetIndex = 0; packetIndex < packetCount; packetIndex++) {
|
|
||||||
bits.push(...getPacketBits(packetIndex));
|
|
||||||
}
|
|
||||||
return bits;
|
|
||||||
}
|
|
||||||
export const getAllPacketBitsDecoded = () => {
|
|
||||||
const packetCount = getPacketReceivedCount();
|
|
||||||
const bits = [];
|
|
||||||
for(let packetIndex = 0; packetIndex < packetCount; packetIndex++) {
|
|
||||||
bits.push(...getPacketBitsDecoded(packetIndex));
|
|
||||||
}
|
|
||||||
return bits;
|
|
||||||
}
|
|
||||||
export const getDataBytes = () => {
|
|
||||||
const bits = getAllPacketBitsDecoded();
|
|
||||||
bits.splice(0, getStreamHeaderBitCount());
|
|
||||||
const bytes = bitsToBytes(bits);
|
|
||||||
if(isTransferByteCountTrusted()) {
|
|
||||||
bytes.length = getTransferByteCount();
|
|
||||||
}
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
export const getPacketBits = (packetIndex, defaultBit = 0) => {
|
export const getPacketBits = (packetIndex, defaultBit = 0) => {
|
||||||
const bits = [];
|
const bits = [];
|
||||||
const packet = BITS[packetIndex] ?? [];
|
const packet = BITS[packetIndex] ?? [];
|
||||||
@@ -143,13 +185,14 @@ export const getPacketBitsDecoded = (packetIndex, defaultBit = 0) => {
|
|||||||
const bits = getPacketBits(packetIndex, defaultBit);
|
const bits = getPacketBits(packetIndex, defaultBit);
|
||||||
return PACKET_ENCODING.decode(bits);
|
return PACKET_ENCODING.decode(bits);
|
||||||
}
|
}
|
||||||
const getStreamHeaderBitCount = () => {
|
const getStreamHeaderByteCount = () => {
|
||||||
return Object.keys(STREAM_HEADERS).reduce((lastBit, key) => {
|
const lastBit = Object.keys(STREAM_HEADERS).reduce((lastBit, key) => {
|
||||||
const {index = 0, length = 0} = STREAM_HEADERS[key];
|
const {index = 0, length = 0} = STREAM_HEADERS[key];
|
||||||
if(length === 0) return lastBit;
|
if(length === 0) return lastBit;
|
||||||
if(lastBit < index + length) return index + length;
|
if(lastBit < index + length) return index + length;
|
||||||
return lastBit;
|
return lastBit;
|
||||||
}, 0);
|
}, 0);
|
||||||
|
return Math.ceil(lastBit / 8);
|
||||||
}
|
}
|
||||||
const getStreamHeaderBits = name => {
|
const getStreamHeaderBits = name => {
|
||||||
const header = STREAM_HEADERS[name];
|
const header = STREAM_HEADERS[name];
|
||||||
@@ -174,17 +217,17 @@ export const getTransferByteCount = () => {
|
|||||||
export const getTransferByteCountCrc = () => {
|
export const getTransferByteCountCrc = () => {
|
||||||
const name = 'transfer byte count crc';
|
const name = 'transfer byte count crc';
|
||||||
const length = STREAM_HEADERS[name].length;
|
const length = STREAM_HEADERS[name].length;
|
||||||
if(length === 0) return 0;
|
if(length === 0) return -1;
|
||||||
const bits = getStreamHeaderBits(name);
|
const bits = getStreamHeaderBits(name);
|
||||||
if(bits.length !== length) return CRC.INVALID;
|
if(bits.length !== length) return CRC.INVALID;
|
||||||
return bitsToInt(bits, length);
|
return bitsToInt(bits, length);
|
||||||
}
|
}
|
||||||
export const getTransferByteCountActualCrc = () => {
|
export const getTransferByteCountActualCrc = () => {
|
||||||
const countBits = getStreamHeaderBits('transfer byte count').length;
|
const countBits = getStreamHeaderBits('transfer byte count').length;
|
||||||
if(countBits === 0) return 0;
|
if(countBits === 0) return -1;
|
||||||
|
|
||||||
const crcBits = getStreamHeaderBits('transfer byte count crc').length;
|
const crcBits = getStreamHeaderBits('transfer byte count crc').length;
|
||||||
if(crcBits === 0) return 0;
|
if(crcBits === 0) return -1;
|
||||||
|
|
||||||
const count = getTransferByteCount();
|
const count = getTransferByteCount();
|
||||||
const bytesOfCount = numberToBytes(count, countBits);
|
const bytesOfCount = numberToBytes(count, countBits);
|
||||||
|
|||||||
Reference in New Issue
Block a user