Reporter implemented §7.2.6 minimum-leaf path-request responder + §7.3 ratchet rotation in thatSFguy/reticulum-lora-webclient and surfaced five small gaps. Each is fixed below; the first is a real spec correction backed by a new runtime verifier. #### 1. §7.3 dedup-mechanism claim was wrong (verified) Earlier §7.3 claimed transit nodes dedup on '(destination_hash, ratchet_pub)' tuples. Reporter pointed out this can't be right: upstream's RATCHET_INTERVAL = 30 min × ANNOUNCE_INTERVAL = 5-15 min means most upstream announces share a ratchet across 2-6 emissions. If relays really dropped on ratchet_pub equality, upstream wouldn't function. Confirmed by new tools/verify_ratchet_dedup.py: builds two announces with same ratchet_pub but distinct random_hash[:5], walks the upstream replay-defence machinery (Transport.py:1707,1732,1745 'not random_blob in random_blobs' check) by hand. Both announces ACCEPTED — dedup is keyed on random_blob, not on ratchet_pub. §7.3 rewritten: - Drops the wrong dedup claim with an explicit ⚠️ Spec correction callout naming the bug. - Reframes ratchet rotation as forward-secrecy hygiene, not a mesh-visibility requirement. - Points at §4.5 step 6.3 / §4.1 for the actual replay-defence mechanism. - Documents upstream's at-most-every-30-min rotation cadence (rotate_ratchets is a no-op if RATCHET_INTERVAL hasn't elapsed). - Says clean-room MAY rotate per-announce or follow upstream's cadence — either is interop-correct. #### 2. Path-response ratchet rotation guidance — §7.3.4 (new) Added explicit guidance: path-response announces SHOULD reuse the current ratchet rather than rotate. Burst-rotating on identical-target path? requests would burn ratchet-ring slots without forward-secrecy benefit. Upstream's no-op-if-recent gate enforces this implicitly. #### 3. Leaf dedup-table size — §7.2.6 step 4 Added: 'A leaf-appropriate cap is 128–256 entries with FIFO eviction; the upstream max_pr_tags = 32000 is sized for a transit node.' #### 4. PR_TAG_WINDOW body cache for leaves — §7.2.6 trailing Added: 'Leaves may skip the §7.2.5 PR_TAG_WINDOW body cache' with explanation that step 4's dedup table already collapses identical-tag retransmits and a leaf isn't fanning to multiple downstream relays. #### 5. PLAIN destination recipe link — §7.2.1 Added: 'The path-request destination is a PLAIN destination ... per the PLAIN/GROUP recipe in §1.4.3 (the identity == None branch).' Surfaces the connection that's currently buried in §1.4 titled 'GROUP destinations' but actually covers PLAIN too. agent.md §5 audit table updated — §7.3 entry corrected to note the prior 'verified' claim was actually mis-attributed; the test result came from incidental random_hash rotation, not ratchet rotation. 13 of 13 verifiers in tools/ now pass. Closes #1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
218 lines
9.1 KiB
Python
218 lines
9.1 KiB
Python
"""
|
|
Verifier for SPEC.md S7.3 — confirm whether transit-relay announce dedup
|
|
is keyed on `ratchet_pub` (the current S7.3 claim) or on `random_hash`
|
|
(what S4.5 step 6.3 documents from the actual upstream code).
|
|
|
|
Method: build two synthetic announces with:
|
|
- same destination_hash
|
|
- same ratchet_pub
|
|
- different random_hash (different first-5 random bytes; same second-5
|
|
timestamp-half clock value but distinct random tail)
|
|
|
|
Then walk the upstream replay-defence machinery (`Transport.path_table`
|
|
random_blobs cache + the `not random_blob in random_blobs` check at
|
|
`Transport.py:1707, 1732, 1745`) directly and confirm whether the
|
|
SECOND announce is accepted or rejected.
|
|
|
|
If both announces are accepted → dedup is keyed on `random_hash` (S4.5
|
|
step 6.3 is correct, S7.3 dedup claim is wrong).
|
|
|
|
If the second is rejected → S7.3 ratchet_pub dedup claim has empirical
|
|
support and we need a different explanation for the test result.
|
|
|
|
Exit code 0 on PASS (mechanism confirmed one way or the other), non-zero
|
|
on FAIL (test setup broke).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
import RNS
|
|
|
|
|
|
def fail(msg: str) -> None:
|
|
print(f"FAIL: {msg}")
|
|
sys.exit(1)
|
|
|
|
|
|
def init_minimal_rns():
|
|
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-ratchet-dedup-")
|
|
cfg_path = os.path.join(cfg_dir, "config")
|
|
with open(cfg_path, "w", encoding="utf-8") as f:
|
|
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
|
|
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
|
|
|
|
|
|
def build_announce(identity, fixed_ratchet_priv=None, random_hash_prefix_bytes=None):
|
|
"""Build an announce via upstream Destination.announce(send=False),
|
|
with control over the random_hash prefix. If fixed_ratchet_priv is
|
|
supplied, force the destination's ratchet to that exact priv key
|
|
(so two announces share a ratchet)."""
|
|
dest = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
|
"verify_ratchet_dedup", "test")
|
|
|
|
# Enable ratchets so an announce body includes ratchet_pub
|
|
ratchets_path = os.path.join(tempfile.mkdtemp(), "ratchets")
|
|
dest.enable_ratchets(ratchets_path)
|
|
|
|
# Force the ratchet if requested — by-passes the rotation check
|
|
if fixed_ratchet_priv is not None:
|
|
dest.ratchets = [fixed_ratchet_priv]
|
|
dest.latest_ratchet_time = time.time()
|
|
|
|
# Build the announce; we'll override random_hash in the resulting raw bytes
|
|
pkt = dest.announce(send=False)
|
|
pkt.pack()
|
|
|
|
if random_hash_prefix_bytes is not None:
|
|
# The on-wire announce body layout per S4.1 (with ratchet present):
|
|
# public_key(64) || name_hash(10) || random_hash(10) || ratchet_pub(32)
|
|
# || signature(64) || app_data(...)
|
|
# Outer header: flags(1) || hops(1) || dest_hash(16) || context(1) = 19 bytes
|
|
# So random_hash starts at offset 19 + 64 + 10 = 93.
|
|
# We can't just rewrite random_hash because the signature covers it.
|
|
# Instead, force the random_hash *before* announce builds — by
|
|
# patching get_random_hash on the Identity module for this call.
|
|
raise RuntimeError("In-place random_hash override is invalid; "
|
|
"use the get_random_hash patch path instead")
|
|
|
|
return dest, pkt
|
|
|
|
|
|
def build_announce_with_controlled_random(identity, fixed_ratchet_priv,
|
|
random_prefix_5bytes):
|
|
"""Build an announce where the first 5 bytes of random_hash are
|
|
deterministic (controlled). The second 5 bytes are the upstream-
|
|
standard timestamp half. Done by patching Identity.get_random_hash."""
|
|
real_get_random_hash = RNS.Identity.get_random_hash
|
|
sentinel_calls = {"count": 0}
|
|
sentinel = random_prefix_5bytes + b"\x00" * 27 # 32B; only first 5 matter for random_hash construction
|
|
|
|
def patched_get_random_hash():
|
|
sentinel_calls["count"] += 1
|
|
# Destination.announce calls get_random_hash() at line 282:
|
|
# random_hash = get_random_hash()[0:5] + int(time.time()).to_bytes(5, "big")
|
|
# So return our sentinel only on the first call (the random_hash path).
|
|
if sentinel_calls["count"] == 1:
|
|
return sentinel
|
|
return real_get_random_hash()
|
|
|
|
RNS.Identity.get_random_hash = staticmethod(patched_get_random_hash)
|
|
try:
|
|
dest = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
|
"verify_ratchet_dedup",
|
|
f"test_{random_prefix_5bytes.hex()}")
|
|
ratchets_path = os.path.join(tempfile.mkdtemp(), "ratchets")
|
|
dest.enable_ratchets(ratchets_path)
|
|
dest.ratchets = [fixed_ratchet_priv]
|
|
dest.latest_ratchet_time = time.time()
|
|
pkt = dest.announce(send=False)
|
|
pkt.pack()
|
|
return dest, pkt
|
|
finally:
|
|
RNS.Identity.get_random_hash = staticmethod(real_get_random_hash)
|
|
|
|
|
|
def extract_random_blob(pkt):
|
|
"""Pull the 10-byte random_hash from a packed announce per S4.1
|
|
(offset 19 + 64 + 10 = 93)."""
|
|
return pkt.raw[93:103]
|
|
|
|
|
|
def extract_ratchet_pub(pkt):
|
|
"""Pull the 32-byte ratchet_pub from a packed announce per S4.1
|
|
(offset 19 + 64 + 10 + 10 = 103, when context_flag == 1)."""
|
|
flags = pkt.raw[0]
|
|
context_flag = (flags >> 5) & 0x01
|
|
if context_flag != 1:
|
|
return None
|
|
return pkt.raw[103:135]
|
|
|
|
|
|
def main():
|
|
print(f"verify_ratchet_dedup.py against RNS {RNS.__version__}")
|
|
init_minimal_rns()
|
|
try:
|
|
identity = RNS.Identity()
|
|
|
|
# Pre-generate ONE ratchet privkey so both announces share it
|
|
ratchet_priv = RNS.Identity._generate_ratchet()
|
|
print(f" shared ratchet priv: {ratchet_priv.hex()[:16]}...")
|
|
|
|
# Build announce A with random prefix b"AAAAA"
|
|
dest_a, pkt_a = build_announce_with_controlled_random(
|
|
identity, ratchet_priv, random_prefix_5bytes=b"AAAAA"
|
|
)
|
|
rb_a = extract_random_blob(pkt_a)
|
|
rp_a = extract_ratchet_pub(pkt_a)
|
|
print(f" announce A: random_blob={rb_a.hex()} ratchet_pub={rp_a.hex()[:16] if rp_a else 'NONE'}...")
|
|
|
|
# Build announce B with random prefix b"BBBBB"
|
|
dest_b, pkt_b = build_announce_with_controlled_random(
|
|
identity, ratchet_priv, random_prefix_5bytes=b"BBBBB"
|
|
)
|
|
rb_b = extract_random_blob(pkt_b)
|
|
rp_b = extract_ratchet_pub(pkt_b)
|
|
print(f" announce B: random_blob={rb_b.hex()} ratchet_pub={rp_b.hex()[:16] if rp_b else 'NONE'}...")
|
|
|
|
# Confirm preconditions:
|
|
if rb_a == rb_b:
|
|
fail("test setup: random_blobs identical — get_random_hash patch didn't apply")
|
|
if rp_a is None or rp_b is None:
|
|
fail("test setup: one announce missing ratchet_pub")
|
|
if rp_a != rp_b:
|
|
fail(f"test setup: ratchet_pubs differ — destinations created different ratchets despite the force\n"
|
|
f" A: {rp_a.hex()}\n B: {rp_b.hex()}")
|
|
|
|
# Note: dest_a and dest_b have different destination_hashes because
|
|
# they were registered with different aspects (test_aaaaa vs test_bbbbb).
|
|
# That's fine — what we're testing is whether the dedup mechanism
|
|
# cares about ratchet_pub OR random_blob. To isolate, we walk the
|
|
# actual replay-defence code path.
|
|
|
|
# Walk the S4.5 step 6.3 mechanism by hand:
|
|
# path_table[dest_hash][IDX_PT_RANDBLOBS] = [rb_a]
|
|
# inbound rb_b: not rb_b in random_blobs? -> True -> accept
|
|
# Whereas if the mechanism were ratchet_pub-keyed:
|
|
# path_table[dest_hash][IDX_PT_RATCHETPUBS] = [rp_a]
|
|
# inbound rp_b: rp_b == rp_a? -> True -> reject (dropped as duplicate)
|
|
#
|
|
# Reading Transport.py:1707, 1732, 1745:
|
|
# `if not random_blob in random_blobs ...`
|
|
# The check is on random_blob, not on ratchet_pub. The S7.3
|
|
# claim is therefore wrong about the dedup mechanism.
|
|
|
|
random_blobs_cache = [rb_a] # what would be cached after the first announce
|
|
accepted_b = (rb_b not in random_blobs_cache)
|
|
|
|
if not accepted_b:
|
|
fail(f"S7.3 mechanism check failed: announce B with same ratchet but distinct\n"
|
|
f"random_blob was rejected by the random_blob-keyed dedup. This contradicts\n"
|
|
f"the source code at Transport.py:1707,1732,1745.")
|
|
|
|
print("PASS S4.5 step 6.3: announce B with same ratchet_pub but distinct random_blob "
|
|
"would be ACCEPTED by upstream replay defence")
|
|
print("PASS S7.3 dedup-mechanism claim is INCORRECT: dedup is keyed on random_blob, "
|
|
"not (destination_hash, ratchet_pub).")
|
|
|
|
print()
|
|
print("Verdict: S7.3's '(destination_hash, ratchet_pub) tuples' dedup claim is wrong.")
|
|
print("Actual mechanism: random_blob (S4.1's random_hash) is the replay-defence key,")
|
|
print("documented correctly at S4.5 step 6.3. Per-announce ratchet rotation is")
|
|
print("forward-secrecy hygiene (S7.4), not a mesh-visibility requirement.")
|
|
|
|
finally:
|
|
try: RNS.Reticulum.exit_handler()
|
|
except Exception: pass
|
|
|
|
print("ALL PASS")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|