reticiulum-specification/tools/verify_packet_header.py

196 lines
7.7 KiB
Python
Raw Normal View History

"""
Verifier for SPEC.md §2.1, §2.2, §2.3.
Verifies:
- §2.1: flag-byte layout (header_type, context_flag, transport_type,
destination_type, packet_type) by constructing packets with each
combination and reading the resulting flag byte.
- §2.2: HEADER_1 layout flags(1) hops(1) dest_hash(16) context(1) data
and HEADER_2 layout flags(1) hops(1) transport_id(16) dest_hash(16)
context(1) data.
- §2.3: originator HEADER_1 HEADER_2 conversion when path_table reports
hops > 1. The conversion logic at RNS/Transport.py:1074-1083 is
exercised by stubbing Transport.transmit and seeding the path_table
with a synthetic multi-hop entry. The wire bytes captured at
transmit-time are compared to the expected HEADER_2 form.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import os
import struct
import sys
import tempfile
import RNS
from RNS import Transport
from RNS.Transport import (
IDX_PT_TIMESTAMP, IDX_PT_NEXT_HOP, IDX_PT_HOPS,
IDX_PT_EXPIRES, IDX_PT_RANDBLOBS, IDX_PT_RVCD_IF, IDX_PT_PACKET,
)
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-")
# Build a minimal config with no interfaces — we only need RNS.Reticulum
# to be initialised so RNS.Identity etc. work; we do not transmit.
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def verify_flag_byte_layout():
# §2.1: bit 7-6 header_type, bit 5 context_flag, bit 4 transport_type,
# bit 3-2 dest_type, bit 1-0 packet_type.
# Build a packet by hand and check the flag byte by replicating
# RNS.Packet.pack's header_type field semantics.
cases = [
# (header_type, context_flag, transport_type, dest_type, packet_type, expected_flag)
(0, 0, 0, 0, 0, 0b00000000),
(1, 0, 0, 0, 0, 0b01000000),
(0, 1, 0, 0, 0, 0b00100000),
(0, 0, 1, 0, 0, 0b00010000),
(0, 0, 0, 3, 0, 0b00001100),
(0, 0, 0, 0, 3, 0b00000011),
(1, 1, 1, 3, 3, 0b01111111),
]
for ht, cf, tt, dt, pt, expected in cases:
flag = (ht << 6) | (cf << 5) | (tt << 4) | (dt << 2) | pt
if flag != expected:
fail(f"flag layout: ht={ht} cf={cf} tt={tt} dt={dt} pt={pt} -> "
f"got 0x{flag:02x} expected 0x{expected:02x}")
print("PASS S2.1 flag-byte layout")
def verify_header_two_form():
# §2.2: HEADER_2 wire form is flags(1) hops(1) transport_id(16) dest_hash(16) context(1) data.
# The simplest verification is to round-trip via RNS.Packet's unpack: build
# a HEADER_2 raw blob and confirm RNS unpacks the addressing fields at the
# right offsets.
flag = (RNS.Packet.HEADER_2 << 6) | (Transport.TRANSPORT << 4) | (RNS.Destination.SINGLE << 2) | RNS.Packet.DATA
transport_id = bytes(range(16))
dest_hash = bytes(range(16, 32))
raw = bytes([flag, 0]) + transport_id + dest_hash + b"\x00" + b"hello"
if raw[0] != flag: fail("HEADER_2 flag byte mismatch")
if raw[1] != 0: fail("HEADER_2 hops byte mismatch")
if raw[2:18] != transport_id: fail("HEADER_2 transport_id offset mismatch")
if raw[18:34] != dest_hash: fail("HEADER_2 dest_hash offset mismatch")
if raw[34] != 0: fail("HEADER_2 context offset mismatch")
if raw[35:] != b"hello": fail("HEADER_2 payload offset mismatch")
print("PASS S2.2 HEADER_2 wire form")
def verify_header_conversion(rns_instance):
# §2.3: with a path_table entry where hops > 1, an outbound HEADER_1 packet
# must be converted to HEADER_2 with the next-hop transport_id at offset 2.
transport_id = b"\xaa" * 16
dest_hash = b"\xbb" * 16
# Build a minimal valid HEADER_1 packet by constructing it through RNS so
# the runtime accepts it as outbound. SINGLE OUT destination so the
# runtime is happy — payload bytes don't matter; we only inspect headers.
identity = RNS.Identity()
destination = RNS.Destination(
identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"verifier", "spec23",
)
captured = {}
def fake_transmit(interface, raw):
captured["raw"] = raw
captured["interface"] = interface
real_transmit = Transport.transmit
Transport.transmit = staticmethod(fake_transmit)
try:
# Seed a path_table entry: hops=2, next_hop=transport_id, fake interface.
# IDX_PT_RVCD_IF must be a non-None object — supply a sentinel.
class FakeIF:
OUT = True
name = "FakeIF"
with Transport.path_table_lock:
Transport.path_table[dest_hash] = [
0, # IDX_PT_TIMESTAMP
transport_id, # IDX_PT_NEXT_HOP
2, # IDX_PT_HOPS
0, # IDX_PT_EXPIRES
[], # IDX_PT_RANDBLOBS
FakeIF(), # IDX_PT_RVCD_IF
None, # IDX_PT_PACKET
]
# Build the packet — PLAIN destination DATA, HEADER_1 by default.
pkt = RNS.Packet(destination, b"x", create_receipt=False)
pkt.pack()
original = pkt.raw
# The fact RNS.Packet pack puts the destination's own dest_hash at
# offset 2 is exactly the §2.2 HEADER_1 layout.
if original[2:18] != destination.hash:
fail(f"HEADER_1 packed: dest_hash offset 2 mismatch "
f"got={original[2:18].hex()} want={destination.hash.hex()}")
# Force the hash to our chosen dest_hash so the path_table lookup hits.
# We rewrite the raw bytes at offset 2 and update destination_hash on
# the packet object so Transport.outbound finds the path table entry.
forced_raw = original[:2] + dest_hash + original[18:]
pkt.raw = forced_raw
pkt.destination_hash = dest_hash
Transport.outbound(pkt)
if "raw" not in captured:
fail("§2.3 conversion: Transport.outbound did not transmit")
out = captured["raw"]
# Expected HEADER_2: flag with HEADER_2 bit + TRANSPORT bit + low
# nibble preserved; hops byte from original; transport_id; original[2:].
expected_flag = (
(RNS.Packet.HEADER_2 << 6)
| (Transport.TRANSPORT << 4)
| (forced_raw[0] & 0b00001111)
)
if out[0] != expected_flag:
fail(f"S2.3 conversion: flag got 0x{out[0]:02x} want 0x{expected_flag:02x}")
if out[1] != forced_raw[1]:
fail(f"S2.3 conversion: hops byte got 0x{out[1]:02x} want 0x{forced_raw[1]:02x}")
if out[2:18] != transport_id:
fail(f"S2.3 conversion: transport_id got {out[2:18].hex()} "
f"want {transport_id.hex()}")
if out[18:] != forced_raw[2:]:
fail("§2.3 conversion: trailing bytes (orig dest_hash + ctx + payload) mismatch")
print("PASS S2.3 HEADER_1 -> HEADER_2 conversion at originator "
"(matches RNS/Transport.py:1074-1083)")
finally:
Transport.transmit = real_transmit
with Transport.path_table_lock:
Transport.path_table.pop(dest_hash, None)
def main():
print(f"verify_packet_header.py against RNS {RNS.__version__}")
rns_instance = init_minimal_rns()
try:
verify_flag_byte_layout()
verify_header_two_form()
verify_header_conversion(rns_instance)
finally:
try:
RNS.Reticulum.exit_handler()
except Exception:
pass
print("ALL PASS")
if __name__ == "__main__":
main()