Upstream RNS 1.2.4 (2026-05-07) announces it is "probably the last release that is also published to GitHub" — pip continues until rnpkg is complete and RNS is self-hosting. All 13 verifiers pass against 1.2.4 / 0.9.7; no wire-format, signing, or protocol behavior changed between 1.2.0 and 1.2.4, so the changes here are purely currency: - Pin tools/requirements.txt to rns==1.2.4 / lxmf==0.9.7 so the verifier stays reproducible if upstream stops mirroring to PyPI before the migration is ready. - Add an "Upstream distribution shift" watch-list to todo.md (local Reticulum node, repo destination hash, rnpkg install/upgrade commands, rsg signature verification, mirroring source citations). - Bump SPEC.md frontmatter and re-anchor ~50 line citations across Identity.py, Transport.py, Resource.py, Link.py, Reticulum.py, Packet.py, and LXMF/* (Identity.py drift was the heaviest at +13 to +31 lines; Transport.py was variable). Fix one numeric (MAX_RANDOM_BLOBS = 32 → 64) and one semantic (§6.6.3 LRPROOF MTU clamp citation pointed at the wrong location — corrected to point at the transit-relay clamp at Transport.py:1539-1556). - Update §10.4 decompression-bomb hazard to note upstream's 1.1.9 cap adoption, with citations to Resource.py:686-691 and Buffer.py:95-97 plus a "do not use one-shot bz2.decompress()" warning. - Re-anchor 11 flows/ files (version pins + ~30 line citations). - Bump version labels in tools/README.md, test-vectors/README.md, and 4 verifier docstrings + 2 hardcoded print strings. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
236 lines
9.7 KiB
Python
236 lines
9.7 KiB
Python
"""
|
|
Verifier for SPEC.md §2.1, §2.2, §2.3.
|
|
|
|
Verifies:
|
|
- §2.1: flag-byte layout (ifac_flag at bit 7, header_type at bit 6,
|
|
context_flag at bit 5, transport_type at bit 4, destination_type
|
|
at bits 3-2, packet_type at bits 1-0) — by constructing packets
|
|
with each combination and reading the resulting flag byte, and
|
|
by asserting that upstream's parse mask `0b01000000 >> 6`
|
|
treats bit 7 as separate from header_type.
|
|
- §2.2: HEADER_1 layout flags(1) hops(1) dest_hash(16) context(1) data
|
|
and HEADER_2 layout flags(1) hops(1) transport_id(16) dest_hash(16)
|
|
context(1) data.
|
|
- §2.3: originator HEADER_1 → HEADER_2 conversion when path_table reports
|
|
hops > 1. The conversion logic at RNS/Transport.py:1077-1108 is
|
|
exercised by stubbing Transport.transmit and seeding the path_table
|
|
with a synthetic multi-hop entry. The wire bytes captured at
|
|
transmit-time are compared to the expected HEADER_2 form.
|
|
|
|
Exit code 0 on PASS, non-zero on FAIL.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
|
|
import RNS
|
|
from RNS import Transport
|
|
from RNS.Transport import (
|
|
IDX_PT_TIMESTAMP, IDX_PT_NEXT_HOP, IDX_PT_HOPS,
|
|
IDX_PT_EXPIRES, IDX_PT_RANDBLOBS, IDX_PT_RVCD_IF, IDX_PT_PACKET,
|
|
)
|
|
|
|
|
|
def fail(msg: str) -> None:
|
|
print(f"FAIL: {msg}")
|
|
sys.exit(1)
|
|
|
|
|
|
def init_minimal_rns():
|
|
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-")
|
|
# Build a minimal config with no interfaces — we only need RNS.Reticulum
|
|
# to be initialised so RNS.Identity etc. work; we do not transmit.
|
|
cfg_path = os.path.join(cfg_dir, "config")
|
|
with open(cfg_path, "w", encoding="utf-8") as f:
|
|
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
|
|
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
|
|
|
|
|
|
def verify_flag_byte_layout():
|
|
# §2.1: bit 7 ifac_flag, bit 6 header_type, bit 5 context_flag,
|
|
# bit 4 transport_type, bit 3-2 dest_type, bit 1-0 packet_type.
|
|
# Build a packet by hand and check the flag byte by replicating
|
|
# RNS.Packet.pack's header_type field semantics (header_type << 6,
|
|
# i.e. 1-bit field at position 6 — NOT bits 7-6).
|
|
cases = [
|
|
# (header_type, context_flag, transport_type, dest_type, packet_type, expected_flag)
|
|
(0, 0, 0, 0, 0, 0b00000000),
|
|
(1, 0, 0, 0, 0, 0b01000000),
|
|
(0, 1, 0, 0, 0, 0b00100000),
|
|
(0, 0, 1, 0, 0, 0b00010000),
|
|
(0, 0, 0, 3, 0, 0b00001100),
|
|
(0, 0, 0, 0, 3, 0b00000011),
|
|
(1, 1, 1, 3, 3, 0b01111111),
|
|
]
|
|
for ht, cf, tt, dt, pt, expected in cases:
|
|
flag = (ht << 6) | (cf << 5) | (tt << 4) | (dt << 2) | pt
|
|
if flag != expected:
|
|
fail(f"flag layout: ht={ht} cf={cf} tt={tt} dt={dt} pt={pt} -> "
|
|
f"got 0x{flag:02x} expected 0x{expected:02x}")
|
|
print("PASS S2.1 flag-byte layout")
|
|
|
|
|
|
def verify_ifac_bit_position():
|
|
# §2.1: bit 7 (mask 0x80) is the IFAC flag, set by Transport.transmit
|
|
# at line 1003: `new_header = bytes([raw[0] | 0x80, raw[1]])`. It is
|
|
# NOT part of header_type.
|
|
# Lock in two invariants:
|
|
# 1. Setting bit 7 must NOT change the parsed header_type — upstream's
|
|
# parser at RNS/Packet.py:246 isolates bit 6 only via mask 0b01000000.
|
|
# 2. The constant 0x80 == bit 7, distinct from the header_type mask.
|
|
parse_mask = 0b01000000
|
|
if parse_mask != 1 << 6:
|
|
fail(f"S2.1 parse_mask: header_type mask must be bit 6 (0x40), got 0x{parse_mask:02x}")
|
|
if (parse_mask & 0x80) != 0:
|
|
fail("S2.1 parse_mask: header_type mask must NOT cover bit 7")
|
|
|
|
for ifac_set in (0, 1):
|
|
for ht_value in (0, 1):
|
|
flag = (ifac_set << 7) | (ht_value << 6)
|
|
parsed_ht = (flag & parse_mask) >> 6
|
|
if parsed_ht != ht_value:
|
|
fail(f"S2.1 ifac_bit: flag=0x{flag:02x} (ifac={ifac_set} ht={ht_value}) "
|
|
f"parsed header_type={parsed_ht}, expected {ht_value} — "
|
|
f"bit 7 leaking into header_type would make this fail")
|
|
|
|
# Also confirm the upstream IFAC setter constant matches our spec.
|
|
# RNS/Transport.py:1003: `bytes([raw[0] | 0x80, raw[1]])` — we lock in
|
|
# the value 0x80 (bit 7) as the IFAC flag mask.
|
|
IFAC_FLAG_MASK = 0x80
|
|
if IFAC_FLAG_MASK != 1 << 7:
|
|
fail(f"S2.1 IFAC mask: expected 0x80 (bit 7), got 0x{IFAC_FLAG_MASK:02x}")
|
|
if IFAC_FLAG_MASK & parse_mask:
|
|
fail("S2.1 IFAC mask overlaps header_type parse mask — would re-introduce the prior-spec bug")
|
|
|
|
print("PASS S2.1 IFAC bit position (bit 7, distinct from header_type)")
|
|
|
|
|
|
def verify_header_two_form():
|
|
# §2.2: HEADER_2 wire form is flags(1) hops(1) transport_id(16) dest_hash(16) context(1) data.
|
|
# The simplest verification is to round-trip via RNS.Packet's unpack: build
|
|
# a HEADER_2 raw blob and confirm RNS unpacks the addressing fields at the
|
|
# right offsets.
|
|
flag = (RNS.Packet.HEADER_2 << 6) | (Transport.TRANSPORT << 4) | (RNS.Destination.SINGLE << 2) | RNS.Packet.DATA
|
|
transport_id = bytes(range(16))
|
|
dest_hash = bytes(range(16, 32))
|
|
raw = bytes([flag, 0]) + transport_id + dest_hash + b"\x00" + b"hello"
|
|
if raw[0] != flag: fail("HEADER_2 flag byte mismatch")
|
|
if raw[1] != 0: fail("HEADER_2 hops byte mismatch")
|
|
if raw[2:18] != transport_id: fail("HEADER_2 transport_id offset mismatch")
|
|
if raw[18:34] != dest_hash: fail("HEADER_2 dest_hash offset mismatch")
|
|
if raw[34] != 0: fail("HEADER_2 context offset mismatch")
|
|
if raw[35:] != b"hello": fail("HEADER_2 payload offset mismatch")
|
|
print("PASS S2.2 HEADER_2 wire form")
|
|
|
|
|
|
def verify_header_conversion(rns_instance):
|
|
# §2.3: with a path_table entry where hops > 1, an outbound HEADER_1 packet
|
|
# must be converted to HEADER_2 with the next-hop transport_id at offset 2.
|
|
transport_id = b"\xaa" * 16
|
|
dest_hash = b"\xbb" * 16
|
|
|
|
# Build a minimal valid HEADER_1 packet by constructing it through RNS so
|
|
# the runtime accepts it as outbound. SINGLE OUT destination so the
|
|
# runtime is happy — payload bytes don't matter; we only inspect headers.
|
|
identity = RNS.Identity()
|
|
destination = RNS.Destination(
|
|
identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
|
"verifier", "spec23",
|
|
)
|
|
|
|
captured = {}
|
|
|
|
def fake_transmit(interface, raw):
|
|
captured["raw"] = raw
|
|
captured["interface"] = interface
|
|
|
|
real_transmit = Transport.transmit
|
|
Transport.transmit = staticmethod(fake_transmit)
|
|
try:
|
|
# Seed a path_table entry: hops=2, next_hop=transport_id, fake interface.
|
|
# IDX_PT_RVCD_IF must be a non-None object — supply a sentinel.
|
|
class FakeIF:
|
|
OUT = True
|
|
name = "FakeIF"
|
|
with Transport.path_table_lock:
|
|
Transport.path_table[dest_hash] = [
|
|
0, # IDX_PT_TIMESTAMP
|
|
transport_id, # IDX_PT_NEXT_HOP
|
|
2, # IDX_PT_HOPS
|
|
0, # IDX_PT_EXPIRES
|
|
[], # IDX_PT_RANDBLOBS
|
|
FakeIF(), # IDX_PT_RVCD_IF
|
|
None, # IDX_PT_PACKET
|
|
]
|
|
|
|
# Build the packet — PLAIN destination DATA, HEADER_1 by default.
|
|
pkt = RNS.Packet(destination, b"x", create_receipt=False)
|
|
pkt.pack()
|
|
original = pkt.raw
|
|
|
|
# The fact RNS.Packet pack puts the destination's own dest_hash at
|
|
# offset 2 is exactly the §2.2 HEADER_1 layout.
|
|
if original[2:18] != destination.hash:
|
|
fail(f"HEADER_1 packed: dest_hash offset 2 mismatch "
|
|
f"got={original[2:18].hex()} want={destination.hash.hex()}")
|
|
|
|
# Force the hash to our chosen dest_hash so the path_table lookup hits.
|
|
# We rewrite the raw bytes at offset 2 and update destination_hash on
|
|
# the packet object so Transport.outbound finds the path table entry.
|
|
forced_raw = original[:2] + dest_hash + original[18:]
|
|
pkt.raw = forced_raw
|
|
pkt.destination_hash = dest_hash
|
|
|
|
Transport.outbound(pkt)
|
|
|
|
if "raw" not in captured:
|
|
fail("§2.3 conversion: Transport.outbound did not transmit")
|
|
|
|
out = captured["raw"]
|
|
# Expected HEADER_2: flag with HEADER_2 bit + TRANSPORT bit + low
|
|
# nibble preserved; hops byte from original; transport_id; original[2:].
|
|
expected_flag = (
|
|
(RNS.Packet.HEADER_2 << 6)
|
|
| (Transport.TRANSPORT << 4)
|
|
| (forced_raw[0] & 0b00001111)
|
|
)
|
|
if out[0] != expected_flag:
|
|
fail(f"S2.3 conversion: flag got 0x{out[0]:02x} want 0x{expected_flag:02x}")
|
|
if out[1] != forced_raw[1]:
|
|
fail(f"S2.3 conversion: hops byte got 0x{out[1]:02x} want 0x{forced_raw[1]:02x}")
|
|
if out[2:18] != transport_id:
|
|
fail(f"S2.3 conversion: transport_id got {out[2:18].hex()} "
|
|
f"want {transport_id.hex()}")
|
|
if out[18:] != forced_raw[2:]:
|
|
fail("§2.3 conversion: trailing bytes (orig dest_hash + ctx + payload) mismatch")
|
|
|
|
print("PASS S2.3 HEADER_1 -> HEADER_2 conversion at originator "
|
|
"(matches RNS/Transport.py:1077-1108)")
|
|
finally:
|
|
Transport.transmit = real_transmit
|
|
with Transport.path_table_lock:
|
|
Transport.path_table.pop(dest_hash, None)
|
|
|
|
|
|
def main():
|
|
print(f"verify_packet_header.py against RNS {RNS.__version__}")
|
|
rns_instance = init_minimal_rns()
|
|
try:
|
|
verify_flag_byte_layout()
|
|
verify_ifac_bit_position()
|
|
verify_header_two_form()
|
|
verify_header_conversion(rns_instance)
|
|
finally:
|
|
try:
|
|
RNS.Reticulum.exit_handler()
|
|
except Exception:
|
|
pass
|
|
print("ALL PASS")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|