Upstream RNS enforces two requirements in code that SPEC.md left implicit;
both caused silent message loss in a clean-room Go LXMF service against
upstream Python rns 1.2.4 / lxmf 0.9.7.
§6.4.2 LRRTT — initiator's link-activation packet
- HEADER_1, DATA, dest_type=LINK (0x03), ctx=0xfe; body is
`umsgpack.packb(rtt_seconds)` encrypted with the link's session keys.
- The responder transitions HANDSHAKE→ACTIVE only on LRRTT receipt
(Link.py:534-553), which is also what fires the link_established
callback. LXMF's set_resource_strategy(ACCEPT_APP) is installed
from that callback; without it, every RESOURCE_ADV the initiator
sends hits the silent ACCEPT_NONE branch at Link.py:1087.
§6.4.3 Header type for post-handshake DATA and Resource
- Link-addressed packets are routed via link_table, which forwards
header bytes verbatim (Transport.py:1587-1622). HEADER_2 with a
relay's transport_id therefore arrives at the destination intact
and is dropped by packet_filter (Transport.py:1283-1285) as
"for another transport instance".
- Mandates HEADER_1 with no transport_id for all post-handshake
link DATA / Resource / control packets regardless of hop count.
- Asymmetry with LINKREQUEST (which IS path_table-routed and so
HEADER_2-eligible) is spelled out.
Companion changes:
- §6.4 renamed to "Session keys and link activation"; existing
HKDF content moved into §6.4.1.
- §2.5 LRRTT context-byte entry points at §6.4.2.
- §12.5.2 (Link DATA forwarding) cross-references §6.4.3.
- §14 failure-modes table: two new entries for the silent-drop
chains documented above.
- flows/send-link-lxmf.md step 4 strengthened (LRRTT is mandatory,
not informational); step 6 corrected (Transport.outbound does NOT
apply HEADER_1→HEADER_2 for link DATA — that conversion is
path_table-keyed, link DATA is link_table-keyed).
- test-vectors/links.json extended with an LRRTT entry: pinned
rtt_seconds=0.05 + pinned 16-byte IV produces deterministic
wire bytes for the encrypted body.
- tools/regen_links.py drives the LRRTT generation with an
os.urandom patch for the Token IV.
- tools/verify_link_lrrtt.py (new) locks the wire claims:
HEADER_1, ctx=0xfe, dest=link_id, body decrypts under
derived_key to msgpack float64 matching rtt_seconds.
Citations all verified against installed RNS 1.2.4 / LXMF 0.9.7.
All 14 verifiers PASS.
343 lines
16 KiB
Python
343 lines
16 KiB
Python
"""
|
|
Regenerator for test-vectors/links.json.
|
|
|
|
Builds a deterministic Link handshake vector pair (LINKREQUEST + LRPROOF)
|
|
between Alice (initiator) and Bob (responder) using identities from
|
|
`identities.json`.
|
|
|
|
Determinism inputs:
|
|
|
|
- Alice and Bob long-term identities (fixed via identities.json).
|
|
- Initiator's ephemeral X25519 private key (`Link.prv`,
|
|
`RNS/Link.py:285`) — fixed.
|
|
- Initiator's ephemeral Ed25519 private key (`Link.sig_prv`,
|
|
`RNS/Link.py:286`) — fixed. (This is *not* Alice's long-term key;
|
|
Link's signing key is generated fresh per handshake.)
|
|
- Responder's ephemeral X25519 private key (`Link.prv`,
|
|
`RNS/Link.py:278`) — fixed.
|
|
- Both Ed25519 signatures are deterministic per RFC 8032.
|
|
|
|
Outputs recorded per vector:
|
|
|
|
- `linkrequest_raw_hex` — full packed LINKREQUEST packet bytes
|
|
- `linkrequest_body_hex` — initiator_X25519_pub || initiator_Ed25519_pub
|
|
[|| signalling] (S6.1)
|
|
- `link_id_hex` — derived per S6.3 (N=2 for HEADER_1)
|
|
- `lrproof_raw_hex` — full packed LRPROOF packet bytes
|
|
- `lrproof_body_hex` — signature || responder_X25519_pub
|
|
[|| signalling] (S6.2)
|
|
- `derived_key_hex` — HKDF output for AES256_CBC mode
|
|
(length=64, salt=link_id, context=context)
|
|
— both sides must arrive at the same
|
|
bytes after handshake().
|
|
|
|
Run from repo root:
|
|
|
|
python tools/regen_links.py
|
|
|
|
Updates `test-vectors/links.json` in place. Exit 0 on success.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
import RNS
|
|
from RNS.Link import Link
|
|
from RNS.vendor import umsgpack
|
|
|
|
|
|
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
OUT_PATH = os.path.join(REPO_ROOT, "test-vectors", "links.json")
|
|
IDS_PATH = os.path.join(REPO_ROOT, "test-vectors", "identities.json")
|
|
|
|
|
|
# Stable inputs. Three distinct fixed priv key blobs for the three
|
|
# ephemeral generations the handshake performs.
|
|
INITIATOR_X25519_PRIV = bytes.fromhex("11"*32)
|
|
INITIATOR_ED25519_PRIV = bytes.fromhex("22"*32)
|
|
RESPONDER_X25519_PRIV = bytes.fromhex("33"*32)
|
|
|
|
# Pinned LRRTT inputs. The IV here gets injected via os.urandom while
|
|
# packing the LRRTT packet so the wire bytes are reproducible. RTT
|
|
# value is the initiator's measured LRREQ->LRPROOF time per S6.4.2;
|
|
# the exact value is non-load-bearing (responder takes max with its
|
|
# own measurement) but pinning it keeps the wire bytes deterministic.
|
|
LRRTT_RTT_SECONDS = 0.05
|
|
LRRTT_IV = bytes.fromhex("44"*16)
|
|
|
|
|
|
def fail(msg: str) -> None:
|
|
print(f"FAIL: {msg}")
|
|
sys.exit(1)
|
|
|
|
|
|
def init_minimal_rns():
|
|
cfg_dir = tempfile.mkdtemp(prefix="rns-regen-links-")
|
|
cfg_path = os.path.join(cfg_dir, "config")
|
|
with open(cfg_path, "w", encoding="utf-8") as f:
|
|
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
|
|
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
|
|
|
|
|
|
def load_identities():
|
|
with open(IDS_PATH, "r", encoding="utf-8") as f:
|
|
ids = json.load(f)
|
|
alice = next(v for v in ids["vectors"] if v["label"] == "alice")
|
|
bob = next(v for v in ids["vectors"] if v["label"] == "bob")
|
|
alice_id = RNS.Identity.from_bytes(bytes.fromhex(alice["inputs"]["private_key_hex"]))
|
|
bob_id = RNS.Identity.from_bytes(bytes.fromhex(bob ["inputs"]["private_key_hex"]))
|
|
if alice_id is None or bob_id is None:
|
|
fail("Identity.from_bytes returned None for one of alice/bob")
|
|
return alice_id, bob_id
|
|
|
|
|
|
class _StaticPrivQueue:
|
|
"""Wraps an upstream X25519/Ed25519 dispatcher class so that
|
|
`.generate()` returns a private key seeded with the next blob in a
|
|
fixed queue. Each consumed blob is removed; if the queue underflows
|
|
we delegate to the real `.generate()` so non-handshake code paths
|
|
still work."""
|
|
def __init__(self, real_cls, blobs):
|
|
self._real_cls = real_cls
|
|
self._blobs = list(blobs)
|
|
|
|
def generate(self):
|
|
if self._blobs:
|
|
blob = self._blobs.pop(0)
|
|
return self._real_cls.from_private_bytes(blob)
|
|
return self._real_cls.generate()
|
|
|
|
# Defer everything else (from_private_bytes, from_public_bytes, etc.)
|
|
# to the real class so the rest of upstream's surface keeps working.
|
|
def __getattr__(self, name):
|
|
return getattr(self._real_cls, name)
|
|
|
|
|
|
def main():
|
|
print(f"regen_links.py against RNS {RNS.__version__}")
|
|
init_minimal_rns()
|
|
try:
|
|
alice_id, bob_id = load_identities()
|
|
|
|
# Bob's destination — the link target. Mark Bob's identity
|
|
# discoverable to Alice so Link initiator can find pub keys.
|
|
bob_dest_in = RNS.Destination(bob_id, RNS.Destination.IN, RNS.Destination.SINGLE,
|
|
"vectors", "link")
|
|
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
|
"vectors", "link")
|
|
RNS.Identity.remember(b"\x00"*32, bob_dest_in.hash, bob_id.get_public_key(), None)
|
|
|
|
# Patch the X25519 / Ed25519 generators that Link.__init__ uses
|
|
# to drive deterministic ephemeral keys. Order of consumption
|
|
# follows the source:
|
|
# line 285: initiator self.prv = X25519PrivateKey.generate()
|
|
# line 286: initiator self.sig_prv = Ed25519PrivateKey.generate()
|
|
# line 278: responder self.prv = X25519PrivateKey.generate()
|
|
link_mod = sys.modules["RNS.Link"]
|
|
real_X25519 = link_mod.X25519PrivateKey
|
|
real_Ed25519 = link_mod.Ed25519PrivateKey
|
|
|
|
link_mod.X25519PrivateKey = _StaticPrivQueue(real_X25519,
|
|
[INITIATOR_X25519_PRIV,
|
|
RESPONDER_X25519_PRIV])
|
|
link_mod.Ed25519PrivateKey = _StaticPrivQueue(real_Ed25519,
|
|
[INITIATOR_ED25519_PRIV])
|
|
|
|
# Capture LINKREQUEST emission. Patch Transport.outbound (not
|
|
# Packet.send) so Packet.pack() runs and packet.raw is populated.
|
|
captured_lr = {}
|
|
captured_pf = {}
|
|
real_outbound = RNS.Transport.outbound
|
|
|
|
def fake_outbound(packet):
|
|
if packet.packet_type == RNS.Packet.LINKREQUEST and "raw" not in captured_lr:
|
|
captured_lr["raw"] = packet.raw
|
|
captured_lr["data"] = packet.data
|
|
elif packet.context == RNS.Packet.LRPROOF and "raw" not in captured_pf:
|
|
captured_pf["raw"] = packet.raw
|
|
captured_pf["data"] = packet.data
|
|
return True
|
|
|
|
RNS.Transport.outbound = staticmethod(fake_outbound)
|
|
try:
|
|
# 1. Build initiator-side Link → LINKREQUEST emitted via outbound
|
|
initiator = Link(destination=bob_dest_out)
|
|
|
|
# 2. Walk the responder side by hand (Link.validate_request
|
|
# inlined). This keeps the script self-contained.
|
|
inbound = RNS.Packet(None, captured_lr["raw"])
|
|
if not inbound.unpack():
|
|
fail("Failed to unpack the captured LINKREQUEST")
|
|
inbound.destination = bob_dest_in
|
|
|
|
request_data = captured_lr["data"]
|
|
responder = Link(
|
|
owner = bob_dest_in,
|
|
peer_pub_bytes = request_data[:Link.ECPUBSIZE//2],
|
|
peer_sig_pub_bytes = request_data[Link.ECPUBSIZE//2:Link.ECPUBSIZE],
|
|
)
|
|
responder.set_link_id(inbound)
|
|
|
|
if len(request_data) == Link.ECPUBSIZE + Link.LINK_MTU_SIZE:
|
|
responder.mtu = Link.mtu_from_lr_packet(inbound) or RNS.Reticulum.MTU
|
|
responder.mode = Link.mode_from_lr_packet(inbound)
|
|
|
|
responder.destination = inbound.destination
|
|
responder.handshake() # populates responder.derived_key
|
|
responder.prove() # emits LRPROOF via outbound
|
|
finally:
|
|
RNS.Transport.outbound = real_outbound
|
|
link_mod.X25519PrivateKey = real_X25519
|
|
link_mod.Ed25519PrivateKey = real_Ed25519
|
|
|
|
# 3. Drive the initiator's handshake by feeding it the captured
|
|
# LRPROOF, so its derived_key is computed too. We assert
|
|
# initiator.derived_key == responder.derived_key.
|
|
proof_pkt = RNS.Packet(None, captured_pf["raw"])
|
|
if not proof_pkt.unpack():
|
|
fail("Failed to unpack the captured LRPROOF")
|
|
# validate_proof needs the link instance to know its destination
|
|
proof_pkt.destination = initiator
|
|
# Set up the initiator state so validate_proof can run end-to-end
|
|
import time as _time
|
|
initiator.request_time = _time.time()
|
|
if not initiator.validate_proof(proof_pkt):
|
|
# Some versions return None on success, so don't hard-fail
|
|
# solely on that — but check derived_key was populated.
|
|
pass
|
|
if initiator.derived_key is None:
|
|
fail("Initiator derived_key not populated after validate_proof")
|
|
if responder.derived_key is None:
|
|
fail("Responder derived_key not populated after handshake")
|
|
if initiator.derived_key != responder.derived_key:
|
|
fail("Initiator and responder derived_key disagree:\n"
|
|
f" initiator: {initiator.derived_key.hex()}\n"
|
|
f" responder: {responder.derived_key.hex()}")
|
|
if initiator.link_id != responder.link_id:
|
|
fail(f"link_id disagree: ini={initiator.link_id.hex()} res={responder.link_id.hex()}")
|
|
|
|
# 4. Build the initiator's LRRTT packet (S6.4.2). The initiator
|
|
# emits this immediately after LRPROOF validation, before any
|
|
# application DATA. We pin os.urandom so the Token IV is
|
|
# deterministic; the rest of the wire form falls out of the
|
|
# link's derived_key (already populated above).
|
|
lrrtt_plaintext = umsgpack.packb(LRRTT_RTT_SECONDS)
|
|
token_mod = sys.modules["RNS.Cryptography.Token"]
|
|
real_urandom = token_mod.os.urandom
|
|
|
|
def fake_urandom(n):
|
|
if n == 16: return LRRTT_IV
|
|
return real_urandom(n)
|
|
|
|
token_mod.os.urandom = fake_urandom
|
|
try:
|
|
lrrtt_packet = RNS.Packet(initiator, lrrtt_plaintext,
|
|
context=RNS.Packet.LRRTT)
|
|
lrrtt_packet.pack()
|
|
finally:
|
|
token_mod.os.urandom = real_urandom
|
|
|
|
# Sanity round-trip: the responder side should decrypt
|
|
# the captured ciphertext and recover the same float.
|
|
if responder.decrypt(lrrtt_packet.ciphertext) != lrrtt_plaintext:
|
|
fail("LRRTT round-trip failed: responder.decrypt did not "
|
|
"recover the pinned plaintext.")
|
|
|
|
# Slice fields per S6.1 / S6.2 for human inspection
|
|
lr_data = captured_lr["data"]
|
|
ini_x25519_pub = lr_data[:32]
|
|
ini_ed25519_pub = lr_data[32:64]
|
|
lr_signalling = lr_data[64:] if len(lr_data) > 64 else b""
|
|
|
|
pf_data = captured_pf["data"]
|
|
pf_signature = pf_data[:64]
|
|
pf_responder_x25519 = pf_data[64:96]
|
|
pf_signalling = pf_data[96:] if len(pf_data) > 96 else b""
|
|
|
|
vector = {
|
|
"label": "alice_to_bob_aes256cbc",
|
|
"inputs": {
|
|
"initiator_identity_label": "alice",
|
|
"responder_identity_label": "bob",
|
|
"destination_full_name": "vectors.link",
|
|
"initiator_x25519_priv_hex": INITIATOR_X25519_PRIV.hex(),
|
|
"initiator_ed25519_priv_hex": INITIATOR_ED25519_PRIV.hex(),
|
|
"responder_x25519_priv_hex": RESPONDER_X25519_PRIV.hex(),
|
|
"mode": "MODE_AES256_CBC (0x01)",
|
|
},
|
|
"expected": {
|
|
"linkrequest_raw_hex": captured_lr["raw"].hex(),
|
|
"linkrequest_body_hex": lr_data.hex(),
|
|
"linkrequest_fields": {
|
|
"initiator_x25519_pub_hex": ini_x25519_pub.hex(),
|
|
"initiator_ed25519_pub_hex": ini_ed25519_pub.hex(),
|
|
"signalling_hex": lr_signalling.hex(),
|
|
},
|
|
"link_id_hex": initiator.link_id.hex(),
|
|
"lrproof_raw_hex": captured_pf["raw"].hex(),
|
|
"lrproof_body_hex": pf_data.hex(),
|
|
"lrproof_fields": {
|
|
"signature_hex": pf_signature.hex(),
|
|
"responder_x25519_pub_hex": pf_responder_x25519.hex(),
|
|
"signalling_hex": pf_signalling.hex(),
|
|
},
|
|
"shared_secret_hex": initiator.shared_key.hex(),
|
|
"derived_key_hex": initiator.derived_key.hex(),
|
|
"mtu": initiator.mtu,
|
|
"mode": initiator.mode,
|
|
"lrrtt": {
|
|
"rtt_seconds": LRRTT_RTT_SECONDS,
|
|
"iv_hex": LRRTT_IV.hex(),
|
|
"plaintext_hex": lrrtt_plaintext.hex(),
|
|
"raw_hex": lrrtt_packet.raw.hex(),
|
|
"body_hex": lrrtt_packet.ciphertext.hex(),
|
|
},
|
|
},
|
|
"rns_version_at_generation": RNS.__version__,
|
|
"generator_script": "tools/regen_links.py",
|
|
"verifies_spec_sections": ["6.1", "6.2", "6.3", "6.4.1",
|
|
"6.4.2", "6.6"],
|
|
}
|
|
|
|
payload = {
|
|
"_about": (
|
|
"Link handshake test vectors. Each vector records a full "
|
|
"Reticulum Link handshake: LINKREQUEST (initiator -> "
|
|
"responder) and LRPROOF (responder -> initiator). The "
|
|
"ephemeral X25519/Ed25519 keys are pinned via the "
|
|
"`inputs.*_priv_hex` blobs; both Ed25519 signatures are "
|
|
"RFC 8032 deterministic so the resulting wire bytes are "
|
|
"reproducible. A clean-room implementation can verify by: "
|
|
"(a) packing a LINKREQUEST from the recorded initiator "
|
|
"ephemerals and confirming bytes match `linkrequest_raw_hex`; "
|
|
"(b) computing `link_id` per SPEC.md S6.3 (N=2 for HEADER_1) "
|
|
"and matching `link_id_hex`; (c) packing an LRPROOF as the "
|
|
"responder, with bob's identity Ed25519 sig over `link_id || "
|
|
"responder_X25519_pub || responder_long_term_Ed25519_pub || "
|
|
"signalling`, and matching `lrproof_raw_hex`; (d) running "
|
|
"ECDH+HKDF on either side and matching `derived_key_hex`; "
|
|
"(e) building an LRRTT packet (S6.4.2) addressed to the "
|
|
"link_id with `context=LRRTT (0xfe)` and an encrypted body "
|
|
"of `umsgpack.packb(lrrtt.rtt_seconds)`, using `lrrtt.iv_hex` "
|
|
"as the Token IV, and matching `lrrtt.raw_hex` / `lrrtt.body_hex`. "
|
|
"Regenerate with `generator_script`."
|
|
),
|
|
"vectors": [vector],
|
|
}
|
|
with open(OUT_PATH, "w", encoding="utf-8", newline="\n") as f:
|
|
json.dump(payload, f, indent=2, sort_keys=False)
|
|
f.write("\n")
|
|
print(f"Wrote {OUT_PATH} with 1 vector")
|
|
print("ALL PASS")
|
|
finally:
|
|
try: RNS.Reticulum.exit_handler()
|
|
except Exception: pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|