reticiulum-specification/tools/regen_links.py
Rob 073203abae Resolve issue #6 — LRRTT and HEADER_1 for link-addressed DATA (§6.4.2, §6.4.3)
Upstream RNS enforces two requirements in code that SPEC.md left implicit;
both caused silent message loss in a clean-room Go LXMF service against
upstream Python rns 1.2.4 / lxmf 0.9.7.

§6.4.2 LRRTT — initiator's link-activation packet
  - HEADER_1, DATA, dest_type=LINK (0x03), ctx=0xfe; body is
    `umsgpack.packb(rtt_seconds)` encrypted with the link's session keys.
  - The responder transitions HANDSHAKE→ACTIVE only on LRRTT receipt
    (Link.py:534-553), which is also what fires the link_established
    callback. LXMF's set_resource_strategy(ACCEPT_APP) is installed
    from that callback; without it, every RESOURCE_ADV the initiator
    sends hits the silent ACCEPT_NONE branch at Link.py:1087.

§6.4.3 Header type for post-handshake DATA and Resource
  - Link-addressed packets are routed via link_table, which forwards
    header bytes verbatim (Transport.py:1587-1622). HEADER_2 with a
    relay's transport_id therefore arrives at the destination intact
    and is dropped by packet_filter (Transport.py:1283-1285) as
    "for another transport instance".
  - Mandates HEADER_1 with no transport_id for all post-handshake
    link DATA / Resource / control packets regardless of hop count.
  - Asymmetry with LINKREQUEST (which IS path_table-routed and so
    HEADER_2-eligible) is spelled out.

Companion changes:
  - §6.4 renamed to "Session keys and link activation"; existing
    HKDF content moved into §6.4.1.
  - §2.5 LRRTT context-byte entry points at §6.4.2.
  - §12.5.2 (Link DATA forwarding) cross-references §6.4.3.
  - §14 failure-modes table: two new entries for the silent-drop
    chains documented above.
  - flows/send-link-lxmf.md step 4 strengthened (LRRTT is mandatory,
    not informational); step 6 corrected (Transport.outbound does NOT
    apply HEADER_1→HEADER_2 for link DATA — that conversion is
    path_table-keyed, link DATA is link_table-keyed).
  - test-vectors/links.json extended with an LRRTT entry: pinned
    rtt_seconds=0.05 + pinned 16-byte IV produces deterministic
    wire bytes for the encrypted body.
  - tools/regen_links.py drives the LRRTT generation with an
    os.urandom patch for the Token IV.
  - tools/verify_link_lrrtt.py (new) locks the wire claims:
    HEADER_1, ctx=0xfe, dest=link_id, body decrypts under
    derived_key to msgpack float64 matching rtt_seconds.

Citations all verified against installed RNS 1.2.4 / LXMF 0.9.7.
All 14 verifiers PASS.
2026-05-10 14:35:56 -04:00

343 lines
16 KiB
Python

