reticiulum-specification/tools/verify_link_lrrtt.py
Rob 073203abae Resolve issue #6 — LRRTT and HEADER_1 for link-addressed DATA (§6.4.2, §6.4.3)
Upstream RNS enforces two requirements in code that SPEC.md left implicit;
both caused silent message loss in a clean-room Go LXMF service against
upstream Python rns 1.2.4 / lxmf 0.9.7.

§6.4.2 LRRTT — initiator's link-activation packet
  - HEADER_1, DATA, dest_type=LINK (0x03), ctx=0xfe; body is
    `umsgpack.packb(rtt_seconds)` encrypted with the link's session keys.
  - The responder transitions HANDSHAKE→ACTIVE only on LRRTT receipt
    (Link.py:534-553), which is also what fires the link_established
    callback. LXMF's set_resource_strategy(ACCEPT_APP) is installed
    from that callback; without it, every RESOURCE_ADV the initiator
    sends hits the silent ACCEPT_NONE branch at Link.py:1087.

§6.4.3 Header type for post-handshake DATA and Resource
  - Link-addressed packets are routed via link_table, which forwards
    header bytes verbatim (Transport.py:1587-1622). HEADER_2 with a
    relay's transport_id therefore arrives at the destination intact
    and is dropped by packet_filter (Transport.py:1283-1285) as
    "for another transport instance".
  - Mandates HEADER_1 with no transport_id for all post-handshake
    link DATA / Resource / control packets regardless of hop count.
  - Asymmetry with LINKREQUEST (which IS path_table-routed and so
    HEADER_2-eligible) is spelled out.

Companion changes:
  - §6.4 renamed to "Session keys and link activation"; existing
    HKDF content moved into §6.4.1.
  - §2.5 LRRTT context-byte entry points at §6.4.2.
  - §12.5.2 (Link DATA forwarding) cross-references §6.4.3.
  - §14 failure-modes table: two new entries for the silent-drop
    chains documented above.
  - flows/send-link-lxmf.md step 4 strengthened (LRRTT is mandatory,
    not informational); step 6 corrected (Transport.outbound does NOT
    apply HEADER_1→HEADER_2 for link DATA — that conversion is
    path_table-keyed, link DATA is link_table-keyed).
  - test-vectors/links.json extended with an LRRTT entry: pinned
    rtt_seconds=0.05 + pinned 16-byte IV produces deterministic
    wire bytes for the encrypted body.
  - tools/regen_links.py drives the LRRTT generation with an
    os.urandom patch for the Token IV.
  - tools/verify_link_lrrtt.py (new) locks the wire claims:
    HEADER_1, ctx=0xfe, dest=link_id, body decrypts under
    derived_key to msgpack float64 matching rtt_seconds.

Citations all verified against installed RNS 1.2.4 / LXMF 0.9.7.
All 14 verifiers PASS.
2026-05-10 14:35:56 -04:00

128 lines
4.9 KiB
Python

