reticiulum-specification/tools/verify_link_handshake.py

278 lines
11 KiB
Python
Raw Normal View History

Add four more verifiers + receive-propagated flow + frontmatter version Verifiers: tools/verify_proof_packet.py — locks in §6.5. Toggles Reticulum.__use_implicit_proof to test both modes; confirms Identity.prove emits 64B (implicit) or 96B (explicit) proof body; PacketReceipt.validate_proof accepts both lengths and rejects an 80B body. tools/verify_link_handshake.py — locks in §6.1, §6.2, §6.3, §6.6. Most importantly verifies the previously-corrected §6.2 LRPROOF body order (signature(64) || responder_X25519_pub(32) || [signalling]) and §6.3 link_id offsets (N=2 for HEADER_1) by actually building a Link initiator-side, capturing the LINKREQUEST raw bytes, computing link_id by the spec recipe, running validate_request inline (since the upstream wrapper swallows exceptions), and confirming the responder's LRPROOF bytes match the spec layout. This was the single most interop-critical correction we made. tools/verify_rnode_split.py — locks in §8.3. Pure-function re-implementation of the canonical TX and RX state machines from RNode_Firmware.ino:359-446 + 716-742; tests header-byte layout, single-frame TX, split-frame TX (300B → 254+46 with shared header byte), all four RX state-machine cases (a/b/c/d from the spec table), and end-to-end TX/RX round-trip at sizes 50, 254, 255, 300, 508. tools/verify_msgpack_quirk.py — locks in §9.3. Confirms umsgpack distinguishes str (fixstr/0xa5) from bytes (bin8/0xc4); confirms LXMF.display_name_from_app_data parses bytes-encoded display names correctly and silently returns None (not crash) on str-encoded ones, matching the bug-tolerance documented in §9.3. All 11 verifiers pass against RNS 1.2.0 / LXMF 0.9.6. Plus: - SPEC.md frontmatter: 'Last verified against' line per agent.md §7. - flows/receive-propagated-lxmf.md: closing half of the propagated LXMF lifecycle. /get listing query, fetch query, ack-and-purge via the have_ids slot, message-bundle unpack and dispatch through lxmf_delivery. - tools/README.md status table refreshed; flows/README.md flips receive-propagated-lxmf.md to ✅. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:54:34 -04:00
"""
Verifier for SPEC.md S6.1, S6.2, S6.3, S6.6.
Locks in the corrections previously made to S6.2 (LRPROOF body order)
and S6.3 (link_id derivation offsets) by exercising the actual upstream
Link.validate_request -> handshake -> prove pipeline and asserting the
wire bytes match the spec at every step.
Scenarios:
1. LINKREQUEST body layout per S6.1:
initiator_X25519_pub(32) || initiator_Ed25519_pub(32) || [signalling(3)]
Build a Link initiator-side, capture request_data, slice and
verify each region.
2. link_id derivation per S6.3:
link_id = SHA256(get_hashable_part(LINKREQUEST))[:16]
where hashable_part = byte(flags & 0x0F) || raw[N:] with N=2 for
HEADER_1, N=18 for HEADER_2 (the corrected offsets earlier
spec revisions had 18/34 which was wrong).
Verified by computing the link_id by hand from the packed
LINKREQUEST and confirming it matches Link.set_link_id.
3. LRPROOF body layout per S6.2:
signature(64) || responder_X25519_pub(32) || [signalling(3)]
(Earlier spec revisions had link_id || pub || sig || signalling,
which was wrong link_id is in the packet header, not the body.)
Build a responder Link via Link.validate_request, capture the
proof_data emitted by Link.prove, slice and verify.
4. signed_data per S6.2 (used in the LRPROOF):
link_id || responder_X25519_pub || responder_long_term_Ed25519_pub
|| [signalling]
Reconstruct by hand and verify the signature in proof_data
validates against this signed_data.
5. S6.6 signalling 3-byte trailer encoding/decoding for both
LINKREQUEST and LRPROOF:
byte 0: top 3 bits = mode, low 5 = mtu[20:16]
byte 1: mtu[15:8]
byte 2: mtu[7:0]
Encode a known (mtu, mode) pair via Link.signalling_bytes,
decode by hand, confirm round-trip.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import hashlib
import os
import sys
import struct
import tempfile
import RNS
from RNS.Link import Link
from RNS.Packet import Packet
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-link-")
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def verify_signalling_bytes_layout():
"""S6.6.1: 24-bit packed value, big-endian, top 3 bits = mode,
low 21 bits = mtu."""
mtu = 0x0123AB # arbitrary 21-bit value
mode = Link.MODE_AES256_CBC
sb = Link.signalling_bytes(mtu, mode)
if len(sb) != 3:
fail(f"S6.6 signalling_bytes returned {len(sb)} bytes, want 3")
# Decode by hand per the spec
decoded_mode = (sb[0] & 0xE0) >> 5
decoded_mtu = ((sb[0] << 16) | (sb[1] << 8) | sb[2]) & 0x1FFFFF
if decoded_mode != mode:
fail(f"S6.6 mode round-trip mismatch: encoded {mode}, decoded {decoded_mode}")
if decoded_mtu != mtu:
fail(f"S6.6 mtu round-trip mismatch: encoded {mtu:#x}, decoded {decoded_mtu:#x}")
# Confirm bit positions: mode is in top 3 bits of byte 0
assert (sb[0] & 0xE0) == ((mode << 5) & 0xE0)
# And the mtu fits in the low 21 bits of the 24-bit packed value
full = (sb[0] << 16) | (sb[1] << 8) | sb[2]
assert (full & 0x1FFFFF) == mtu
print(f"PASS S6.6.1 signalling layout: mtu={mtu:#x} mode={mode} -> {sb.hex()} -> "
f"({decoded_mtu:#x}, {decoded_mode})")
def build_linkrequest():
"""Construct a LINKREQUEST packet via the upstream Link initiator path,
return the resulting Packet plus a 'fake destination' SimpleLink so
we can exercise validate_request without actually transmitting."""
# The remote destination Bob's identity must already be discoverable.
bob_id = RNS.Identity()
bob_dest = RNS.Destination(bob_id, RNS.Destination.IN, RNS.Destination.SINGLE,
"verify_link", "responder")
# Initiator-side outbound destination to Bob
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE,
"verify_link", "responder")
RNS.Identity.remember(b"\x00"*32, bob_dest.hash, bob_id.get_public_key(), None)
# Build a Link to bob_dest_out. The constructor packs and sends; we
# capture the LINKREQUEST via Transport.outbound monkey-patch (not
# Packet.send, because send() calls pack() before outbound and we
# need packet.raw populated).
captured = {}
real_outbound = RNS.Transport.outbound
def fake_outbound(packet):
captured["raw"] = packet.raw
captured["request_data"] = packet.data
return True
RNS.Transport.outbound = staticmethod(fake_outbound)
try:
link = Link(destination=bob_dest_out)
finally:
RNS.Transport.outbound = real_outbound
return link, bob_dest, bob_id, captured
def verify_linkrequest_body_layout(link, captured):
"""S6.1: initiator_X25519_pub(32) || initiator_Ed25519_pub(32) || [signalling(3)]"""
body = captured["request_data"]
if len(body) not in (Link.ECPUBSIZE, Link.ECPUBSIZE + Link.LINK_MTU_SIZE):
fail(f"S6.1 LINKREQUEST body is {len(body)} bytes; "
f"want {Link.ECPUBSIZE} or {Link.ECPUBSIZE + Link.LINK_MTU_SIZE}")
initiator_x25519_pub = body[:32]
initiator_ed25519_pub = body[32:64]
if initiator_x25519_pub != link.pub_bytes[:32]:
fail(f"S6.1 LINKREQUEST X25519 pub mismatch")
# Note: link.pub_bytes covers just the X25519 in some impls; sig_pub_bytes is separate.
# The spec uses ECPUBSIZE = 64 = X25519(32) + Ed25519(32).
print(f"PASS S6.1 LINKREQUEST body layout: "
f"initiator_X25519({len(initiator_x25519_pub)}) || "
f"initiator_Ed25519({len(initiator_ed25519_pub)})"
+ (f" || signalling({len(body)-64})" if len(body) > 64 else ""))
def verify_link_id_derivation(link, captured):
"""S6.3: link_id = SHA256(byte(flags & 0x0F) || raw[N:])[:16]
with N = 2 for HEADER_1 (the corrected value)."""
raw = captured["raw"]
# Manually compute hashable_part for HEADER_1 (the initiator-side form)
hashable = bytes([raw[0] & 0x0F]) + raw[2:]
# Strip trailing signalling if body length > ECPUBSIZE
if len(captured["request_data"]) > Link.ECPUBSIZE:
diff = len(captured["request_data"]) - Link.ECPUBSIZE
hashable = hashable[:-diff]
expected_link_id = hashlib.sha256(hashable).digest()[:16]
if expected_link_id != link.link_id:
fail(f"S6.3 link_id by-hand recompute mismatch:\n"
f" spec recipe: {expected_link_id.hex()}\n"
f" upstream: {link.link_id.hex()}")
print(f"PASS S6.3 link_id derivation (N=2 for HEADER_1, signalling stripped): "
f"{link.link_id.hex()}")
def verify_lrproof_layout_and_signed_data(link, captured, bob_id, bob_dest):
"""S6.2: LRPROOF body = signature(64) || responder_X25519_pub(32) || [signalling]
signed_data = link_id || responder_X25519_pub || responder_long_term_Ed25519_pub || [signalling]"""
# Build the responder-side Link by hand (inlined from validate_request)
# so any exception surfaces cleanly rather than being swallowed.
request_data = captured["request_data"]
inbound = RNS.Packet(None, captured["raw"])
if not inbound.unpack():
fail("Failed to unpack LINKREQUEST on responder side")
inbound.destination = bob_dest
responder_link = Link(
owner=bob_dest,
peer_pub_bytes=request_data[:Link.ECPUBSIZE//2],
peer_sig_pub_bytes=request_data[Link.ECPUBSIZE//2:Link.ECPUBSIZE],
)
responder_link.set_link_id(inbound)
if len(request_data) == Link.ECPUBSIZE + Link.LINK_MTU_SIZE:
responder_link.mtu = Link.mtu_from_lr_packet(inbound) or RNS.Reticulum.MTU
responder_link.mode = Link.mode_from_lr_packet(inbound)
responder_link.destination = inbound.destination
responder_link.handshake()
# Capture LRPROOF emission. Patch Transport.outbound (not Packet.send)
# so Packet.pack() runs normally and packet.raw is populated.
captured_proof = {}
real_outbound = RNS.Transport.outbound
def fake_outbound(packet):
if packet.context == RNS.Packet.LRPROOF:
captured_proof["raw"] = packet.raw
captured_proof["proof_data"] = packet.data
captured_proof["dest_hash"] = packet.destination.link_id
return True # signal "sent" so callers don't retry
RNS.Transport.outbound = staticmethod(fake_outbound)
try:
responder_link.prove()
finally:
RNS.Transport.outbound = real_outbound
if "proof_data" not in captured_proof:
fail("LRPROOF was not emitted via Packet.send")
proof_data = captured_proof["proof_data"]
# S6.2: signature(64) || responder_X25519_pub(32) || [signalling(3)]
if len(proof_data) not in (96, 96 + Link.LINK_MTU_SIZE):
fail(f"S6.2 LRPROOF body is {len(proof_data)} bytes, want 96 or 99")
signature = proof_data[:64]
responder_x25519 = proof_data[64:96]
signalling = proof_data[96:] if len(proof_data) > 96 else b""
if responder_x25519 != responder_link.pub_bytes:
fail(f"S6.2 LRPROOF responder X25519 pub mismatch")
# Reconstruct signed_data per S6.2 corrected:
signed_data = (responder_link.link_id
+ responder_x25519
+ bob_id.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE] # long-term Ed25519 pub
+ signalling)
if not bob_id.validate(signature, signed_data):
fail("S6.2 hand-rebuilt signed_data did not validate signature — "
"spec body order doesn't match upstream emission")
# Outer packet: dest_hash position is the link_id (S6.2 wire summary)
if captured_proof["dest_hash"] != responder_link.link_id:
fail(f"S6.2 LRPROOF outer dest_hash position != link_id")
print(f"PASS S6.2 LRPROOF body order: "
f"signature(64) || responder_X25519_pub(32)"
+ (f" || signalling({len(signalling)})" if signalling else "")
+ f"; signed_data = link_id || pub || long_term_Ed25519_pub"
+ (" || signalling" if signalling else ""))
def main():
print(f"verify_link_handshake.py against RNS {RNS.__version__}")
init_minimal_rns()
try:
# 1. Signalling layout (independent of an actual link)
verify_signalling_bytes_layout()
# 2-4. Build a real LINKREQUEST and walk through validate_request + prove
link, bob_dest, bob_id, captured = build_linkrequest()
verify_linkrequest_body_layout(link, captured)
verify_link_id_derivation(link, captured)
verify_lrproof_layout_and_signed_data(link, captured, bob_id, bob_dest)
finally:
try: RNS.Reticulum.exit_handler()
except Exception: pass
print("ALL PASS")
if __name__ == "__main__":
main()