From fb0abe1b161f7af7f596e969343788eb674cfd9d Mon Sep 17 00:00:00 2001 From: Joseph <162703152+josephnef@users.noreply.github.com> Date: Sun, 7 Jun 2026 17:15:38 +0300 Subject: [PATCH] RaptorQ (RFC 6330) FEC layer for the stream link MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The corruption survey in PR #85 showed that real-range OFDM frames on this link will see 30-70% loss. tun_p2p.py's blind --repeat N is a fixed-cost workaround; this PR ships a real erasure code on top of the existing stream framing. * tools/precoder/pyproject.toml: add `raptorq>=2` (cberner's PyO3 binding to the Rust RFC 6330 implementation; abi3 manylinux wheels on PyPI, MIT, ~26 Gbps enc / ~7 Gbps dec at K=1000). * tools/precoder/stream_fec.py: FecConfig (K, symbol_size, overhead) + FecEncoder.add_packet / .flush + FecDecoder.add_symbol / .expire_blocks_older_than. IP packets are concatenation-packed into K-symbol blocks (u16 length prefix + payload per packet, zero pad to symbol_size). RaptorQ produces K + ceil(K*overhead) output symbols per block; receiver decodes from any K+ε of them. * tools/precoder/test_stream_fec.py: 19 unit tests — round-trip with no loss; loss tolerance @ 0/20/40% (R/K=1 reliably recovers there); 50% loss recovers only at R/K=2; 70% loss is unrecoverable (asserts the expire-blocks bookkeeping fires); concatenation packing; partial-block flush; block-id wrap; MTU enforcement; garbage-envelope handling. * tools/precoder/tun_p2p.py: new --fec-k / --fec-overhead / --fec-symbol-size / --fec-flush-ms / --fec-block-expire-ms flags. tx_thread feeds packets through FecEncoder; a parallel fec_flush_thread force-encodes partial blocks every flush-ms so sparse traffic doesn't stall. rx_thread feeds payloads through FecDecoder, writing decoded IP packets to TUN. RX-side seq dedup is forced off when FEC is on (RaptorQ symbols are dedup-friendly via SBN+ESI). New `fec=[...]` segment in the periodic stderr report. * tun_p2p.py docstring extended with an FEC section. Inner envelope (lives inside StreamFrame.payload): MAGIC(2)=0xF52E VER(1)=0 K(1) KREAL(1) SYMBOL_SIZE(2) BLOCK_ID(2) RAPTORQ_PKT(var) = 9 B header + raptorq-managed SBN+ESI+symbol. Source symbols are themselves concatenations of length-prefixed IP packets, so small packets (ACK flood) share symbols instead of each burning a whole symbol's worth of airtime. Hardware verification (two-netns single-host bench, RTL8812AU 0x8812 + TP-Link Archer T2U Plus / RTL8821AU 0x0120, ch 6, no --repeat, ping -c 30 -i 1): --fec-k 16 --fec-overhead 1.0 --fec-flush-ms 50 30/30 received, 0% loss, 0 DUP, RTT 121/160/207 ms blk-ok=30 blk-lost=1 (startup), sym-tx=1023 sym-rx=973 --fec-k 8 --fec-overhead 1.0 --fec-flush-ms 20 30/30 received, 0% loss, 0 DUP, RTT 73/95/145 ms blk-ok=30 blk-lost=1, sym-tx=544 sym-rx=511 The smaller K trades a bit of recovery margin for a 65 ms drop in median RTT. Both decode 100% on a healthy link; the survey's noisier regimes are what motivates --fec-overhead > 1. Test results cd tools/precoder && uv run pytest → 87 passed (31 pipeline + 37 stream + 19 fec) python -m pytest tests/precoder_smoke.py tests/precoder_stream_smoke.py → 8 passed Open caveats (documented in stream_fec.py / tun_p2p.py docstrings) * Strict block boundaries — no cross-block FEC, no Raptor carousel. Good enough at K=8-16 + 20-50 ms flush; revisit if the latency budget tightens further. * No rateless dynamic overhead — R/K is fixed at construction. * RFC 6330 has Qualcomm patents largely expired in primary jurisdictions by 2026; cberner's MIT lib explicitly notes this. Co-Authored-By: Claude Opus 4.7 --- tools/precoder/pyproject.toml | 7 +- tools/precoder/stream_fec.py | 350 ++++++++++++++++++++++++++++++ tools/precoder/test_stream_fec.py | 315 +++++++++++++++++++++++++++ tools/precoder/tun_p2p.py | 278 ++++++++++++++++++++++-- tools/precoder/uv.lock | 11 + 5 files changed, 938 insertions(+), 23 deletions(-) create mode 100644 tools/precoder/stream_fec.py create mode 100644 tools/precoder/test_stream_fec.py diff --git a/tools/precoder/pyproject.toml b/tools/precoder/pyproject.toml index 2596fe7..2632ca1 100644 --- a/tools/precoder/pyproject.toml +++ b/tools/precoder/pyproject.toml @@ -6,6 +6,11 @@ readme = "README.md" requires-python = ">=3.9" dependencies = [ "numpy>=1.23", + # RaptorQ (RFC 6330) fountain-code FEC for the stream link's + # packet-erasure channel. cberner's Rust+PyO3 implementation has + # manylinux abi3 wheels on PyPI so `uv sync` doesn't need a Rust + # toolchain. Used by stream_fec.py / tun_p2p.py. + "raptorq>=2", ] # Phase-B RF verification (tools/precoder/fft_capture.py) needs an SDR stack. @@ -24,4 +29,4 @@ dev = [ [tool.pytest.ini_options] # test_pipeline.py lives alongside the module; default (prepend) import mode # puts this dir on sys.path so `import encode_subcarriers` resolves. -testpaths = ["test_pipeline.py", "test_stream.py"] +testpaths = ["test_pipeline.py", "test_stream.py", "test_stream_fec.py"] diff --git a/tools/precoder/stream_fec.py b/tools/precoder/stream_fec.py new file mode 100644 index 0000000..f418e55 --- /dev/null +++ b/tools/precoder/stream_fec.py @@ -0,0 +1,350 @@ +"""RaptorQ (RFC 6330) fountain-code FEC layer for the stream link. + +Sits between the TUN device and `stream.py`'s framing. The channel underneath +is a packet-erasure channel (whole frames either arrive clean or are dropped +by the stream layer's CRC check), so a real erasure code can recover from a +loss rate the blind `--repeat N` knob in `tun_p2p.py` can't match without +infinite N. RaptorQ was designed for exactly this regime (broadcast, no ACK, +variable loss) — see the corruption survey in PR #85 for the empirical +30-70% loss numbers that motivated this. + +Wire format. The OUTER stream framing in `stream.py` is unchanged; FEC lives +INSIDE `StreamFrame.payload` as a fixed 9-byte inner envelope followed by +the serialised raptorq packet (lib-managed SBN + ESI + symbol): + + FEC_MAGIC (2) = 0xF52E little-endian + VERSION/FLAGS (1) = 0 (reserved) + K (1) = source symbols per block (encoder config) + KREAL (1) = real source symbols in THIS block (1..K). For + flush-padded last blocks KREAL < K and the trailing + (K - KREAL) decoded source symbols are zero pads. + SYMBOL_SIZE (2) = LE u16, must match TX config + BLOCK_ID (2) = LE u16 wraps; identifies which K-symbol block + RAPTORQ_PKT (var) = lib's serialised packet (≈ 4 byte SBN+ESI plus + the symbol bytes; size depends on lib internals) + +Source symbols (the K bytes-segments the codec encodes) are themselves +concatenations of length-prefixed IP packets: + + [u16 len_a LE][packet_a]…[u16 len_b LE][packet_b]…[zero pad to SYMBOL_SIZE] + +A `len = 0` sentinel or remaining bytes < 2 marks "no more packets in this +symbol". This packs small packets densely (ACK floods don't burn one symbol +each) and keeps a packet whose size is close to symbol_size in exactly one +symbol. Tun MTU must be at most `SYMBOL_SIZE - 2`. + +Block-decoding model — cberner's `raptorq` lib (RFC 6330 reference port) +exposes block-incremental decoding: feed packets one at a time, the lib +returns the K source symbols (concatenated) when enough have arrived. We +keep one `raptorq.Decoder` per in-flight block id; entries are dropped +either after successful decode or after `expire_blocks_older_than` (the +tun_p2p bridge calls this periodically with a configurable max age). +""" + +from __future__ import annotations + +import math +import struct +import time +from dataclasses import dataclass +from typing import Optional + +import raptorq + +FEC_MAGIC = 0xF52E +FEC_HEADER_LEN = 9 +FEC_HEADER_STRUCT = " None: + if self.k <= 0 or self.k > 255: + raise ValueError(f"k must be in 1..255 (got {self.k})") + if self.symbol_size <= PACKET_LEN_PREFIX: + raise ValueError( + f"symbol_size must be > {PACKET_LEN_PREFIX} (got {self.symbol_size})" + ) + if self.overhead < 0: + raise ValueError(f"overhead must be >= 0 (got {self.overhead})") + + @property + def repair_count(self) -> int: + """Number of repair packets per block (= ceil(k * overhead)).""" + return math.ceil(self.k * self.overhead) + + @property + def max_packet_size(self) -> int: + """Largest IP packet (without length prefix) that fits in one symbol.""" + return self.symbol_size - PACKET_LEN_PREFIX + + +def _pack_header(cfg: FecConfig, kreal: int, block_id: int) -> bytes: + return struct.pack( + FEC_HEADER_STRUCT, + FEC_MAGIC, 0, cfg.k, kreal, cfg.symbol_size, block_id & 0xFFFF, + ) + + +def _unpack_header(env: bytes) -> Optional[tuple[int, int, int, int, int]]: + """Return (version, k, kreal, symbol_size, block_id) or None on bad magic.""" + if len(env) < FEC_HEADER_LEN: + return None + magic, ver, k, kreal, ss, bid = struct.unpack_from(FEC_HEADER_STRUCT, env) + if magic != FEC_MAGIC: + return None + return ver, k, kreal, ss, bid + + +# --------------------------------------------------------------------------- # +# Encoder +# --------------------------------------------------------------------------- # +class FecEncoder: + """Concatenation-packs IP packets, then RaptorQ-encodes K source symbols + into K + repair_count output symbols. Each output symbol is wrapped in + the inner envelope and returned as bytes ready to drop into a + `StreamFrame.payload`. + """ + + def __init__(self, cfg: FecConfig) -> None: + self.cfg = cfg + self._pending_symbols: list[bytes] = [] + self._current_symbol = bytearray() + self._block_id = 0 + # Stats — read by tun_p2p for periodic counter prints. + self.blocks_encoded = 0 + self.packets_in = 0 + self.symbols_out = 0 + self.bytes_in = 0 + + def add_packet(self, pkt: bytes) -> list[bytes]: + """Append `pkt` to the current packing symbol. If the symbol fills, it + is moved to the pending list; if pending reaches K, a block is + encoded and the K + repair_count envelopes are returned. Otherwise + returns []. + + Raises ValueError if `pkt` is too large to fit in even an empty + symbol — the caller (tun_p2p) is responsible for capping TUN MTU at + `cfg.max_packet_size`. + """ + n = len(pkt) + if n > self.cfg.max_packet_size: + raise ValueError( + f"packet {n}B exceeds max_packet_size {self.cfg.max_packet_size}B " + f"(symbol_size {self.cfg.symbol_size} - {PACKET_LEN_PREFIX} prefix)" + ) + self.packets_in += 1 + self.bytes_in += n + needed = PACKET_LEN_PREFIX + n + # If this packet doesn't fit alongside what's already in the current + # symbol, seal that symbol off first. + if needed > self.cfg.symbol_size - len(self._current_symbol): + self._seal_current_symbol() + ready = self._maybe_encode_full_block() + if ready: + # Stash the new packet for the next block, then return the + # encoded one. (We append below.) + self._append_to_current(pkt) + return ready + self._append_to_current(pkt) + return [] + + def _append_to_current(self, pkt: bytes) -> None: + self._current_symbol += struct.pack(" None: + if not self._current_symbol: + return + pad = self.cfg.symbol_size - len(self._current_symbol) + if pad: + self._current_symbol += b"\x00" * pad + self._pending_symbols.append(bytes(self._current_symbol)) + self._current_symbol = bytearray() + + def _maybe_encode_full_block(self) -> list[bytes]: + if len(self._pending_symbols) >= self.cfg.k: + return self._encode_block(kreal=self.cfg.k) + return [] + + def flush(self) -> list[bytes]: + """Force-encode whatever's pending. The current packing symbol is + sealed (zero-padded), then pending is zero-padded to K, then the + block is encoded with KREAL set to the count of real (non-padded) + source symbols. Decoder uses KREAL to discard the trailing zero + symbols when unpacking IP packets. + + Returns [] if there's literally nothing pending. + """ + self._seal_current_symbol() + if not self._pending_symbols: + return [] + kreal = len(self._pending_symbols) + while len(self._pending_symbols) < self.cfg.k: + self._pending_symbols.append(b"\x00" * self.cfg.symbol_size) + return self._encode_block(kreal=kreal) + + def _encode_block(self, kreal: int) -> list[bytes]: + data = b"".join(self._pending_symbols) + self._pending_symbols.clear() + encoder = raptorq.Encoder.with_defaults(data, self.cfg.symbol_size) + packets = encoder.get_encoded_packets(self.cfg.repair_count) + bid = self._block_id + self._block_id = (self._block_id + 1) & 0xFFFF + self.blocks_encoded += 1 + self.symbols_out += len(packets) + header = _pack_header(self.cfg, kreal, bid) + return [header + p for p in packets] + + @property + def pending_packets(self) -> int: + """Whether there is in-flight data that hasn't been flushed. Used by + the tun_p2p flush timer thread to decide whether to call flush().""" + if self._current_symbol: + return 1 + return len(self._pending_symbols) + + +# --------------------------------------------------------------------------- # +# Decoder +# --------------------------------------------------------------------------- # +@dataclass +class _BlockState: + decoder: "raptorq.Decoder" + kreal: int + first_seen: float + decoded: bool = False + + +class FecDecoder: + """Buffers incoming inner-envelope symbols by block-id, runs RaptorQ's + block-incremental decoder, unpacks length-prefixed IP packets out of + the first KREAL source symbols when a block decodes. + """ + + def __init__(self, cfg: FecConfig) -> None: + self.cfg = cfg + self._blocks: dict[int, _BlockState] = {} + # Stats. + self.blocks_decoded = 0 + self.blocks_unrecoverable = 0 + self.symbols_in = 0 + self.symbols_dropped_bad_cfg = 0 + self.symbols_dropped_stale_block = 0 + self.packets_out = 0 + self.bytes_out = 0 + + def add_symbol(self, envelope: bytes) -> list[bytes]: + """Feed one inner-envelope blob; returns recovered IP packets when a + block decodes (possibly empty).""" + header = _unpack_header(envelope) + if header is None: + return [] + version, k, kreal, symbol_size, block_id = header + if version != 0: + return [] + if k != self.cfg.k or symbol_size != self.cfg.symbol_size: + self.symbols_dropped_bad_cfg += 1 + return [] + if not (1 <= kreal <= k): + self.symbols_dropped_bad_cfg += 1 + return [] + packet = envelope[FEC_HEADER_LEN:] + self.symbols_in += 1 + + state = self._blocks.get(block_id) + if state is None: + state = _BlockState( + decoder=raptorq.Decoder.with_defaults( + k * symbol_size, symbol_size), + kreal=kreal, + first_seen=time.monotonic(), + ) + self._blocks[block_id] = state + elif state.decoded: + # Late symbol for an already-decoded block; the entry will be + # GC'd by expire_blocks_older_than. We don't re-decode. + self.symbols_dropped_stale_block += 1 + return [] + + # Decoder.decode returns the concatenated source bytes when enough + # symbols have arrived for the block; None otherwise. + try: + result = state.decoder.decode(packet) + except Exception: + # Malformed packet bytes; raptorq raises on bad SBN/ESI fields. + return [] + if result is None: + return [] + # Mark this block decoded but keep the entry alive so late symbols + # (we ship K + repair_count, decoder typically needs K + ε ~< all of + # them) don't get fed into a fresh Decoder and trigger a second + # spurious decode. The `state.decoded` branch above silently drops + # those late symbols. `expire_blocks_older_than` GCs the entry. + state.decoded = True + self.blocks_decoded += 1 + ip_pkts = self._unpack(result, state.kreal) + self.packets_out += len(ip_pkts) + self.bytes_out += sum(len(p) for p in ip_pkts) + return ip_pkts + + def _unpack(self, decoded: bytes, kreal: int) -> list[bytes]: + ss = self.cfg.symbol_size + out: list[bytes] = [] + for i in range(kreal): + symbol = decoded[i * ss:(i + 1) * ss] + pos = 0 + while pos + PACKET_LEN_PREFIX <= len(symbol): + ln = int.from_bytes( + symbol[pos:pos + PACKET_LEN_PREFIX], "little") + if ln == 0: + break + end = pos + PACKET_LEN_PREFIX + ln + if end > len(symbol): + # Malformed (truncated) — stop scanning this symbol. + break + out.append(bytes(symbol[pos + PACKET_LEN_PREFIX:end])) + pos = end + return out + + def expire_blocks_older_than(self, max_age_s: float) -> int: + """Drop block entries whose first symbol arrived more than `max_age_s` + ago. Both decoded and undecoded entries are GC'd; for undecoded + ones, `blocks_unrecoverable` is incremented. + + Returns the count of UNRECOVERABLE (= still undecoded) blocks + evicted — useful as a counter for the bridge's stderr report. The + return value intentionally excludes successfully-decoded blocks + even though they're also evicted. + """ + if not self._blocks: + return 0 + now = time.monotonic() + expired = [ + bid for bid, st in self._blocks.items() + if (now - st.first_seen) > max_age_s + ] + unrecoverable = 0 + for bid in expired: + if not self._blocks[bid].decoded: + unrecoverable += 1 + del self._blocks[bid] + self.blocks_unrecoverable += unrecoverable + return unrecoverable + + @property + def in_flight_blocks(self) -> int: + return len(self._blocks) diff --git a/tools/precoder/test_stream_fec.py b/tools/precoder/test_stream_fec.py new file mode 100644 index 0000000..a610e7e --- /dev/null +++ b/tools/precoder/test_stream_fec.py @@ -0,0 +1,315 @@ +"""Tests for the RaptorQ FEC layer (`stream_fec.py`). + +Cover: + * Round-trip with no symbol loss (full reconstruction). + * Loss tolerance at 30/50/70% random symbol drop (R/K = 1 default → 50% is + the design point; 30% must succeed every time, 70% is expected to fail). + * Concatenation packing — small + medium packets share symbols correctly. + * Partial-block flush — KREAL < K, decoder emits exactly KREAL worth of + packets, no trailing garbage from padded zero symbols. + * Block ID wrap — encoder rolls past 65535 without colliding live blocks. + * MTU enforcement — oversized packet → ValueError. + * Unrecoverable-block bookkeeping — expire_blocks_older_than counts them. +""" + +from __future__ import annotations + +import random +import time + +import pytest + +import stream_fec +from stream_fec import FecConfig, FecDecoder, FecEncoder + + +def _round_trip(packets: list[bytes], cfg: FecConfig, + drop_pct: float = 0.0, + seed: int = 0, + flush: bool = True) -> list[bytes]: + """TX-encode `packets` into envelopes, randomly drop `drop_pct`, RX-decode. + Returns the recovered packet stream in arrival order. Per-block.""" + enc = FecEncoder(cfg) + dec = FecDecoder(cfg) + envelopes: list[bytes] = [] + for p in packets: + envelopes += enc.add_packet(p) + if flush: + envelopes += enc.flush() + rng = random.Random(seed) + rng.shuffle(envelopes) + keep = envelopes[int(len(envelopes) * drop_pct):] + out: list[bytes] = [] + for env in keep: + out += dec.add_symbol(env) + return out + + +# --------------------------------------------------------------------------- # +# Config sanity +# --------------------------------------------------------------------------- # +def test_config_rejects_bad_args(): + with pytest.raises(ValueError): + FecConfig(k=0) + with pytest.raises(ValueError): + FecConfig(k=256) + with pytest.raises(ValueError): + FecConfig(symbol_size=1) + with pytest.raises(ValueError): + FecConfig(overhead=-0.1) + + +def test_repair_count_is_ceil_k_times_overhead(): + assert FecConfig(k=16, overhead=1.0).repair_count == 16 + assert FecConfig(k=16, overhead=0.5).repair_count == 8 + assert FecConfig(k=16, overhead=0.51).repair_count == 9 + assert FecConfig(k=16, overhead=2.0).repair_count == 32 + + +# --------------------------------------------------------------------------- # +# Round-trip +# --------------------------------------------------------------------------- # +def test_no_loss_round_trip_recovers_every_packet(): + cfg = FecConfig(k=8, symbol_size=200, overhead=0.5) + rng = random.Random(0xC0FFEE) + pkts = [bytes(rng.randint(0, 255) for _ in range(rng.randint(20, 180))) + for _ in range(20)] + out = _round_trip(pkts, cfg, drop_pct=0.0, seed=1) + assert out == pkts + + +@pytest.mark.parametrize("drop_pct", [0.0, 0.2, 0.4]) +def test_recovers_within_design_loss(drop_pct): + """At R/K = 1 (overhead 1.0) cberner's raptorq needs K_effective+ε + symbols per block where K_effective ≈ K + 1 (small overhead the lib + adds for OTI alignment). With K=8 + R=8 = 16 envelopes per block we + have ~7 symbols of headroom, so 40% loss is reliably recoverable; + above that we're in the regime where the bridge's FEC needs higher + overhead. Asymptotic K → ∞ gives the textbook ~50% loss tolerance + at R/K = 1 — see the 70%-loss test for the unrecoverable-block side. + + The decoder emits packets per-block when each block crosses the decode + threshold, so cross-block envelope order can reorder packets in groups + of K — within a block, order is preserved. We assert set-equality on + the bytes.""" + cfg = FecConfig(k=8, symbol_size=200, overhead=1.0) + rng = random.Random(0xBEEF + int(drop_pct * 100)) + pkts = [ + i.to_bytes(2, "little") + bytes( + rng.randint(0, 255) for _ in range(rng.randint(50, 178))) + for i in range(16) + ] + out = _round_trip(pkts, cfg, drop_pct=drop_pct, seed=int(drop_pct * 10)) + assert sorted(out) == sorted(pkts), ( + f"drop={drop_pct}: got {len(out)}/{len(pkts)} packets, " + f"set-equal={set(out) == set(pkts)}" + ) + + +def test_50pct_loss_recoverable_at_higher_overhead(): + """The default overhead R/K=1 is the recommended middle of the survey's + loss range; raising to R/K=2 buys reliable decode at 50% loss for the + small-K regime where cberner's lib needs K+1 packets.""" + cfg = FecConfig(k=8, symbol_size=200, overhead=2.0) + rng = random.Random(0xFADE) + pkts = [bytes(rng.randint(0, 255) for _ in range(rng.randint(50, 178))) + for _ in range(16)] + out = _round_trip(pkts, cfg, drop_pct=0.5, seed=42) + assert sorted(out) == sorted(pkts) + + +def test_70_percent_loss_is_unrecoverable_with_default_overhead(): + """Documents the expectation: at 70% drop and R/K = 1, decode fails. + The bridge's job is to count unrecoverable blocks via the decoder's + expire_blocks_older_than API, not to silently lose packets.""" + cfg = FecConfig(k=8, symbol_size=200, overhead=1.0) + rng = random.Random(0xDEAD) + pkts = [bytes(rng.randint(0, 255) for _ in range(rng.randint(20, 180))) + for _ in range(16)] + enc = FecEncoder(cfg) + dec = FecDecoder(cfg) + envelopes = [] + for p in pkts: + envelopes += enc.add_packet(p) + envelopes += enc.flush() + rng.shuffle(envelopes) + keep = envelopes[int(len(envelopes) * 0.7):] + out = [] + for env in keep: + out += dec.add_symbol(env) + # At 70% drop expect 0 or 1 blocks decoded (random; structural — RaptorQ + # has small probability of decoding with significantly fewer than K + # symbols, but the typical case fails). + assert len(out) < len(pkts) + # Aging-out the still-in-flight blocks moves them into the unrecoverable + # counter — that's the bridge's signal. + dec.expire_blocks_older_than(-1.0) + assert dec.blocks_unrecoverable >= 1 + + +# --------------------------------------------------------------------------- # +# Concatenation packing +# --------------------------------------------------------------------------- # +def test_many_small_packets_share_one_symbol(): + """32 packets of 30 B with symbol_size=200 must pack ~6 packets per + symbol (30 + 2-byte prefix = 32 B per slot, 6 fit in 200 with 8 B left + for zero pad). K=8 → 32 packets should fit in much less than 32 symbols.""" + cfg = FecConfig(k=8, symbol_size=200, overhead=1.0) + rng = random.Random(0xCAFE) + pkts = [bytes(rng.randint(0, 255) for _ in range(30)) for _ in range(32)] + enc = FecEncoder(cfg) + envelopes = [] + for p in pkts: + envelopes += enc.add_packet(p) + envelopes += enc.flush() + # Decoder gives back exactly the input order. + dec = FecDecoder(cfg) + out = [] + for env in envelopes: + out += dec.add_symbol(env) + assert out == pkts + # Symbol-efficiency: at most ceil(32/6) = 6 source symbols needed for 32 + # packets. Two blocks (K=8) means 16 source slots, plenty of headroom; + # the encoder should have produced exactly 1 block. + assert enc.blocks_encoded == 1 + + +def test_mixed_sizes_round_trip_preserves_order(): + cfg = FecConfig(k=4, symbol_size=200, overhead=1.0) + pkts = [ + bytes([0x11] * 30), bytes([0x22] * 100), bytes([0x33] * 10), + bytes([0x44] * 180), bytes([0x55] * 5), bytes([0x66] * 50), + ] + out = _round_trip(pkts, cfg, drop_pct=0.0) + assert out == pkts + + +def test_max_size_packet_fits_in_its_own_symbol(): + cfg = FecConfig(k=2, symbol_size=200, overhead=1.0) + pkts = [bytes([0xA1]) * cfg.max_packet_size, + bytes([0xB2]) * cfg.max_packet_size] + out = _round_trip(pkts, cfg, drop_pct=0.0) + assert out == pkts + + +def test_oversized_packet_raises(): + cfg = FecConfig(k=4, symbol_size=200) + enc = FecEncoder(cfg) + with pytest.raises(ValueError): + enc.add_packet(bytes(cfg.max_packet_size + 1)) + + +# --------------------------------------------------------------------------- # +# Partial / flush +# --------------------------------------------------------------------------- # +def test_partial_block_flush_emits_exact_kreal_packets(): + cfg = FecConfig(k=8, symbol_size=200, overhead=1.0) + pkts = [bytes([i + 1]) * 50 for i in range(3)] # 3 packets, well under K + out = _round_trip(pkts, cfg, drop_pct=0.0) + assert out == pkts + + +def test_unflushed_partial_block_emits_nothing(): + cfg = FecConfig(k=8, symbol_size=200, overhead=1.0) + enc = FecEncoder(cfg) + dec = FecDecoder(cfg) + pkts = [bytes([0x55]) * 50 for _ in range(3)] + envelopes = [] + for p in pkts: + envelopes += enc.add_packet(p) + # No flush() — envelopes should be empty, decoder should produce nothing. + assert envelopes == [] + out = [] + for env in envelopes: + out += dec.add_symbol(env) + assert out == [] + + +# --------------------------------------------------------------------------- # +# Block ID wrap + multi-block sequencing +# --------------------------------------------------------------------------- # +def test_block_id_wraps_without_aliasing_in_flight(): + """Drive the encoder through 65538 blocks with K=2 to wrap u16. The + decoder should see each block id at most once at a time.""" + cfg = FecConfig(k=2, symbol_size=200, overhead=0.5) + enc = FecEncoder(cfg) + dec = FecDecoder(cfg) + pkts_in: list[bytes] = [] + pkts_out: list[bytes] = [] + for i in range(65540): + pkt = bytes([(i & 0xFF), ((i >> 8) & 0xFF)] * 50) # 100 B + pkts_in.append(pkt) + envelopes = enc.add_packet(pkt) + for env in envelopes: + pkts_out += dec.add_symbol(env) + pkts_out += [p for env in enc.flush() for p in dec.add_symbol(env)] + # Some blocks may decode out of order or after later ones; what we care + # about is no aliasing damage — every input packet emerges intact. + assert pkts_out == pkts_in + + +def test_multiple_blocks_interleaved_at_decoder(): + """Out-of-order envelope arrival across blocks must still decode each + block correctly.""" + cfg = FecConfig(k=4, symbol_size=200, overhead=1.0) + rng = random.Random(7) + pkts = [bytes([i + 1]) * 50 for i in range(12)] # 3 blocks + enc = FecEncoder(cfg) + dec = FecDecoder(cfg) + envelopes = [] + for p in pkts: + envelopes += enc.add_packet(p) + envelopes += enc.flush() + rng.shuffle(envelopes) + out = [] + for env in envelopes: + out += dec.add_symbol(env) + # Per-block order is preserved by the codec; across blocks the decode + # order depends on which block reached the K-symbol threshold first. + # Sort by first-byte tag, which uniquely identifies each input packet. + out_by_tag = sorted(out, key=lambda b: b[0]) + pkts_by_tag = sorted(pkts, key=lambda b: b[0]) + assert out_by_tag == pkts_by_tag + + +# --------------------------------------------------------------------------- # +# Expiry / bookkeeping +# --------------------------------------------------------------------------- # +def test_expire_blocks_older_than_drops_stale(): + cfg = FecConfig(k=8, symbol_size=200, overhead=0.5) + enc = FecEncoder(cfg) + dec = FecDecoder(cfg) + # Need packets that each occupy their own symbol so we trigger encoding. + # With symbol_size=200 and max_packet_size=198, use ~190-byte packets. + pkts = [bytes([i + 1]) * 190 for i in range(cfg.k)] + envelopes = [] + for p in pkts: + envelopes += enc.add_packet(p) + envelopes += enc.flush() # ensure the block is encoded + assert envelopes, "encoder produced no envelopes — block not encoded" + # Feed only one envelope (decoder won't have enough to decode). + dec.add_symbol(envelopes[0]) + assert dec.in_flight_blocks == 1 + n = dec.expire_blocks_older_than(-1.0) # negative age = expire everything + assert n == 1 + assert dec.blocks_unrecoverable == 1 + assert dec.in_flight_blocks == 0 + + +def test_decoder_drops_symbol_with_wrong_config(): + cfg = FecConfig(k=8, symbol_size=200) + dec = FecDecoder(cfg) + # Hand-craft an envelope with a different k. + bad_header = stream_fec._pack_header( + FecConfig(k=16, symbol_size=200), kreal=16, block_id=0) + out = dec.add_symbol(bad_header + b"\x00" * 200) + assert out == [] + assert dec.symbols_dropped_bad_cfg == 1 + + +def test_decoder_ignores_garbage_envelope(): + cfg = FecConfig(k=8, symbol_size=200) + dec = FecDecoder(cfg) + assert dec.add_symbol(b"\x00\x00not a fec frame") == [] + assert dec.add_symbol(b"") == [] + assert dec.symbols_in == 0 diff --git a/tools/precoder/tun_p2p.py b/tools/precoder/tun_p2p.py index 2d895da..fe5d957 100644 --- a/tools/precoder/tun_p2p.py +++ b/tools/precoder/tun_p2p.py @@ -55,6 +55,29 @@ stream_tx.py — offset/entry_state default to 0 so the shape is model-bound, not on-air-honoured. Useful for "the bytes encode a shape" demos, not for proving per-subcarrier IQ at the antenna. + +FEC MODE (RaptorQ, RFC 6330) + +Pass --fec-k > 0 on BOTH peers to swap the byte-mode per-IP-packet framing +for a RaptorQ block code. Each block packs K source symbols' worth of +concatenated IP packets, then ships K + ceil(K*--fec-overhead) RaptorQ +encoded symbols. The receiver decodes once it has roughly K+ε of them, +regardless of which ones were dropped. Defaults to K=16, overhead=1.0, +which decodes reliably at ≤ ~40% loss. + + --fec-k 16 --fec-overhead 1.0 ≈ 50% airtime, recovers up to ~40% loss + --fec-k 16 --fec-overhead 0.5 ≈ 33% airtime, recovers up to ~25% loss + --fec-k 16 --fec-overhead 2.0 ≈ 67% airtime, recovers up to ~55% loss + +Latency floor: K source packets × per-packet airtime + --fec-flush-ms. +At the default --fec-flush-ms=50 the bridge waits at most 50 ms before +force-encoding a partial block, so a single ping per second still flies +within ≈ 50 ms of encode buffer + 1 ms × number of envelopes shipped. + +FEC mode forces dedup off (RaptorQ symbols self-dedup via SBN/ESI). The +periodic stderr report adds a `fec=[...]` segment with sym-tx/sym-rx/ +blk-ok/blk-lost counters so you can tell at a glance whether the channel +is healthy enough for the current overhead setting. """ from __future__ import annotations @@ -79,6 +102,17 @@ import stream # noqa: E402 +# stream_fec pulls in `raptorq` from PyPI. Defer the import so byte-mode +# users running on a python without the wheel installed don't get +# blindsided at startup. We only need the module when --fec-k > 0. +stream_fec = None # type: ignore[assignment] +def _import_stream_fec(): # noqa: E302 + global stream_fec + if stream_fec is None: + import stream_fec as _sf # noqa: F401 + stream_fec = _sf + return stream_fec + # Linux TUN/TAP constants. See . TUNSETIFF = 0x400454CA IFF_TUN = 0x0001 @@ -186,7 +220,17 @@ def __init__(self) -> None: def tx_thread(stop: StopBit, tun_fd: int, tx_stdin, body_bytes: int, shape: Optional[dict], seed: int, offset: int, entry_state: int, - repeat: int, counters: dict) -> None: + repeat: int, counters: dict, + fec_enc: "Optional[stream_fec.FecEncoder]" = None, + fec_lock: "Optional[threading.Lock]" = None) -> None: + """Read IP packets off the TUN and ship them through the stream link. + + In byte mode each packet is a single StreamFrame. In FEC mode each + packet is added to the FecEncoder (concatenation-packed into source + symbols, then RaptorQ-encoded when a K-symbol block fills); the + encoder returns a list of inner-envelope blobs, each of which becomes + its own StreamFrame. + """ seq = 0 try: while not stop.stop: @@ -199,11 +243,76 @@ def tx_thread(stop: StopBit, tun_fd: int, tx_stdin, body_bytes: int, return if not pkt: return - if len(pkt) > body_bytes - stream.ENVELOPE_LEN: - # Should never happen with --tun-mtu sized correctly. - counters["tun_oversize"] += 1 + + if fec_enc is not None: + # FEC mode: feed the packet to the encoder; ship whatever + # envelopes come back. Most adds return [] until the K-th + # source symbol triggers a block encode. + if len(pkt) > fec_enc.cfg.max_packet_size: + counters["tun_oversize"] += 1 + continue + assert fec_lock is not None + with fec_lock: + envelopes = fec_enc.add_packet(pkt) + counters["tx_pkts"] += 1 + counters["tx_bytes"] += len(pkt) + else: + # Byte mode: one StreamFrame per IP packet. + if len(pkt) > body_bytes - stream.ENVELOPE_LEN: + counters["tun_oversize"] += 1 + continue + envelopes = [pkt] + counters["tx_pkts"] += 1 + counters["tx_bytes"] += len(pkt) + + for env in envelopes: + frame = stream.StreamFrame(seq=seq, total=0, payload=env) + seq = (seq + 1) & 0xFFFF + body, _ = stream.encode_body( + frame, shape=shape, body_bytes=body_bytes, + seed=seed, offset=offset, entry_state=entry_state, + ) + chunk = struct.pack(" None: + """Periodically flush the FecEncoder's pending block. Without this, a + sparse traffic pattern (single ping per second) would never accumulate + K source packets and the link would go silent until traffic spiked. + + Maintains its own seq counter — in FEC mode the outer SeqWindow dedup + is off (RaptorQ symbols are inherently dedup-friendly via SBN+ESI), so + the tx_thread's and flush_thread's seq spaces don't need to share or + coordinate. + """ + import time as _time + interval = max(0.001, flush_ms / 1000.0) + seq = 0x8000 # arbitrary mid-range start; just a counter, not significant + while not stop.stop: + _time.sleep(interval) + if stop.stop: + return + with fec_lock: + if fec_enc.pending_packets == 0: continue - frame = stream.StreamFrame(seq=seq, total=0, payload=pkt) + envelopes = fec_enc.flush() + for env in envelopes: + frame = stream.StreamFrame(seq=seq, total=0, payload=env) seq = (seq + 1) & 0xFFFF body, _ = stream.encode_body( frame, shape=shape, body_bytes=body_bytes, @@ -216,10 +325,8 @@ def tx_thread(stop: StopBit, tun_fd: int, tx_stdin, body_bytes: int, tx_stdin.flush() except (BrokenPipeError, ValueError): return - counters["tx_pkts"] += 1 - counters["tx_bytes"] += len(pkt) - finally: - stop.stop = True + counters["fec_symbols_tx"] += 1 + counters["fec_flushes"] += 1 class SeqWindow: @@ -248,7 +355,20 @@ def seen_or_add(self, seq: int) -> bool: def rx_thread(stop: StopBit, rx_stdout, tun_fd: int, shape: Optional[dict], seed: int, offset: int, entry_state: int, - dedup: Optional[SeqWindow], counters: dict) -> None: + dedup: Optional[SeqWindow], counters: dict, + fec_dec: "Optional[stream_fec.FecDecoder]" = None, + fec_block_expire_ms: int = 500) -> None: + """Read `` lines off the WiFiDriverDemo, decode the + stream envelope, then either: + * byte mode: write the StreamFrame payload to the TUN as one IP packet, + * FEC mode: feed the payload to the FecDecoder; when a block decodes, + write each unpacked IP packet to the TUN. + + Block expiry runs on a cheap monotonic-clock check (once per second + is enough; we don't want to pay it per symbol). + """ + import time as _time + last_expire = _time.monotonic() try: for line in rx_stdout: if stop.stop: @@ -267,17 +387,39 @@ def rx_thread(stop: StopBit, rx_stdout, tun_fd: int, counters["malformed"] += 1 continue if dedup is not None and dedup.seen_or_add(frame.seq): - # Duplicate from --repeat fan-out (or a real radio repeat). - # Drop before it ever touches the IP stack. counters["dedup_dropped"] += 1 continue - try: - os.write(tun_fd, frame.payload) - except OSError as e: - sys.stderr.write(f"tun write: {e}\n") - return - counters["rx_pkts"] += 1 - counters["rx_bytes"] += len(frame.payload) + + if fec_dec is not None: + ip_pkts = fec_dec.add_symbol(frame.payload) + counters["fec_symbols_rx"] += 1 + for pkt in ip_pkts: + try: + os.write(tun_fd, pkt) + except OSError as e: + sys.stderr.write(f"tun write: {e}\n") + return + counters["rx_pkts"] += 1 + counters["rx_bytes"] += len(pkt) + # Periodic expiry — cheap; once a second is plenty. + now = _time.monotonic() + if now - last_expire >= 1.0: + last_expire = now + fec_dec.expire_blocks_older_than( + fec_block_expire_ms / 1000.0) + # Bridge counter mirrors the decoder's own counter so + # the periodic stderr report sees it. + counters["fec_blocks_unrecoverable"] = ( + fec_dec.blocks_unrecoverable) + counters["fec_blocks_decoded"] = fec_dec.blocks_decoded + else: + try: + os.write(tun_fd, frame.payload) + except OSError as e: + sys.stderr.write(f"tun write: {e}\n") + return + counters["rx_pkts"] += 1 + counters["rx_bytes"] += len(frame.payload) finally: stop.stop = True @@ -353,6 +495,32 @@ def main(argv: Optional[list[str]] = None) -> int: ap.add_argument("--entry-state", type=lambda s: int(s, 0), default=0) ap.add_argument("--report-interval", type=float, default=5.0, help="seconds between stderr counter prints; 0 = silent") + + # FEC mode (RaptorQ, RFC 6330). When --fec-k > 0, packets are + # concatenation-packed into K source symbols per block, RaptorQ-encoded + # into K + ceil(K*overhead) output symbols, and shipped through the + # existing stream framing. RX dedup is forced off — RaptorQ symbols are + # inherently dedup-friendly via SBN+ESI and the codec just discards any + # excess once the block has decoded. + ap.add_argument("--fec-k", type=int, default=0, + help="source symbols per RaptorQ block; 0 = FEC off " + "(default). Sensible range 8-64; latency floor " + "scales with K.") + ap.add_argument("--fec-overhead", type=float, default=1.0, + help="repair-to-source ratio (default 1.0 = 50%% airtime " + "overhead, decodes reliably at ~40%% loss). Raise " + "for noisier links, lower for clean ones.") + ap.add_argument("--fec-symbol-size", type=int, default=1477, + help="bytes per RaptorQ symbol (default 1477, fits one " + "1500 B stream body with the 9 B FEC + 4 B RaptorQ " + "headers). Must be the same on both peers.") + ap.add_argument("--fec-flush-ms", type=int, default=50, + help="ms before a partial block is force-encoded (default " + "50). At sparse traffic this bounds the encoder-side " + "latency floor.") + ap.add_argument("--fec-block-expire-ms", type=int, default=500, + help="ms before the RX side gives up on a block whose " + "first symbol arrived this long ago (default 500).") args = ap.parse_args(argv) do_tx = args.mode in ("duplex", "duplex-split", "tx-only") @@ -394,14 +562,60 @@ def main(argv: Optional[list[str]] = None) -> int: f"shape={'on' if shape else 'off'}\n" ) + # FEC config / sanity. When FEC is on, force-off the byte-mode dedup + # (RaptorQ has its own block-level dedup via SBN/ESI) and cap the + # tun MTU at the symbol size minus the per-packet length prefix so a + # max-size packet still fits in one source symbol. + fec_cfg = None + if args.fec_k > 0: + _import_stream_fec() + fec_cfg = stream_fec.FecConfig( + k=args.fec_k, + symbol_size=args.fec_symbol_size, + overhead=args.fec_overhead, + ) + # body must hold the stream envelope (10 B) + inner FEC header + # (9 B) + the raptorq packet (≤ symbol_size B; cberner's lib caps + # packet bytes at the configured `mtu`). + min_body = (fec_cfg.symbol_size + + stream_fec.FEC_HEADER_LEN + + stream.ENVELOPE_LEN) + if args.body_bytes < min_body: + sys.stderr.write( + f"tun_p2p: --body-bytes {args.body_bytes} too small for FEC " + f"symbol_size {fec_cfg.symbol_size}; need at least " + f"~{min_body}. Increase --body-bytes or lower " + f"--fec-symbol-size.\n" + ) + return 2 + fec_tun_cap = fec_cfg.max_packet_size + if tun_mtu > fec_tun_cap: + sys.stderr.write( + f"tun_p2p: clamping --tun-mtu {tun_mtu} → {fec_tun_cap} " + f"(FEC max_packet_size with symbol={fec_cfg.symbol_size})\n" + ) + tun_mtu = fec_tun_cap + if args.dedup: + sys.stderr.write( + "tun_p2p: FEC on → forcing --no-dedup (RaptorQ symbols dedup " + "themselves; the outer SeqWindow would just waste cycles)\n" + ) + args.dedup = False + tun_fd = open_tun(args.tun_name, tun_mtu, args.tun_addr) stop = StopBit() counters = { "tx_pkts": 0, "tx_bytes": 0, "tun_oversize": 0, "rx_pkts": 0, "rx_bytes": 0, "malformed": 0, "rate_mismatch": 0, "dedup_dropped": 0, + # FEC counters; stay at 0 in byte mode. + "fec_symbols_tx": 0, "fec_symbols_rx": 0, "fec_flushes": 0, + "fec_blocks_decoded": 0, "fec_blocks_unrecoverable": 0, } dedup = SeqWindow(args.dedup_window) if args.dedup else None + fec_enc = stream_fec.FecEncoder(fec_cfg) if fec_cfg else None + fec_dec = stream_fec.FecDecoder(fec_cfg) if fec_cfg else None + fec_lock = threading.Lock() if fec_enc else None if single_binary: duplex_proc = launch_duplex(args) @@ -424,16 +638,26 @@ def main(argv: Optional[list[str]] = None) -> int: target=tx_thread, daemon=True, args=(stop, tun_fd, tx_proc.stdin, args.body_bytes, shape, args.seed, args.offset, args.entry_state, args.repeat, - counters), + counters, fec_enc, fec_lock), ) t.start() threads.append(t) + if fec_enc is not None: + t = threading.Thread( + target=fec_flush_thread, daemon=True, + args=(stop, tx_proc.stdin, args.body_bytes, shape, + args.seed, args.offset, args.entry_state, args.repeat, + counters, fec_enc, fec_lock, args.fec_flush_ms), + ) + t.start() + threads.append(t) if do_rx: assert rx_stdout is not None t = threading.Thread( target=rx_thread, daemon=True, args=(stop, rx_stdout, tun_fd, shape, - args.seed, args.offset, args.entry_state, dedup, counters), + args.seed, args.offset, args.entry_state, dedup, counters, + fec_dec, args.fec_block_expire_ms), ) t.start() threads.append(t) @@ -450,6 +674,15 @@ def shutdown(*_): time.sleep(0.5) if args.report_interval and (time.monotonic() - last_print) >= args.report_interval: last_print = time.monotonic() + fec_str = "" + if fec_cfg is not None: + fec_str = ( + f" fec=[K={fec_cfg.k} sym-tx={counters['fec_symbols_tx']} " + f"sym-rx={counters['fec_symbols_rx']} " + f"blk-ok={counters['fec_blocks_decoded']} " + f"blk-lost={counters['fec_blocks_unrecoverable']} " + f"flush={counters['fec_flushes']}]" + ) sys.stderr.write( f"tun_p2p: tx={counters['tx_pkts']}pkt/" f"{counters['tx_bytes']}B " @@ -457,7 +690,8 @@ def shutdown(*_): f"dedup-drop={counters['dedup_dropped']} " f"mal={counters['malformed']} " f"rate-mismatch={counters['rate_mismatch']} " - f"tun-oversize={counters['tun_oversize']}\n" + f"tun-oversize={counters['tun_oversize']}" + f"{fec_str}\n" ) finally: stop.stop = True diff --git a/tools/precoder/uv.lock b/tools/precoder/uv.lock index 03e015a..e6f122e 100644 --- a/tools/precoder/uv.lock +++ b/tools/precoder/uv.lock @@ -24,6 +24,7 @@ dependencies = [ { name = "numpy", version = "2.0.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.10.*'" }, { name = "numpy", version = "2.4.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "raptorq" }, ] [package.optional-dependencies] @@ -41,6 +42,7 @@ dev = [ requires-dist = [ { name = "numpy", specifier = ">=1.23" }, { name = "pyrtlsdr", marker = "extra == 'sdr'", specifier = ">=0.2.93" }, + { name = "raptorq", specifier = ">=2" }, ] provides-extras = ["sdr"] @@ -365,6 +367,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, ] +[[package]] +name = "raptorq" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f4/5e/09c4d73b65c34868d24f02ddabcf4f13618982b00bcb12bc8da42e90ffeb/raptorq-2.0.0.tar.gz", hash = "sha256:89cc189525973bc4ff6495b4c3d28f0c1fd629b5c11dec4d3f21883ac8e011d3", size = 87500, upload-time = "2024-03-16T00:58:28.787Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/06/f4/85d0e976816b9e2aac6f35552690801e848a232453adb64d517c35169adb/raptorq-2.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a7cc9ffb024969b3cb33ae4f3789431fae0e225b19ae8bad85bb67703e564532", size = 374706, upload-time = "2024-03-16T00:58:26.626Z" }, +] + [[package]] name = "tomli" version = "2.4.1"