"""
Verifier for SPEC.md S6.4.2 — LRRTT, the initiator's link-activation packet.
Consumes `test-vectors/links.json`. Asserts that the recorded LRRTT
vector decomposes as documented in SPEC.md S6.4.2:
- Wire header is HEADER_1 with no transport_id (per S6.4.3).
- packet_type = DATA (0x00).
- destination_type = LINK (0x03), dest_hash = link_id.
- context = LRRTT (0xfe).
- Body is link-form Token encryption (S3.1, no eph_pub prefix):
IV(16) || AES256_CBC(plaintext, key, iv) || HMAC(32)
keyed off the link's derived_key.
- Plaintext decodes via umsgpack as a single float64 matching
the recorded rtt_seconds.
Run from repo root:
python tools/verify_link_lrrtt.py
Prints PASS lines and exits 0 if every assertion holds.
"""
from __future__ import annotations
import json
import os
import sys
import RNS
from RNS.Cryptography.Token import Token
from RNS.vendor import umsgpack
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
VEC_PATH = os.path.join(REPO_ROOT, "test-vectors", "links.json")
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def main() -> int:
with open(VEC_PATH, "r", encoding="utf-8") as f:
vectors = json.load(f)["vectors"]
for v in vectors:
label = v["label"]
expected = v["expected"]
if "lrrtt" not in expected:
print(f"SKIP {label}: no lrrtt field")
continue
lrrtt = expected["lrrtt"]
link_id = bytes.fromhex(expected["link_id_hex"])
derived_key = bytes.fromhex(expected["derived_key_hex"])
rtt_seconds = lrrtt["rtt_seconds"]
iv = bytes.fromhex(lrrtt["iv_hex"])
plaintext = bytes.fromhex(lrrtt["plaintext_hex"])
raw = bytes.fromhex(lrrtt["raw_hex"])
body = bytes.fromhex(lrrtt["body_hex"])
# 1. Header decomposition (S2.1 + S6.4.2 + S6.4.3).
if len(raw) < 19:
fail(f"{label}: raw is shorter than the 19-byte HEADER_1 frame")
flags = raw[0]
hops = raw[1]
dest_hash = raw[2:18]
context = raw[18]
ciphertext = raw[19:]
header_type = (flags & 0b01000000) >> 6
context_flag = (flags & 0b00100000) >> 5
transport_type = (flags & 0b00010000) >> 4
destination_type = (flags & 0b00001100) >> 2
packet_type = (flags & 0b00000011)
if header_type != 0:
fail(f"{label}: header_type = {header_type}, expected HEADER_1 (0) per S6.4.3")
if packet_type != 0:
fail(f"{label}: packet_type = {packet_type}, expected DATA (0) per S6.4.2")
if destination_type != 3:
fail(f"{label}: destination_type = {destination_type}, expected LINK (3) per S6.4.2")
if context != 0xfe:
fail(f"{label}: context = 0x{context:02x}, expected LRRTT (0xfe) per S6.4.2")
if dest_hash != link_id:
fail(f"{label}: dest_hash = {dest_hash.hex()}, expected link_id = {link_id.hex()}")
if hops != 0:
fail(f"{label}: hops = {hops}, originator emits 0")
print(f"PASS {label} S6.4.2 LRRTT header: HEADER_1 + DATA + LINK + ctx=0xfe + dest_hash=link_id")
print(f" (context_flag={context_flag}, transport_type={transport_type})")
# 2. Body length and IV match the link-form Token layout (S3.1).
if ciphertext != body:
fail(f"{label}: raw[19:] != body — vector self-inconsistency")
if ciphertext[:16] != iv:
fail(f"{label}: body IV {ciphertext[:16].hex()} != recorded iv_hex {iv.hex()}")
print(f"PASS {label} S3.1 link-form Token: 16B IV || ciphertext || 32B HMAC, no eph_pub prefix")
# 3. Token decrypt round-trip using the link's derived_key.
recovered = Token(derived_key).decrypt(body)
if recovered != plaintext:
fail(f"{label}: Token decrypt did not recover plaintext\n"
f" got: {recovered.hex() if recovered else None}\n"
f" exp: {plaintext.hex()}")
print(f"PASS {label} S6.4.2 body decrypts under derived_key to recorded plaintext")
# 4. Plaintext is umsgpack(float) matching rtt_seconds.
decoded = umsgpack.unpackb(plaintext)
if not isinstance(decoded, float):
fail(f"{label}: plaintext umsgpack-decoded to {type(decoded).__name__}, expected float")
if decoded != rtt_seconds:
fail(f"{label}: decoded float {decoded!r} != recorded rtt_seconds {rtt_seconds!r}")
# msgpack float64 tag is 0xcb plus 8 IEEE 754 bytes — total 9 bytes.
if len(plaintext) != 9 or plaintext[0] != 0xcb:
fail(f"{label}: plaintext is not a 9-byte msgpack float64 (tag 0xcb)")
print(f"PASS {label} S6.4.2 plaintext = umsgpack(float64) rtt_seconds={rtt_seconds!r} (9B, tag 0xcb)")
print("ALL PASS")
return 0
if __name__ == "__main__":
sys.exit(main())