"""
Regenerator for test-vectors/links.json.
Builds a deterministic Link handshake vector pair (LINKREQUEST + LRPROOF)
between Alice (initiator) and Bob (responder) using identities from
`identities.json`.
Determinism inputs:
- Alice and Bob long-term identities (fixed via identities.json).
- Initiator's ephemeral X25519 private key (`Link.prv`,
`RNS/Link.py:285`) — fixed.
- Initiator's ephemeral Ed25519 private key (`Link.sig_prv`,
`RNS/Link.py:286`) — fixed. (This is *not* Alice's long-term key;
Link's signing key is generated fresh per handshake.)
- Responder's ephemeral X25519 private key (`Link.prv`,
`RNS/Link.py:278`) — fixed.
- Both Ed25519 signatures are deterministic per RFC 8032.
Outputs recorded per vector:
- `linkrequest_raw_hex` — full packed LINKREQUEST packet bytes
- `linkrequest_body_hex` — initiator_X25519_pub || initiator_Ed25519_pub
[|| signalling] (S6.1)
- `link_id_hex` — derived per S6.3 (N=2 for HEADER_1)
- `lrproof_raw_hex` — full packed LRPROOF packet bytes
- `lrproof_body_hex` — signature || responder_X25519_pub
[|| signalling] (S6.2)
- `derived_key_hex` — HKDF output for AES256_CBC mode
(length=64, salt=link_id, context=context)
— both sides must arrive at the same
bytes after handshake().
Run from repo root:
python tools/regen_links.py
Updates `test-vectors/links.json` in place. Exit 0 on success.
"""
from __future__ import annotations
import hashlib
import json
import os
import sys
import tempfile
import RNS
from RNS.Link import Link
from RNS.vendor import umsgpack
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
OUT_PATH = os.path.join(REPO_ROOT, "test-vectors", "links.json")
IDS_PATH = os.path.join(REPO_ROOT, "test-vectors", "identities.json")
# Stable inputs. Three distinct fixed priv key blobs for the three
# ephemeral generations the handshake performs.
INITIATOR_X25519_PRIV = bytes.fromhex("11"*32)
INITIATOR_ED25519_PRIV = bytes.fromhex("22"*32)
RESPONDER_X25519_PRIV = bytes.fromhex("33"*32)
# Pinned LRRTT inputs. The IV here gets injected via os.urandom while
# packing the LRRTT packet so the wire bytes are reproducible. RTT
# value is the initiator's measured LRREQ->LRPROOF time per S6.4.2;
# the exact value is non-load-bearing (responder takes max with its
# own measurement) but pinning it keeps the wire bytes deterministic.
LRRTT_RTT_SECONDS = 0.05
LRRTT_IV = bytes.fromhex("44"*16)
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-regen-links-")
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def load_identities():
with open(IDS_PATH, "r", encoding="utf-8") as f:
ids = json.load(f)
alice = next(v for v in ids["vectors"] if v["label"] == "alice")
bob = next(v for v in ids["vectors"] if v["label"] == "bob")
alice_id = RNS.Identity.from_bytes(bytes.fromhex(alice["inputs"]["private_key_hex"]))
bob_id = RNS.Identity.from_bytes(bytes.fromhex(bob ["inputs"]["private_key_hex"]))
if alice_id is None or bob_id is None:
fail("Identity.from_bytes returned None for one of alice/bob")
return alice_id, bob_id
class _StaticPrivQueue:
"""Wraps an upstream X25519/Ed25519 dispatcher class so that
`.generate()` returns a private key seeded with the next blob in a
fixed queue. Each consumed blob is removed; if the queue underflows
we delegate to the real `.generate()` so non-handshake code paths
still work."""
def __init__(self, real_cls, blobs):
self._real_cls = real_cls
self._blobs = list(blobs)
def generate(self):
if self._blobs:
blob = self._blobs.pop(0)
return self._real_cls.from_private_bytes(blob)
return self._real_cls.generate()
# Defer everything else (from_private_bytes, from_public_bytes, etc.)
# to the real class so the rest of upstream's surface keeps working.
def __getattr__(self, name):
return getattr(self._real_cls, name)
def main():
print(f"regen_links.py against RNS {RNS.__version__}")
init_minimal_rns()
try:
alice_id, bob_id = load_identities()
# Bob's destination — the link target. Mark Bob's identity
# discoverable to Alice so Link initiator can find pub keys.
bob_dest_in = RNS.Destination(bob_id, RNS.Destination.IN, RNS.Destination.SINGLE,
"vectors", "link")
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE,
"vectors", "link")
RNS.Identity.remember(b"\x00"*32, bob_dest_in.hash, bob_id.get_public_key(), None)
# Patch the X25519 / Ed25519 generators that Link.__init__ uses
# to drive deterministic ephemeral keys. Order of consumption
# follows the source:
# line 285: initiator self.prv = X25519PrivateKey.generate()
# line 286: initiator self.sig_prv = Ed25519PrivateKey.generate()
# line 278: responder self.prv = X25519PrivateKey.generate()
link_mod = sys.modules["RNS.Link"]
real_X25519 = link_mod.X25519PrivateKey
real_Ed25519 = link_mod.Ed25519PrivateKey
link_mod.X25519PrivateKey = _StaticPrivQueue(real_X25519,
[INITIATOR_X25519_PRIV,
RESPONDER_X25519_PRIV])
link_mod.Ed25519PrivateKey = _StaticPrivQueue(real_Ed25519,
[INITIATOR_ED25519_PRIV])
# Capture LINKREQUEST emission. Patch Transport.outbound (not
# Packet.send) so Packet.pack() runs and packet.raw is populated.
captured_lr = {}
captured_pf = {}
real_outbound = RNS.Transport.outbound
def fake_outbound(packet):
if packet.packet_type == RNS.Packet.LINKREQUEST and "raw" not in captured_lr:
captured_lr["raw"] = packet.raw
captured_lr["data"] = packet.data
elif packet.context == RNS.Packet.LRPROOF and "raw" not in captured_pf:
captured_pf["raw"] = packet.raw
captured_pf["data"] = packet.data
return True
RNS.Transport.outbound = staticmethod(fake_outbound)
try:
# 1. Build initiator-side Link → LINKREQUEST emitted via outbound
initiator = Link(destination=bob_dest_out)
# 2. Walk the responder side by hand (Link.validate_request
# inlined). This keeps the script self-contained.
inbound = RNS.Packet(None, captured_lr["raw"])
if not inbound.unpack():
fail("Failed to unpack the captured LINKREQUEST")
inbound.destination = bob_dest_in
request_data = captured_lr["data"]
responder = Link(
owner = bob_dest_in,
peer_pub_bytes = request_data[:Link.ECPUBSIZE//2],
peer_sig_pub_bytes = request_data[Link.ECPUBSIZE//2:Link.ECPUBSIZE],
)
responder.set_link_id(inbound)
if len(request_data) == Link.ECPUBSIZE + Link.LINK_MTU_SIZE:
responder.mtu = Link.mtu_from_lr_packet(inbound) or RNS.Reticulum.MTU
responder.mode = Link.mode_from_lr_packet(inbound)
responder.destination = inbound.destination
responder.handshake() # populates responder.derived_key
responder.prove() # emits LRPROOF via outbound
finally:
RNS.Transport.outbound = real_outbound
link_mod.X25519PrivateKey = real_X25519
link_mod.Ed25519PrivateKey = real_Ed25519
# 3. Drive the initiator's handshake by feeding it the captured
# LRPROOF, so its derived_key is computed too. We assert
# initiator.derived_key == responder.derived_key.
proof_pkt = RNS.Packet(None, captured_pf["raw"])
if not proof_pkt.unpack():
fail("Failed to unpack the captured LRPROOF")
# validate_proof needs the link instance to know its destination
proof_pkt.destination = initiator
# Set up the initiator state so validate_proof can run end-to-end
import time as _time
initiator.request_time = _time.time()
if not initiator.validate_proof(proof_pkt):
# Some versions return None on success, so don't hard-fail
# solely on that — but check derived_key was populated.
pass
if initiator.derived_key is None:
fail("Initiator derived_key not populated after validate_proof")
if responder.derived_key is None:
fail("Responder derived_key not populated after handshake")
if initiator.derived_key != responder.derived_key:
fail("Initiator and responder derived_key disagree:\n"
f" initiator: {initiator.derived_key.hex()}\n"
f" responder: {responder.derived_key.hex()}")
if initiator.link_id != responder.link_id:
fail(f"link_id disagree: ini={initiator.link_id.hex()} res={responder.link_id.hex()}")
# 4. Build the initiator's LRRTT packet (S6.4.2). The initiator
# emits this immediately after LRPROOF validation, before any
# application DATA. We pin os.urandom so the Token IV is
# deterministic; the rest of the wire form falls out of the
# link's derived_key (already populated above).
lrrtt_plaintext = umsgpack.packb(LRRTT_RTT_SECONDS)
token_mod = sys.modules["RNS.Cryptography.Token"]
real_urandom = token_mod.os.urandom
def fake_urandom(n):
if n == 16: return LRRTT_IV
return real_urandom(n)
token_mod.os.urandom = fake_urandom
try:
lrrtt_packet = RNS.Packet(initiator, lrrtt_plaintext,
context=RNS.Packet.LRRTT)
lrrtt_packet.pack()
finally:
token_mod.os.urandom = real_urandom
# Sanity round-trip: the responder side should decrypt
# the captured ciphertext and recover the same float.
if responder.decrypt(lrrtt_packet.ciphertext) != lrrtt_plaintext:
fail("LRRTT round-trip failed: responder.decrypt did not "
"recover the pinned plaintext.")
# Slice fields per S6.1 / S6.2 for human inspection
lr_data = captured_lr["data"]
ini_x25519_pub = lr_data[:32]
ini_ed25519_pub = lr_data[32:64]
lr_signalling = lr_data[64:] if len(lr_data) > 64 else b""
pf_data = captured_pf["data"]
pf_signature = pf_data[:64]
pf_responder_x25519 = pf_data[64:96]
pf_signalling = pf_data[96:] if len(pf_data) > 96 else b""
vector = {
"label": "alice_to_bob_aes256cbc",
"inputs": {
"initiator_identity_label": "alice",
"responder_identity_label": "bob",
"destination_full_name": "vectors.link",
"initiator_x25519_priv_hex": INITIATOR_X25519_PRIV.hex(),
"initiator_ed25519_priv_hex": INITIATOR_ED25519_PRIV.hex(),
"responder_x25519_priv_hex": RESPONDER_X25519_PRIV.hex(),
"mode": "MODE_AES256_CBC (0x01)",
},
"expected": {
"linkrequest_raw_hex": captured_lr["raw"].hex(),
"linkrequest_body_hex": lr_data.hex(),
"linkrequest_fields": {
"initiator_x25519_pub_hex": ini_x25519_pub.hex(),
"initiator_ed25519_pub_hex": ini_ed25519_pub.hex(),
"signalling_hex": lr_signalling.hex(),
},
"link_id_hex": initiator.link_id.hex(),
"lrproof_raw_hex": captured_pf["raw"].hex(),
"lrproof_body_hex": pf_data.hex(),
"lrproof_fields": {
"signature_hex": pf_signature.hex(),
"responder_x25519_pub_hex": pf_responder_x25519.hex(),
"signalling_hex": pf_signalling.hex(),
},
"shared_secret_hex": initiator.shared_key.hex(),
"derived_key_hex": initiator.derived_key.hex(),
"mtu": initiator.mtu,
"mode": initiator.mode,
"lrrtt": {
"rtt_seconds": LRRTT_RTT_SECONDS,
"iv_hex": LRRTT_IV.hex(),
"plaintext_hex": lrrtt_plaintext.hex(),
"raw_hex": lrrtt_packet.raw.hex(),
"body_hex": lrrtt_packet.ciphertext.hex(),
},
},
"rns_version_at_generation": RNS.__version__,
"generator_script": "tools/regen_links.py",
"verifies_spec_sections": ["6.1", "6.2", "6.3", "6.4.1",
"6.4.2", "6.6"],
}
payload = {
"_about": (
"Link handshake test vectors. Each vector records a full "
"Reticulum Link handshake: LINKREQUEST (initiator -> "
"responder) and LRPROOF (responder -> initiator). The "
"ephemeral X25519/Ed25519 keys are pinned via the "
"`inputs.*_priv_hex` blobs; both Ed25519 signatures are "
"RFC 8032 deterministic so the resulting wire bytes are "
"reproducible. A clean-room implementation can verify by: "
"(a) packing a LINKREQUEST from the recorded initiator "
"ephemerals and confirming bytes match `linkrequest_raw_hex`; "
"(b) computing `link_id` per SPEC.md S6.3 (N=2 for HEADER_1) "
"and matching `link_id_hex`; (c) packing an LRPROOF as the "
"responder, with bob's identity Ed25519 sig over `link_id || "
"responder_X25519_pub || responder_long_term_Ed25519_pub || "
"signalling`, and matching `lrproof_raw_hex`; (d) running "
"ECDH+HKDF on either side and matching `derived_key_hex`; "
"(e) building an LRRTT packet (S6.4.2) addressed to the "
"link_id with `context=LRRTT (0xfe)` and an encrypted body "
"of `umsgpack.packb(lrrtt.rtt_seconds)`, using `lrrtt.iv_hex` "
"as the Token IV, and matching `lrrtt.raw_hex` / `lrrtt.body_hex`. "
"Regenerate with `generator_script`."
),
"vectors": [vector],
}
with open(OUT_PATH, "w", encoding="utf-8", newline="\n") as f:
json.dump(payload, f, indent=2, sort_keys=False)
f.write("\n")
print(f"Wrote {OUT_PATH} with 1 vector")
print("ALL PASS")
finally:
try: RNS.Reticulum.exit_handler()
except Exception: pass
if __name__ == "__main__":
main()