reticiulum-specification/tools/verify_transport_link.py
John Poole 4a14dca3a4 Completed the transport-relayed Link three-tier unit.
Key findings:

Valid established-Link traffic uses HEADER_1 and link_table.
HEADER_2 Link traffic can cross the addressed relay, then is dropped by the next node.
Relay forwarding requires correct interface and hop count.
Relay forwarding does not require IDX_LT_VALIDATED or destination_type=LINK.
Endpoint Link delivery does require destination_type=LINK.
Link-addressed PROOF uses link_table; ordinary DATA proofs use reverse_table.
Added:

Tier 1 audit
Transport-Link flow
Verifier
Deterministic vectors
Updated SPEC.md, playbook.md, README files, and existing Link flow documentation.

Verification:

Deterministic vector regeneration: identical SHA-256
Full pinned suite: 21 passed, 0 failed
git diff --check: passed
No commit created.
2026-06-08 17:50:52 -07:00

344 lines
14 KiB
Python

"""
Verifier for transport-relayed Link establishment and established-Link traffic.
Exercises upstream RNS 1.2.4 Transport.inbound with a synthetic one-relay
topology. This catches behavior that direct-Link and self-round-trip tests do
not expose.
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
import time
import RNS
from RNS import Transport
from RNS.Transport import (
IDX_LT_DSTHASH,
IDX_LT_HOPS,
IDX_LT_NH_IF,
IDX_LT_NH_TRID,
IDX_LT_PROOF_TMO,
IDX_LT_RCVD_IF,
IDX_LT_REM_HOPS,
IDX_LT_VALIDATED,
IDX_PT_EXPIRES,
IDX_PT_HOPS,
IDX_PT_NEXT_HOP,
IDX_PT_PACKET,
IDX_PT_RANDBLOBS,
IDX_PT_RVCD_IF,
IDX_PT_TIMESTAMP,
)
# Repository root: two directory levels above this file's absolute path.
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# JSON file holding the pinned transport-Link vectors used by this verifier.
VECTORS_PATH = os.path.join(REPO_ROOT, "test-vectors", "transport-link.json")
# JSON file holding the pinned identity vectors used by this verifier.
IDS_PATH = os.path.join(REPO_ROOT, "test-vectors", "identities.json")
def fail(message: str) -> None:
    """Report a verification failure and terminate the process.

    Prints ``FAIL: <message>`` to standard output, then raises
    ``SystemExit`` with status 1, so control never returns to the caller.
    """
    print(f"FAIL: {message}")
    raise SystemExit(1)
def load_json(path: str):
    """Parse the UTF-8 encoded JSON file at *path* and return the result."""
    with open(path, "r", encoding="utf-8") as handle:
        document = json.load(handle)
    return document
def init_minimal_rns():
    """Start a Reticulum instance backed by a throwaway configuration.

    A fresh temporary directory receives a minimal ``config`` file that
    disables transport and instance sharing. The directory is removed
    again when the interpreter exits, so repeated runs do not accumulate
    temporary directories.

    Returns:
        The ``RNS.Reticulum`` instance created with that configuration
        directory and ``loglevel=0``.
    """
    import atexit
    import shutil

    config_dir = tempfile.mkdtemp(prefix="rns-verify-transport-link-")
    # Register the cleanup before RNS is constructed: atexit runs handlers
    # in reverse registration order, so any exit hook registered after this
    # point still finds the directory in place when it runs.
    atexit.register(shutil.rmtree, config_dir, ignore_errors=True)

    config_path = os.path.join(config_dir, "config")
    with open(config_path, "w", encoding="utf-8") as config:
        config.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
    return RNS.Reticulum(configdir=config_dir, loglevel=0)
class FakeInterface:
    """Named stand-in for a network interface in the synthetic topology.

    Instances carry only a name; everything else is a class-level constant.
    NOTE(review): the attribute set presumably mirrors what Transport reads
    from real RNS interfaces -- confirm against the pinned upstream version.
    """

    # Both flags are enabled (names suggest inbound/outbound -- confirm).
    IN = True
    OUT = True

    # MTU-related settings; the hardware MTU reuses RNS.Reticulum.MTU.
    HW_MTU = RNS.Reticulum.MTU
    AUTOCONFIGURE_MTU = True
    FIXED_MTU = True

    bitrate = 1_000_000
    mode = RNS.Interfaces.Interface.Interface.MODE_FULL

    def __init__(self, name: str) -> None:
        self.name = name

    def __str__(self) -> str:
        # The name doubles as the printable form of the interface.
        return self.name
def clear_transport_state() -> None:
    """Reset the Transport class-level state this verifier touches.

    The two packet hashlists are rebound to fresh empty sets; the remaining
    containers are emptied in place with ``clear()``.
    """
    Transport.packet_hashlist = set()
    Transport.packet_hashlist_prev = set()

    # Looked up one at a time, in this order, immediately before clearing.
    for container_name in (
        "path_table",
        "link_table",
        "reverse_table",
        "destinations_map",
        "pending_links",
        "active_links",
        "local_client_interfaces",
    ):
        getattr(Transport, container_name).clear()
def assert_forward(captured: list, interface, raw: bytes, label: str) -> None:
    """Require exactly one captured transmission matching the expectation.

    Args:
        captured: Recorded ``(interface, raw)`` transmissions.
        interface: Interface object the packet must have been sent on
            (compared by identity).
        raw: Exact bytes the forwarded packet must contain.
        label: Case name used as the prefix of any failure message.

    Calls ``fail`` on the first check that does not hold.
    """
    if len(captured) != 1:
        fail(f"{label}: expected one forwarded packet, got {len(captured)}")

    record = captured[0]
    if record[0] is not interface:
        fail(f"{label}: forwarded on wrong interface")
    if record[1] != raw:
        fail(f"{label}: forwarded bytes mismatch")
def assert_drop(captured: list, label: str) -> None:
    """Require that nothing was transmitted; otherwise call ``fail``.

    Args:
        captured: Recorded transmissions; must be empty for the check to pass.
        label: Case name used as the prefix of the failure message.
    """
    if not captured:
        return
    fail(f"{label}: expected drop, got {len(captured)} forwarded packet(s)")
def seed_path(destination_hash: bytes, next_hop: bytes, outbound_interface) -> None:
    """Install a path_table entry for *destination_hash*.

    The entry is a seven-element list: current time, *next_hop*, the value
    1, current time plus 60 seconds, an empty list, *outbound_interface*,
    and ``None``.
    NOTE(review): the positions presumably line up with the IDX_PT_*
    indices of the pinned RNS version -- confirm against upstream.
    """
    created_at = time.time()
    expires_at = time.time() + 60
    path_entry = [created_at, next_hop, 1, expires_at, [], outbound_interface, None]
    Transport.path_table[destination_hash] = path_entry
def make_link_entry(next_hop: bytes, responder_if, initiator_if,
                    destination_hash: bytes, validated: bool = False) -> list:
    """Build a synthetic nine-element link_table entry.

    Layout, in order: current time, *next_hop*, *responder_if*, 1,
    *initiator_if*, 1, *destination_hash*, *validated*, and a timestamp 60
    seconds in the future.
    NOTE(review): the positions presumably line up with the IDX_LT_*
    indices of the pinned RNS version -- confirm against upstream.
    """
    created_at = time.time()
    entry = [
        created_at,
        next_hop,
        responder_if,
        1,
        initiator_if,
        1,
        destination_hash,
        validated,
    ]
    entry.append(time.time() + 60)
    return entry
def verify_linkrequest(vector: dict, initiator_if, responder_if, captured: list) -> list:
    """Check that a relayed LINKREQUEST is forwarded and recorded.

    Seeds a path to the destination via *responder_if*, feeds the pinned
    HEADER_2 LINKREQUEST to ``Transport.inbound`` on *initiator_if*, and then
    requires that exactly the pinned HEADER_1 bytes were transmitted on
    *responder_if* and that a matching link_table entry now exists.

    Args:
        vector: Parsed vector document with ``expected`` and ``inputs`` maps.
        initiator_if: Interface the request arrives on.
        responder_if: Interface the request must leave on.
        captured: Shared list of recorded ``(interface, raw)`` transmissions.

    Returns:
        The link_table entry created for the pinned link id.
    """
    pinned = vector["expected"]
    destination_hash = bytes.fromhex(pinned["destination_hash_hex"])
    link_id = bytes.fromhex(pinned["link_id_hex"])
    next_hop = bytes.fromhex(vector["inputs"]["next_hop_transport_id_hex"])

    seed_path(destination_hash, next_hop, responder_if)
    request_raw = bytes.fromhex(pinned["incoming_header2_linkrequest_raw_hex"])
    Transport.inbound(request_raw, initiator_if)

    forwarded_raw = bytes.fromhex(pinned["forwarded_header1_linkrequest_raw_hex"])
    assert_forward(captured, responder_if, forwarded_raw, "LINKREQUEST last-hop forwarding")

    if link_id not in Transport.link_table:
        fail("LINKREQUEST did not create link_table entry")
    entry = Transport.link_table[link_id]

    if entry[IDX_LT_NH_TRID] != next_hop:
        fail("link_table next-hop transport ID mismatch")
    # Next-hop side must be the responder interface, receiving side the initiator.
    if not (entry[IDX_LT_NH_IF] is responder_if and entry[IDX_LT_RCVD_IF] is initiator_if):
        fail("link_table interfaces do not preserve Link direction")
    if not (entry[IDX_LT_REM_HOPS] == 1 and entry[IDX_LT_HOPS] == 1):
        fail("link_table expected-hop values mismatch")
    # A freshly created entry must name the destination and start unvalidated.
    if not (entry[IDX_LT_DSTHASH] == destination_hash and not entry[IDX_LT_VALIDATED]):
        fail("link_table destination/initial validation state mismatch")
    if entry[IDX_LT_PROOF_TMO] <= time.time():
        fail("link_table proof timeout is not in the future")

    print("PASS S12.2.4 relayed LINKREQUEST strips HEADER_2 and creates directional link_table state")
    return entry
def verify_lrproof(vector: dict, bob_identity, initiator_if, responder_if,
                   captured: list) -> None:
    """Check relay handling of valid and invalid LRPROOF packets.

    The pinned LRPROOF arriving on *responder_if* must be forwarded to
    *initiator_if* and mark the link_table entry validated. Three invalid
    variants (wrong ingress interface, hop byte rewritten to 1, one bit
    flipped near the end of the packet) must each be dropped and must leave
    a freshly installed, unvalidated entry unvalidated.

    Args:
        vector: Parsed vector document with ``expected`` and ``inputs`` maps.
        bob_identity: Identity whose public key is remembered for the
            destination before the proof is processed.
        initiator_if: Interface facing the Link initiator.
        responder_if: Interface facing the Link responder.
        captured: Shared list of recorded ``(interface, raw)`` transmissions.
    """
    expected = vector["expected"]
    link_id = bytes.fromhex(expected["link_id_hex"])
    destination_hash = bytes.fromhex(expected["destination_hash_hex"])
    next_hop = bytes.fromhex(vector["inputs"]["next_hop_transport_id_hex"])
    # Decoded once; every case below derives its packet from these bytes.
    incoming = bytes.fromhex(expected["incoming_lrproof_raw_hex"])

    RNS.Identity.remember(bytes(32), destination_hash, bob_identity.get_public_key(), None)

    captured.clear()
    Transport.inbound(incoming, responder_if)
    assert_forward(
        captured, initiator_if, bytes.fromhex(expected["forwarded_lrproof_raw_hex"]),
        "valid LRPROOF",
    )
    if not Transport.link_table[link_id][IDX_LT_VALIDATED]:
        fail("valid LRPROOF did not validate link_table entry")

    # Flip one bit four bytes from the end so the proof no longer verifies.
    tampered = bytearray(incoming)
    tampered[-4] ^= 0x01

    rejected_cases = [
        ("wrong-interface LRPROOF", incoming, initiator_if),
        ("wrong-hop LRPROOF", incoming[:1] + b"\x01" + incoming[2:], responder_if),
        ("bad-signature LRPROOF", bytes(tampered), responder_if),
    ]
    for label, raw, interface in rejected_cases:
        captured.clear()
        # Start every case from a fresh, unvalidated entry.
        Transport.link_table[link_id] = make_link_entry(
            next_hop, responder_if, initiator_if, destination_hash
        )
        Transport.inbound(raw, interface)
        assert_drop(captured, label)
        # Every rejected proof, including the bad-signature one, must leave
        # the entry unvalidated.
        if Transport.link_table[link_id][IDX_LT_VALIDATED]:
            fail(f"{label}: invalid proof marked link validated")

    print("PASS S12.5.1 LRPROOF requires valid signature, direction, and hop count")
def verify_established_traffic(vector: dict, initiator_if, responder_if,
                               captured: list) -> None:
    """Check link_table forwarding and endpoint delivery of Link traffic.

    Cases, in order:
      * Link DATA is forwarded in both directions, and also while the
        link_table entry is still unvalidated.
      * Link DATA with its hop byte rewritten to 1, or arriving on an
        unrelated interface, is dropped.
      * A packet whose destination-type bits are rewritten to SINGLE is
        still forwarded by the relay.
      * With no link_table entry and a stub active Link, that rewritten
        packet is not delivered to the Link, while the original packet is.
      * A PROOF-typed packet built from the DATA packet is forwarded via
        link_table and leaves reverse_table empty.

    Args:
        vector: Parsed vector document with ``expected`` and ``inputs`` maps.
        initiator_if: Interface facing the Link initiator.
        responder_if: Interface facing the Link responder.
        captured: Shared list of recorded ``(interface, raw)`` transmissions.
    """
    pinned = vector["expected"]
    link_id = bytes.fromhex(pinned["link_id_hex"])
    destination_hash = bytes.fromhex(pinned["destination_hash_hex"])
    next_hop = bytes.fromhex(vector["inputs"]["next_hop_transport_id_hex"])
    incoming = bytes.fromhex(pinned["incoming_link_data_raw_hex"])
    forwarded = bytes.fromhex(pinned["forwarded_link_data_raw_hex"])

    def forget_seen_packets() -> None:
        # Empty both hashlists so a replayed packet is not treated as a duplicate.
        Transport.packet_hashlist.clear()
        Transport.packet_hashlist_prev.clear()

    def prime_relay(validated: bool) -> None:
        # Common per-case reset: drop captures, forget hashes, reinstall entry.
        captured.clear()
        forget_seen_packets()
        Transport.link_table[link_id] = make_link_entry(
            next_hop, responder_if, initiator_if, destination_hash, validated
        )

    def with_hop_byte_one(raw: bytes) -> bytes:
        # Same packet with its second byte replaced by 0x01.
        return raw[:1] + b"\x01" + raw[2:]

    forward_cases = (
        ("initiator-to-responder", initiator_if, responder_if, True),
        ("responder-to-initiator", responder_if, initiator_if, True),
        ("pre-LRPROOF forwarding", initiator_if, responder_if, False),
    )
    for label, ingress, egress, validated in forward_cases:
        prime_relay(validated)
        Transport.inbound(incoming, ingress)
        assert_forward(captured, egress, forwarded, label)

    drop_cases = (
        ("wrong-hop Link DATA", with_hop_byte_one(incoming), initiator_if),
        ("wrong-interface Link DATA", incoming, FakeInterface("unrelated")),
    )
    for label, raw, ingress in drop_cases:
        prime_relay(True)
        Transport.inbound(raw, ingress)
        assert_drop(captured, label)

    # Rewrite the two destination-type bits (mask 0x0C) of the first byte to SINGLE.
    retyped_flags = (incoming[0] & ~0x0C) | (RNS.Destination.SINGLE << 2)
    wrong_type = bytes([retyped_flags]) + incoming[1:]
    prime_relay(True)
    Transport.inbound(wrong_type, initiator_if)
    assert_forward(captured, responder_if, with_hop_byte_one(wrong_type), "non-LINK destination type")

    class EndpointLink:
        # Stub active Link that only records the packets handed to it.
        def __init__(self):
            self.received = []
            self.attached_interface = responder_if
            self.link_id = link_id

        def receive(self, packet):
            self.received.append(packet)

    endpoint_link = EndpointLink()
    # No relay entry from here on: the packets are offered to the active Link.
    Transport.link_table.clear()
    Transport.active_links.append(endpoint_link)

    forget_seen_packets()
    Transport.inbound(wrong_type, responder_if)
    if endpoint_link.received:
        fail("endpoint delivered non-LINK destination type to active Link")

    forget_seen_packets()
    Transport.inbound(incoming, responder_if)
    if len(endpoint_link.received) != 1:
        fail("endpoint did not deliver LINK destination type to active Link")
    Transport.active_links.clear()

    # Rewrite the packet-type bits (mask 0x03) to PROOF and keep bytes 1..34.
    proof_flags = (incoming[0] & ~0x03) | RNS.Packet.PROOF
    proof = bytes([proof_flags]) + incoming[1:35]
    prime_relay(True)
    Transport.inbound(proof, responder_if)
    assert_forward(captured, initiator_if, with_hop_byte_one(proof), "Link PROOF")
    if Transport.reverse_table:
        fail("Link PROOF unexpectedly used reverse_table")

    print("PASS S12.5.2 link_table forwards both directions and Link PROOF without reverse_table")
    print("PASS S12.5.2 relay lookup ignores VALIDATED/dest_type; endpoint Link dispatch requires dest_type=LINK")
def verify_invalid_header_two(vector: dict, initiator_if, responder_if,
                              captured: list, original_identity) -> None:
    """Check that HEADER_2 Link DATA crosses one relay but not the next node.

    The pinned HEADER_2 Link DATA packet is fed to the relay and must be
    forwarded on *responder_if* with only its hop byte changed to 1. The
    forwarded bytes are then unpacked and offered to
    ``Transport.packet_filter`` while ``Transport.identity`` is temporarily
    a newly generated identity; the filter must reject the packet.

    Args:
        vector: Parsed vector document with ``expected`` and ``inputs`` maps.
        initiator_if: Interface the packet arrives on.
        responder_if: Interface the packet must leave on.
        captured: Shared list of recorded ``(interface, raw)`` transmissions.
        original_identity: Identity written back to ``Transport.identity``
            once the filter check has run, whether it passes, fails or raises.
    """
    expected = vector["expected"]
    link_id = bytes.fromhex(expected["link_id_hex"])
    destination_hash = bytes.fromhex(expected["destination_hash_hex"])
    next_hop = bytes.fromhex(vector["inputs"]["next_hop_transport_id_hex"])
    raw = bytes.fromhex(expected["invalid_header2_link_data_raw_hex"])

    captured.clear()
    Transport.packet_hashlist.clear()
    Transport.packet_hashlist_prev.clear()
    Transport.link_table[link_id] = make_link_entry(
        next_hop, responder_if, initiator_if, destination_hash, True
    )
    Transport.inbound(raw, initiator_if)
    assert_forward(captured, responder_if, raw[:1] + b"\x01" + raw[2:], "HEADER_2 Link DATA first relay")

    packet = RNS.Packet(None, captured[0][1])
    if not packet.unpack():
        fail("could not unpack forwarded HEADER_2 Link DATA")

    Transport.packet_hashlist.clear()
    Transport.packet_hashlist_prev.clear()
    # Stand in for the next node by swapping in a different transport identity.
    Transport.identity = RNS.Identity()
    try:
        if Transport.packet_filter(packet):
            fail("next node accepted HEADER_2 Link DATA addressed to prior relay")
    finally:
        # Restore even when fail() exits or the filter raises, so the
        # temporary identity never outlives this check.
        Transport.identity = original_identity

    print("PASS S6.4.3 HEADER_2 Link DATA survives first link_table relay but is rejected by the next node")
def main() -> None:
    """Run every transport-Link check against the pinned vectors.

    Starts a minimal Reticulum instance, loads the vector and identity
    files, patches ``Transport.transmit`` to record transmissions and
    ``RNS.Reticulum.transport_enabled`` to report ``True``, and runs the
    four verification stages in order. The patched attributes, the
    transport identity and the transport tables are restored in a
    ``finally`` block, after which the RNS exit handler is invoked on a
    best-effort basis.
    """
    import inspect

    print(f"verify_transport_link.py against RNS {RNS.__version__}")
    init_minimal_rns()
    vector = load_json(VECTORS_PATH)
    identities = load_json(IDS_PATH)["vectors"]

    def pinned_identity(label: str):
        # Report a missing label through fail() instead of a bare StopIteration.
        record = next((item for item in identities if item["label"] == label), None)
        if record is None:
            fail(f"identity vector {label!r} not found in {IDS_PATH}")
        return RNS.Identity.from_bytes(bytes.fromhex(record["inputs"]["private_key_hex"]))

    relay_identity = pinned_identity("alice")
    bob_identity = pinned_identity("bob")
    initiator_if = FakeInterface("initiator-side")
    responder_if = FakeInterface("responder-side")
    captured: list[tuple[object, bytes]] = []

    # Save the raw class attributes without descriptor binding, so that a
    # staticmethod wrapper (if present) is restored as-is instead of being
    # replaced by its unwrapped function.
    original_transmit = inspect.getattr_static(Transport, "transmit")
    original_transport_enabled = inspect.getattr_static(RNS.Reticulum, "transport_enabled")
    original_identity = Transport.identity
    try:
        clear_transport_state()
        if relay_identity.hash.hex() != vector["inputs"]["relay_identity_hash_hex"]:
            fail("pinned relay identity hash mismatch")
        Transport.identity = relay_identity
        RNS.Reticulum.transport_enabled = staticmethod(lambda: True)
        Transport.transmit = staticmethod(lambda interface, raw: captured.append((interface, raw)))
        verify_linkrequest(vector, initiator_if, responder_if, captured)
        verify_lrproof(vector, bob_identity, initiator_if, responder_if, captured)
        verify_established_traffic(vector, initiator_if, responder_if, captured)
        verify_invalid_header_two(vector, initiator_if, responder_if, captured, relay_identity)
        print("ALL PASS")
    finally:
        Transport.transmit = original_transmit
        RNS.Reticulum.transport_enabled = original_transport_enabled
        Transport.identity = original_identity
        clear_transport_state()
        try:
            RNS.Reticulum.exit_handler()
        except Exception as exc:
            # Shutdown stays best-effort, but the failure is no longer silent.
            print(f"WARN: RNS exit handler raised {exc!r}", file=sys.stderr)


if __name__ == "__main__":
    main()