reticiulum-specification/tools/verify_packet_header.py
Rob cfd0d8249b Re-anchor against RNS 1.2.4 / LXMF 0.9.7 + track upstream distribution shift
Upstream RNS 1.2.4 (2026-05-07) announces it is "probably the last
release that is also published to GitHub" — pip continues until rnpkg
is complete and RNS is self-hosting. All 13 verifiers pass against
1.2.4 / 0.9.7; no wire-format, signing, or protocol behavior changed
between 1.2.0 and 1.2.4, so the changes here are purely currency:

- Pin tools/requirements.txt to rns==1.2.4 / lxmf==0.9.7 so the
  verifier stays reproducible if upstream stops mirroring to PyPI
  before the migration is ready.
- Add an "Upstream distribution shift" watch-list to todo.md (local
  Reticulum node, repo destination hash, rnpkg install/upgrade
  commands, rsg signature verification, mirroring source citations).
- Bump SPEC.md frontmatter and re-anchor ~50 line citations across
  Identity.py, Transport.py, Resource.py, Link.py, Reticulum.py,
  Packet.py, and LXMF/* (Identity.py drift was the heaviest at +13
  to +31 lines; Transport.py was variable). Fix one numeric
  (MAX_RANDOM_BLOBS = 32 → 64) and one semantic (§6.6.3 LRPROOF MTU
  clamp citation pointed at the wrong location — corrected to point
  at the transit-relay clamp at Transport.py:1539-1556).
- Update §10.4 decompression-bomb hazard to note upstream's 1.1.9 cap
  adoption, with citations to Resource.py:686-691 and Buffer.py:95-97
  plus a "do not use one-shot bz2.decompress()" warning.
- Re-anchor 11 flows/ files (version pins + ~30 line citations).
- Bump version labels in tools/README.md, test-vectors/README.md, and
  4 verifier docstrings + 2 hardcoded print strings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 07:42:25 -04:00

236 lines
9.7 KiB
Python

"""
Verifier for SPEC.md §2.1, §2.2, §2.3.
Verifies:
- §2.1: flag-byte layout (ifac_flag at bit 7, header_type at bit 6,
context_flag at bit 5, transport_type at bit 4, destination_type
at bits 3-2, packet_type at bits 1-0) — by constructing packets
with each combination and reading the resulting flag byte, and
by asserting that upstream's parse mask `0b01000000 >> 6`
treats bit 7 as separate from header_type.
- §2.2: HEADER_1 layout flags(1) hops(1) dest_hash(16) context(1) data
and HEADER_2 layout flags(1) hops(1) transport_id(16) dest_hash(16)
context(1) data.
- §2.3: originator HEADER_1 → HEADER_2 conversion when path_table reports
hops > 1. The conversion logic at RNS/Transport.py:1077-1108 is
exercised by stubbing Transport.transmit and seeding the path_table
with a synthetic multi-hop entry. The wire bytes captured at
transmit-time are compared to the expected HEADER_2 form.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import os
import struct
import sys
import tempfile
import RNS
from RNS import Transport
from RNS.Transport import (
IDX_PT_TIMESTAMP, IDX_PT_NEXT_HOP, IDX_PT_HOPS,
IDX_PT_EXPIRES, IDX_PT_RANDBLOBS, IDX_PT_RVCD_IF, IDX_PT_PACKET,
)
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-")
# Build a minimal config with no interfaces — we only need RNS.Reticulum
# to be initialised so RNS.Identity etc. work; we do not transmit.
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def verify_flag_byte_layout():
# §2.1: bit 7 ifac_flag, bit 6 header_type, bit 5 context_flag,
# bit 4 transport_type, bit 3-2 dest_type, bit 1-0 packet_type.
# Build a packet by hand and check the flag byte by replicating
# RNS.Packet.pack's header_type field semantics (header_type << 6,
# i.e. 1-bit field at position 6 — NOT bits 7-6).
cases = [
# (header_type, context_flag, transport_type, dest_type, packet_type, expected_flag)
(0, 0, 0, 0, 0, 0b00000000),
(1, 0, 0, 0, 0, 0b01000000),
(0, 1, 0, 0, 0, 0b00100000),
(0, 0, 1, 0, 0, 0b00010000),
(0, 0, 0, 3, 0, 0b00001100),
(0, 0, 0, 0, 3, 0b00000011),
(1, 1, 1, 3, 3, 0b01111111),
]
for ht, cf, tt, dt, pt, expected in cases:
flag = (ht << 6) | (cf << 5) | (tt << 4) | (dt << 2) | pt
if flag != expected:
fail(f"flag layout: ht={ht} cf={cf} tt={tt} dt={dt} pt={pt} -> "
f"got 0x{flag:02x} expected 0x{expected:02x}")
print("PASS S2.1 flag-byte layout")
def verify_ifac_bit_position():
# §2.1: bit 7 (mask 0x80) is the IFAC flag, set by Transport.transmit
# at line 1003: `new_header = bytes([raw[0] | 0x80, raw[1]])`. It is
# NOT part of header_type.
# Lock in two invariants:
# 1. Setting bit 7 must NOT change the parsed header_type — upstream's
# parser at RNS/Packet.py:246 isolates bit 6 only via mask 0b01000000.
# 2. The constant 0x80 == bit 7, distinct from the header_type mask.
parse_mask = 0b01000000
if parse_mask != 1 << 6:
fail(f"S2.1 parse_mask: header_type mask must be bit 6 (0x40), got 0x{parse_mask:02x}")
if (parse_mask & 0x80) != 0:
fail("S2.1 parse_mask: header_type mask must NOT cover bit 7")
for ifac_set in (0, 1):
for ht_value in (0, 1):
flag = (ifac_set << 7) | (ht_value << 6)
parsed_ht = (flag & parse_mask) >> 6
if parsed_ht != ht_value:
fail(f"S2.1 ifac_bit: flag=0x{flag:02x} (ifac={ifac_set} ht={ht_value}) "
f"parsed header_type={parsed_ht}, expected {ht_value}"
f"bit 7 leaking into header_type would make this fail")
# Also confirm the upstream IFAC setter constant matches our spec.
# RNS/Transport.py:1003: `bytes([raw[0] | 0x80, raw[1]])` — we lock in
# the value 0x80 (bit 7) as the IFAC flag mask.
IFAC_FLAG_MASK = 0x80
if IFAC_FLAG_MASK != 1 << 7:
fail(f"S2.1 IFAC mask: expected 0x80 (bit 7), got 0x{IFAC_FLAG_MASK:02x}")
if IFAC_FLAG_MASK & parse_mask:
fail("S2.1 IFAC mask overlaps header_type parse mask — would re-introduce the prior-spec bug")
print("PASS S2.1 IFAC bit position (bit 7, distinct from header_type)")
def verify_header_two_form():
# §2.2: HEADER_2 wire form is flags(1) hops(1) transport_id(16) dest_hash(16) context(1) data.
# The simplest verification is to round-trip via RNS.Packet's unpack: build
# a HEADER_2 raw blob and confirm RNS unpacks the addressing fields at the
# right offsets.
flag = (RNS.Packet.HEADER_2 << 6) | (Transport.TRANSPORT << 4) | (RNS.Destination.SINGLE << 2) | RNS.Packet.DATA
transport_id = bytes(range(16))
dest_hash = bytes(range(16, 32))
raw = bytes([flag, 0]) + transport_id + dest_hash + b"\x00" + b"hello"
if raw[0] != flag: fail("HEADER_2 flag byte mismatch")
if raw[1] != 0: fail("HEADER_2 hops byte mismatch")
if raw[2:18] != transport_id: fail("HEADER_2 transport_id offset mismatch")
if raw[18:34] != dest_hash: fail("HEADER_2 dest_hash offset mismatch")
if raw[34] != 0: fail("HEADER_2 context offset mismatch")
if raw[35:] != b"hello": fail("HEADER_2 payload offset mismatch")
print("PASS S2.2 HEADER_2 wire form")
def verify_header_conversion(rns_instance):
# §2.3: with a path_table entry where hops > 1, an outbound HEADER_1 packet
# must be converted to HEADER_2 with the next-hop transport_id at offset 2.
transport_id = b"\xaa" * 16
dest_hash = b"\xbb" * 16
# Build a minimal valid HEADER_1 packet by constructing it through RNS so
# the runtime accepts it as outbound. SINGLE OUT destination so the
# runtime is happy — payload bytes don't matter; we only inspect headers.
identity = RNS.Identity()
destination = RNS.Destination(
identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"verifier", "spec23",
)
captured = {}
def fake_transmit(interface, raw):
captured["raw"] = raw
captured["interface"] = interface
real_transmit = Transport.transmit
Transport.transmit = staticmethod(fake_transmit)
try:
# Seed a path_table entry: hops=2, next_hop=transport_id, fake interface.
# IDX_PT_RVCD_IF must be a non-None object — supply a sentinel.
class FakeIF:
OUT = True
name = "FakeIF"
with Transport.path_table_lock:
Transport.path_table[dest_hash] = [
0, # IDX_PT_TIMESTAMP
transport_id, # IDX_PT_NEXT_HOP
2, # IDX_PT_HOPS
0, # IDX_PT_EXPIRES
[], # IDX_PT_RANDBLOBS
FakeIF(), # IDX_PT_RVCD_IF
None, # IDX_PT_PACKET
]
# Build the packet — PLAIN destination DATA, HEADER_1 by default.
pkt = RNS.Packet(destination, b"x", create_receipt=False)
pkt.pack()
original = pkt.raw
# The fact RNS.Packet pack puts the destination's own dest_hash at
# offset 2 is exactly the §2.2 HEADER_1 layout.
if original[2:18] != destination.hash:
fail(f"HEADER_1 packed: dest_hash offset 2 mismatch "
f"got={original[2:18].hex()} want={destination.hash.hex()}")
# Force the hash to our chosen dest_hash so the path_table lookup hits.
# We rewrite the raw bytes at offset 2 and update destination_hash on
# the packet object so Transport.outbound finds the path table entry.
forced_raw = original[:2] + dest_hash + original[18:]
pkt.raw = forced_raw
pkt.destination_hash = dest_hash
Transport.outbound(pkt)
if "raw" not in captured:
fail("§2.3 conversion: Transport.outbound did not transmit")
out = captured["raw"]
# Expected HEADER_2: flag with HEADER_2 bit + TRANSPORT bit + low
# nibble preserved; hops byte from original; transport_id; original[2:].
expected_flag = (
(RNS.Packet.HEADER_2 << 6)
| (Transport.TRANSPORT << 4)
| (forced_raw[0] & 0b00001111)
)
if out[0] != expected_flag:
fail(f"S2.3 conversion: flag got 0x{out[0]:02x} want 0x{expected_flag:02x}")
if out[1] != forced_raw[1]:
fail(f"S2.3 conversion: hops byte got 0x{out[1]:02x} want 0x{forced_raw[1]:02x}")
if out[2:18] != transport_id:
fail(f"S2.3 conversion: transport_id got {out[2:18].hex()} "
f"want {transport_id.hex()}")
if out[18:] != forced_raw[2:]:
fail("§2.3 conversion: trailing bytes (orig dest_hash + ctx + payload) mismatch")
print("PASS S2.3 HEADER_1 -> HEADER_2 conversion at originator "
"(matches RNS/Transport.py:1077-1108)")
finally:
Transport.transmit = real_transmit
with Transport.path_table_lock:
Transport.path_table.pop(dest_hash, None)
def main():
print(f"verify_packet_header.py against RNS {RNS.__version__}")
rns_instance = init_minimal_rns()
try:
verify_flag_byte_layout()
verify_ifac_bit_position()
verify_header_two_form()
verify_header_conversion(rns_instance)
finally:
try:
RNS.Reticulum.exit_handler()
except Exception:
pass
print("ALL PASS")
if __name__ == "__main__":
main()