reticiulum-specification/tools/verify_rnode_split.py
Rob abf66b9cef Add four more verifiers + receive-propagated flow + frontmatter version
Verifiers:
  tools/verify_proof_packet.py — locks in §6.5. Toggles
    Reticulum.__use_implicit_proof to test both modes; confirms
    Identity.prove emits 64B (implicit) or 96B (explicit) proof
    body; PacketReceipt.validate_proof accepts both lengths and
    rejects an 80B body.
  tools/verify_link_handshake.py — locks in §6.1, §6.2, §6.3, §6.6.
    Most importantly verifies the previously-corrected §6.2 LRPROOF
    body order (signature(64) || responder_X25519_pub(32) ||
    [signalling]) and §6.3 link_id offsets (N=2 for HEADER_1) by
    actually building a Link initiator-side, capturing the
    LINKREQUEST raw bytes, computing link_id by the spec recipe,
    running validate_request inline (since the upstream wrapper
    swallows exceptions), and confirming the responder's LRPROOF
    bytes match the spec layout. This was the single most
    interop-critical correction we made.
  tools/verify_rnode_split.py — locks in §8.3. Pure-function
    re-implementation of the canonical TX and RX state machines
    from RNode_Firmware.ino:359-446 + 716-742; tests header-byte
    layout, single-frame TX, split-frame TX (300B → 254+46 with
    shared header byte), all four RX state-machine cases (a/b/c/d
    from the spec table), and end-to-end TX/RX round-trip at
    sizes 50, 254, 255, 300, 508.
  tools/verify_msgpack_quirk.py — locks in §9.3. Confirms umsgpack
    distinguishes str (fixstr/0xa5) from bytes (bin8/0xc4); confirms
    LXMF.display_name_from_app_data parses bytes-encoded display
    names correctly and silently returns None (not crash) on
    str-encoded ones, matching the bug-tolerance documented in §9.3.

All 11 verifiers pass against RNS 1.2.0 / LXMF 0.9.6.

Plus:
  - SPEC.md frontmatter: 'Last verified against' line per agent.md §7.
  - flows/receive-propagated-lxmf.md: closing half of the propagated
    LXMF lifecycle. /get listing query, fetch query, ack-and-purge
    via the have_ids slot, message-bundle unpack and dispatch
    through lxmf_delivery.
  - tools/README.md status table refreshed; flows/README.md flips
    receive-propagated-lxmf.md to .

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:54:34 -04:00

277 lines
9.7 KiB
Python

