diff --git a/StreamManager.js b/StreamManager.js index e53000a..a5e018e 100644 --- a/StreamManager.js +++ b/StreamManager.js @@ -1,12 +1,19 @@ import Dispatcher from "./Dispatcher"; import * as CRC from './CRC'; +import * as PacketUtils from './PacketUtils'; import { bitsToBytes, bitsToInt, - numberToBytes + bytesToBits, + numberToBytes, + numberToHex } from "./converters"; const dispatcher = new Dispatcher('StreamManager', ['change']); +let DATA = new Uint8ClampedArray(); +const FAILED_SEQUENCES = []; +let SAMPLES_EXPECTED = 0; +let SAMPLES_RECEIVED = 0; const BITS = []; let BITS_PER_PACKET = 0; @@ -26,12 +33,72 @@ export const addEventListener = dispatcher.addListener; export const removeEventListener = dispatcher.removeListener; export const reset = () => { + let changed = false; + SAMPLES_RECEIVED = 0; + SAMPLES_EXPECTED = 0; + if(FAILED_SEQUENCES.length !== 0) { + FAILED_SEQUENCES.length = 0; + changed = true; + } + if(DATA.length !== 0) { + DATA = new Uint8ClampedArray(); + changed = true; + } if(BITS.length !== 0) { BITS.length = 0; + changed = true; + } + if(changed) dispatcher.emit('change'); +} +export const getDataBytes = () => { + const dataSize = getTransferByteCount(); + const dataSizeTrusted = isTransferByteCountTrusted(); + const headerByteCount = getStreamHeaderByteCount(); + if(dataSizeTrusted) { + return DATA.subarray(headerByteCount, headerByteCount + dataSize); + } else { + return DATA.subarray(headerByteCount); } } - +export const applyPacket = ({ + crc, + actualCrc, + packetIndex, + sequence, + bytes, + size +}) => { + console.log('apply packet %s (sequence %s) crc %s was %s', packetIndex, sequence, numberToHex(8)(crc), numberToHex(8)(actualCrc)); + const dataSize = PacketUtils.getPacketDataByteCount(); + const offset = sequence * dataSize; + const length = offset + dataSize; + if(crc === actualCrc) { + if(FAILED_SEQUENCES.includes(sequence)) { + FAILED_SEQUENCES.splice(FAILED_SEQUENCES.indexOf(sequence), 1); + } + if(DATA.length < length) { + const copy = new Uint8ClampedArray(length); + copy.set(DATA.subarray(0, DATA.length)); + DATA = copy; + } + DATA.set(bytes, offset); + delete BITS[packetIndex]; + dispatcher.emit('packetReceived'); + } else { + if(!FAILED_SEQUENCES.includes(sequence)) + FAILED_SEQUENCES.push(sequence); + } +} +export const getPercentReceived = () => { + if(SAMPLES_EXPECTED === 0) return 0; + return SAMPLES_RECEIVED / SAMPLES_EXPECTED; +} +export const setPacketsExpected = packetCount => { + if(packetCount < 0 || packetCount === Infinity) packetCount = 0; + // used when requesting individual packets out of sequence + SAMPLES_EXPECTED = packetCount * PacketUtils.getPacketSegmentCount(); +} export const changeConfiguration = ({ segmentsPerPacket, bitsPerPacket, @@ -52,11 +119,12 @@ export const setPacketEncoding = ({ encode, decode } = {}) => { PACKET_ENCODING.encode = encode ?? noEncoding; PACKET_ENCODING.decode = decode ?? noEncoding; } -export const addBits = ( +export const addSample = ( packetIndex, segmentIndex, bits ) => { + SAMPLES_RECEIVED++; if(BITS[packetIndex] === undefined) { BITS[packetIndex] = []; } @@ -64,6 +132,20 @@ export const addBits = ( BITS[packetIndex][segmentIndex] = bits; if(hasNewBits(oldBits, bits)) dispatcher.emit('change'); + + // Last segment in a packet? + if(segmentIndex === PacketUtils.getPacketSegmentCount() -1) { + // Unpack! + const packetBits = getPacketBits(packetIndex); + const unpacked = PacketUtils.unpack().getPacketFromBits(packetBits, packetIndex); + applyPacket(unpacked); + } + if(SAMPLES_EXPECTED === 0) { + const dataLength = getTransferByteCount(); + if(isTransferByteCountTrusted()) { + SAMPLES_EXPECTED = PacketUtils.packetStats(dataLength).sampleCount; + } + } } const sumSegmentBits = (sum, segment) => sum + segment.length; const sumPacketBits = (sum, packet) => sum + packet.reduce(sumSegmentBits, 0); @@ -83,47 +165,7 @@ export const getPacketReceivedCount = () => { if(BITS.length === 0) return 1; return BITS.length; } -export const getStreamBits = () => { - const bits = []; - const packetCount = getPacketReceivedCount(); - for(let packetIndex = 0; packetIndex < packetCount; packetIndex++) { - const packet = BITS[packetIndex] ?? []; - for(let segmentIndex = 0; segmentIndex < SEGMENTS_PER_PACKET; segmentIndex++) { - let segment = packet[segmentIndex] ?? []; - for(let bitIndex = 0; bitIndex < BITS_PER_SEGMENT; bitIndex++) { - const bit = segment[bitIndex]; - bits.push(bit ?? 0); - } - } - } - return bits; -} export const getPacketSegmentBits = (packetIndex, segmentIndex) => BITS[packetIndex]?.[segmentIndex]; -export const getAllPacketBits = () => { - const packetCount = getPacketReceivedCount(); - const bits = []; - for(let packetIndex = 0; packetIndex < packetCount; packetIndex++) { - bits.push(...getPacketBits(packetIndex)); - } - return bits; -} -export const getAllPacketBitsDecoded = () => { - const packetCount = getPacketReceivedCount(); - const bits = []; - for(let packetIndex = 0; packetIndex < packetCount; packetIndex++) { - bits.push(...getPacketBitsDecoded(packetIndex)); - } - return bits; -} -export const getDataBytes = () => { - const bits = getAllPacketBitsDecoded(); - bits.splice(0, getStreamHeaderBitCount()); - const bytes = bitsToBytes(bits); - if(isTransferByteCountTrusted()) { - bytes.length = getTransferByteCount(); - } - return bytes; -} export const getPacketBits = (packetIndex, defaultBit = 0) => { const bits = []; const packet = BITS[packetIndex] ?? []; @@ -143,13 +185,14 @@ export const getPacketBitsDecoded = (packetIndex, defaultBit = 0) => { const bits = getPacketBits(packetIndex, defaultBit); return PACKET_ENCODING.decode(bits); } -const getStreamHeaderBitCount = () => { - return Object.keys(STREAM_HEADERS).reduce((lastBit, key) => { +const getStreamHeaderByteCount = () => { + const lastBit = Object.keys(STREAM_HEADERS).reduce((lastBit, key) => { const {index = 0, length = 0} = STREAM_HEADERS[key]; if(length === 0) return lastBit; if(lastBit < index + length) return index + length; return lastBit; }, 0); + return Math.ceil(lastBit / 8); } const getStreamHeaderBits = name => { const header = STREAM_HEADERS[name]; @@ -174,17 +217,17 @@ export const getTransferByteCount = () => { export const getTransferByteCountCrc = () => { const name = 'transfer byte count crc'; const length = STREAM_HEADERS[name].length; - if(length === 0) return 0; + if(length === 0) return -1; const bits = getStreamHeaderBits(name); if(bits.length !== length) return CRC.INVALID; return bitsToInt(bits, length); } export const getTransferByteCountActualCrc = () => { const countBits = getStreamHeaderBits('transfer byte count').length; - if(countBits === 0) return 0; + if(countBits === 0) return -1; const crcBits = getStreamHeaderBits('transfer byte count crc').length; - if(crcBits === 0) return 0; + if(crcBits === 0) return -1; const count = getTransferByteCount(); const bytesOfCount = numberToBytes(count, countBits);