"""
Verifier for SPEC.md S8.3 (RNode air-frame split-packet protocol).
Pure-function verifier — no Reticulum runtime needed because S8.3 is
a LoRa air-frame protocol that lives between RNodes, not on the host
KISS channel. We re-implement the canonical TX and RX state machines
in Python from the upstream RNode_Firmware source and exercise them.
Scenarios:
1. Header-byte layout: bit 7..4 random seq nibble, bit 0 FLAG_SPLIT,
bits 3..1 reserved zero. Verified via mask checks against the
constants from RNode_Firmware/Framing.h.
2. TX side, payload <= 254 bytes: emits one frame with FLAG_SPLIT=0,
header || payload, and the seq nibble is randomized per fresh TX.
3. TX side, payload > 254 bytes: emits two frames sharing the same
header byte (same seq nibble + FLAG_SPLIT=1), split at exactly
254 bytes of payload in the first frame and the remainder in the
second.
4. RX state machine, four cases of inbound frames per the table at
S8.3:
a. SPLIT first half → buffer
b. SPLIT second half matching seq → reassemble
c. SPLIT seq mismatch → replace buffer
d. non-SPLIT after first half buffered → discard buffer, deliver
5. Wire-byte equivalence: TX a 300-byte payload, run the resulting
frames through the RX state machine, confirm reassembled payload
bytes match the original.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import os
import sys
# ---- Constants from RNode_Firmware/Framing.h:105-108 + Config.h:59-61 ----
NIBBLE_SEQ = 0xF0
NIBBLE_FLAGS = 0x0F
FLAG_SPLIT = 0x01
SEQ_UNSET = 0xFF
MTU = 508 # max reassembled Reticulum packet payload
SINGLE_MTU = 255 # max LoRa frame size (header + payload)
HEADER_L = 1
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
# ---- TX side (mirrors RNode_Firmware.ino:716-742) ----
def tx_frames(payload, seq_nibble=None):
"""Return a list of LoRa frames (header + payload bytes) for a
given Reticulum packet payload, mirroring the RNode firmware's
transmit() function.
seq_nibble: optional override of the random sequence nibble for
deterministic testing. Must be in 0..15.
"""
if seq_nibble is None:
seq_nibble = os.urandom(1)[0] >> 4 # any 0..15 will do
if not 0 <= seq_nibble <= 15:
raise ValueError("seq_nibble must be 0..15")
if len(payload) > MTU:
raise ValueError(f"payload too large: {len(payload)} > {MTU}")
header = (seq_nibble << 4) & NIBBLE_SEQ # high nibble = seq, low = 0
if len(payload) > SINGLE_MTU - HEADER_L:
header |= FLAG_SPLIT
first_payload = payload[:SINGLE_MTU - HEADER_L]
second_payload = payload[SINGLE_MTU - HEADER_L:]
return [bytes([header]) + first_payload,
bytes([header]) + second_payload]
else:
return [bytes([header]) + payload]
# ---- RX side (mirrors RNode_Firmware.ino:359-446 receive_callback) ----
class RxStateMachine:
"""Re-implementation of the upstream RX reassembly logic. Calling
.deliver(frame) returns the reassembled Reticulum packet bytes if
a complete packet is now available, or None if the call merely
buffered a first-half / replaced state / processed a duplicate.
"""
def __init__(self):
self.seq = SEQ_UNSET
self.buf = b""
def deliver(self, frame):
if len(frame) < 1:
return None
header = frame[0]
sequence = (header & NIBBLE_SEQ) >> 4
is_split = (header & FLAG_SPLIT) != 0
payload = frame[1:]
if is_split and self.seq == SEQ_UNSET:
# Case a: first half — buffer
self.buf = payload
self.seq = sequence
return None
elif is_split and sequence == self.seq:
# Case b: second half matching seq — reassemble
assembled = self.buf + payload
self.buf = b""
self.seq = SEQ_UNSET
return assembled
elif is_split and sequence != self.seq:
# Case c: seq mismatch — replace with this as new first half
self.buf = payload
self.seq = sequence
return None
elif not is_split:
# Case d: non-split — clear any buffered first half, deliver
if self.seq != SEQ_UNSET:
self.buf = b""
self.seq = SEQ_UNSET
return payload
return None
# ---- Tests ----
def verify_header_layout():
# NIBBLE_SEQ has only the high nibble set
if NIBBLE_SEQ != 0xF0:
fail(f"NIBBLE_SEQ != 0xF0 (got {NIBBLE_SEQ:#x})")
# NIBBLE_FLAGS has only the low nibble set
if NIBBLE_FLAGS != 0x0F:
fail(f"NIBBLE_FLAGS != 0x0F (got {NIBBLE_FLAGS:#x})")
# FLAG_SPLIT is the low bit of the low nibble
if FLAG_SPLIT != 0x01:
fail(f"FLAG_SPLIT != 0x01 (got {FLAG_SPLIT:#x})")
# SEQ_UNSET is the all-ones sentinel
if SEQ_UNSET != 0xFF:
fail(f"SEQ_UNSET != 0xFF (got {SEQ_UNSET:#x})")
print("PASS S8.3 header constants: NIBBLE_SEQ=0xF0, FLAG_SPLIT=0x01, SEQ_UNSET=0xFF")
def verify_tx_single_frame():
# 100-byte payload — fits in one frame
payload = bytes(range(100))
frames = tx_frames(payload, seq_nibble=0x7)
if len(frames) != 1:
fail(f"S8.3 single-frame TX produced {len(frames)} frames, want 1")
f = frames[0]
if len(f) != 1 + 100:
fail(f"S8.3 single-frame size = {len(f)}, want 101")
header = f[0]
if (header & NIBBLE_SEQ) >> 4 != 0x7:
fail(f"S8.3 seq nibble lost: header = {header:#x}")
if header & FLAG_SPLIT:
fail(f"S8.3 single-frame TX set FLAG_SPLIT (header = {header:#x})")
if f[1:] != payload:
fail("S8.3 single-frame payload mangled")
print(f"PASS S8.3 TX single-frame (100B payload, seq=0x7, header={header:#04x})")
def verify_tx_split_frames():
# 300-byte payload — splits into 254 + 46
payload = bytes(i & 0xFF for i in range(300))
frames = tx_frames(payload, seq_nibble=0xA)
if len(frames) != 2:
fail(f"S8.3 split TX produced {len(frames)} frames, want 2")
h1, h2 = frames[0][0], frames[1][0]
if h1 != h2:
fail(f"S8.3 split frames have different headers: {h1:#04x} vs {h2:#04x}")
if not (h1 & FLAG_SPLIT):
fail(f"S8.3 split frames did not set FLAG_SPLIT (header={h1:#04x})")
if (h1 & NIBBLE_SEQ) >> 4 != 0xA:
fail(f"S8.3 split frame seq nibble lost: header={h1:#04x}")
if len(frames[0]) != 1 + 254:
fail(f"S8.3 split frame 1 size = {len(frames[0])}, want 255")
if len(frames[1]) != 1 + (300 - 254):
fail(f"S8.3 split frame 2 size = {len(frames[1])}, want {1 + 300 - 254}")
if frames[0][1:] != payload[:254]:
fail("S8.3 split frame 1 payload mismatch")
if frames[1][1:] != payload[254:]:
fail("S8.3 split frame 2 payload mismatch")
print(f"PASS S8.3 TX split frames (300B payload, 254+46 split, "
f"shared header={h1:#04x})")
def verify_rx_state_machine():
rx = RxStateMachine()
# Case d: non-split arrives first → deliver immediately
out = rx.deliver(bytes([0x30]) + b"non-split-1")
if out != b"non-split-1":
fail(f"S8.3 RX case d (non-split fresh) failed: got {out!r}")
# Case a: split first half → buffer
out = rx.deliver(bytes([0x51]) + b"first")
if out is not None:
fail(f"S8.3 RX case a (first half) returned non-None: {out!r}")
if rx.seq != 0x5:
fail(f"S8.3 RX case a did not buffer seq=5, got {rx.seq}")
# Case b: split second half with matching seq → reassemble
out = rx.deliver(bytes([0x51]) + b"second")
if out != b"first" + b"second":
fail(f"S8.3 RX case b (second half match) failed: got {out!r}")
if rx.seq != SEQ_UNSET:
fail(f"S8.3 RX state didn't reset after reassembly: seq={rx.seq}")
# Case a again, then case c: seq mismatch replaces buffer
rx.deliver(bytes([0x51]) + b"AAAA")
rx.deliver(bytes([0x71]) + b"BBBB")
if rx.seq != 0x7 or rx.buf != b"BBBB":
fail(f"S8.3 RX case c (seq mismatch) state wrong: seq={rx.seq}, buf={rx.buf!r}")
# Case d while a first-half is buffered → discard buffer, deliver non-split
rx.deliver(bytes([0x91]) + b"discardme") # buffers seq=9
out = rx.deliver(bytes([0x30]) + b"non-split-2")
if out != b"non-split-2":
fail(f"S8.3 RX case d (non-split with stale buffer) failed: got {out!r}")
if rx.seq != SEQ_UNSET:
fail(f"S8.3 RX case d did not discard stale buffer: seq={rx.seq}")
print("PASS S8.3 RX state machine: 4 cases (a/b/c/d) all correct")
def verify_tx_rx_roundtrip():
"""End-to-end: TX a payload, feed the resulting frames through
the RX state machine, confirm reassembled payload matches."""
for size in [50, 254, 255, 300, 508]:
original = bytes(i & 0xFF for i in range(size))
frames = tx_frames(original, seq_nibble=0x3)
rx = RxStateMachine()
out = None
for f in frames:
res = rx.deliver(f)
if res is not None:
out = res
if out != original:
fail(f"S8.3 TX/RX round-trip mismatch at size {size}:\n"
f" in: {original[:30]!r}... ({len(original)}B)\n"
f" out: {out[:30] if out else None!r}... ({len(out) if out else 0}B)")
print("PASS S8.3 TX/RX round-trip at sizes [50, 254, 255, 300, 508]")
def main():
print("verify_rnode_split.py — pure-function verifier (no RNS runtime needed)")
verify_header_layout()
verify_tx_single_frame()
verify_tx_split_frames()
verify_rx_state_machine()
verify_tx_rx_roundtrip()
print("ALL PASS")
if __name__ == "__main__":
main()