reticiulum-specification/tools/verify_link_lxmf.py
John Poole 7ffbb0ef5e Completed the full link-delivered LXMF unit:
Tier 1 audit: `link-lxmf-tier1-rns-1.2.4-lxmf-0.9.7.md`
Tier 2 vectors/verifier: link-lxmf.json, regen_link_lxmf.py, and verify_link_lxmf.py
Tier 3 promotion: updated SPEC.md, flows, status, and documentation
Key correction: the 319/320 boundary uses upstream’s computed LXMF content_size, not simply raw message content length.
Also corrected stale flow descriptions for KEEPALIVE (0xFA) and encrypted LINKCLOSE teardown (0xFC).
Verification:
Deterministic vector regeneration: identical SHA-256
Portable-path and formatting checks: pass
Full pinned suite: 17 passed, 0 failed
2026-06-08 13:54:27 -07:00

205 lines
8.6 KiB
Python

"""
Verifier for DIRECT LXMF delivery over an established Reticulum Link.
Validates deterministic PACKET and Resource vectors, the exact upstream
content_size boundary (319/320), full-body DIRECT parsing, wrong-link-key
rejection, and LXMRouter's DIRECT receive dispatch.
"""
from __future__ import annotations

import atexit
import json
import os
import shutil
import sys
import tempfile
from typing import NoReturn

import LXMF
import RNS
from LXMF.LXMessage import LXMessage
from LXMF.LXMRouter import LXMRouter
from RNS.Cryptography.Token import Token
from RNS.Resource import Resource, ResourceAdvertisement

# Repository root: the parent of the directory that contains this script.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.dirname(_SCRIPT_DIR)

# JSON fixtures read by this verifier, all located under <repo>/test-vectors.
_VECTORS_DIR = os.path.join(REPO_ROOT, "test-vectors")
VECTORS_PATH = os.path.join(_VECTORS_DIR, "link-lxmf.json")
IDS_PATH = os.path.join(_VECTORS_DIR, "identities.json")
LINKS_PATH = os.path.join(_VECTORS_DIR, "links.json")
def fail(message: str) -> NoReturn:
    """Report a verification failure and terminate the process.

    Prints ``FAIL: <message>`` to standard output and then exits with
    status 1 through ``sys.exit``. The function never returns, which the
    ``NoReturn`` annotation makes visible to type checkers and readers.

    Args:
        message: Human-readable description of the failed check.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    print(f"FAIL: {message}")
    sys.exit(1)
def init_minimal_rns():
    """Start a Reticulum instance backed by a throwaway config directory.

    A fresh temporary directory is created, a minimal ``config`` file is
    written into it (transport and instance sharing both disabled), and
    ``RNS.Reticulum`` is constructed against that directory with
    ``loglevel=0``.

    The temporary directory is removed at interpreter exit so repeated runs
    do not accumulate ``rns-verify-link-lxmf-*`` directories.

    Returns:
        The ``RNS.Reticulum`` instance created for this run.
    """
    config_dir = tempfile.mkdtemp(prefix="rns-verify-link-lxmf-")
    # Registered before RNS.Reticulum is constructed: atexit runs handlers in
    # last-in-first-out order, so any exit hook registered later (for example
    # by RNS itself, if it registers one) runs before the directory is removed.
    # Removal is best-effort; a leftover directory must not fail the run.
    atexit.register(shutil.rmtree, config_dir, ignore_errors=True)

    config_path = os.path.join(config_dir, "config")
    with open(config_path, "w", encoding="utf-8") as config:
        config.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
    return RNS.Reticulum(configdir=config_dir, loglevel=0)
def load_json(path: str):
    """Return the parsed JSON content of the UTF-8 text file at *path*."""
    with open(path, encoding="utf-8") as handle:
        document = handle.read()
    return json.loads(document)
def content_size(packed: bytes) -> int:
    """Compute the LXMF content_size figure for a packed message.

    The fixed-size prefix of ``2 * DESTINATION_LENGTH + SIGNATURE_LENGTH``
    bytes is skipped (presumably the two hashes and the signature; confirm
    against LXMessage), and ``TIMESTAMP_SIZE`` plus ``STRUCT_OVERHEAD`` are
    subtracted from the length of what remains.
    """
    prefix_length = 2 * LXMessage.DESTINATION_LENGTH + LXMessage.SIGNATURE_LENGTH
    payload_length = len(packed[prefix_length:])
    overhead = LXMessage.TIMESTAMP_SIZE + LXMessage.STRUCT_OVERHEAD
    return payload_length - overhead
def parse_and_check(packed: bytes, expected_size: int) -> LXMessage:
    """Unpack a DIRECT LXMF body and verify signature, content and method.

    The message must carry a validated signature, its content must be exactly
    ``expected_size`` bytes of ``b"x"``, and ``unpack_from_bytes`` must leave
    the delivery method at ``LXMessage.UNKNOWN``. Any violation is reported
    through ``fail``.

    Returns:
        The unpacked ``LXMessage``.
    """
    message = LXMessage.unpack_from_bytes(packed)

    if not message.signature_validated:
        fail(f"DIRECT LXMF signature invalid at content_size {expected_size}")

    expected_content = b"x" * expected_size
    length_ok = len(message.content) == expected_size
    if not (length_ok and message.content == expected_content):
        fail(f"DIRECT LXMF content mismatch at content_size {expected_size}")

    if message.method != LXMessage.UNKNOWN:
        fail("unpack_from_bytes unexpectedly assigned a delivery method")

    return message
def _vector_for_representation(vectors, representation):
    """Return the first vector whose expected representation matches.

    Reports the problem through ``fail`` when no vector matches, instead of
    letting a bare ``StopIteration`` escape from ``next()``.
    """
    for vector in vectors:
        if vector["expected"]["representation"] == representation:
            return vector
    fail(
        f"{os.path.basename(VECTORS_PATH)} has no vector with expected "
        f"representation {representation!r}"
    )


def verify_vectors(derived_key: bytes, link_id: bytes) -> tuple[bytes, bytes]:
    """Check the recorded DIRECT PACKET and Resource vectors.

    Args:
        derived_key: Link key used to build the ``Token`` that decrypts the
            recorded ciphertexts.
        link_id: Link identifier expected at bytes 2..17 of the raw packet.

    Returns:
        ``(packet_packed, resource_packed)``: the packed LXMF bodies of the
        content_size 319 (PACKET) and 320 (RESOURCE) vectors.

    Every mismatch is reported through ``fail``, which exits the process.
    """
    vectors = load_json(VECTORS_PATH)["vectors"]
    packet_vector = _vector_for_representation(vectors, LXMessage.PACKET)
    resource_vector = _vector_for_representation(vectors, LXMessage.RESOURCE)
    token = Token(derived_key)

    # ---- PACKET vector (content_size 319) ---------------------------------
    packet_expected = packet_vector["expected"]
    packet_packed = bytes.fromhex(packet_expected["lxmf_packed_hex"])
    packet_raw = bytes.fromhex(packet_expected["link_packet_raw_hex"])
    packet_ciphertext = bytes.fromhex(packet_expected["link_packet_ciphertext_hex"])

    # Raw layout checked below: flags(1) + hops(1) + link_id(16) + context(1),
    # followed by the ciphertext.
    header_length = 19

    if content_size(packet_packed) != 319 or len(packet_packed) != RNS.Link.MDU:
        fail("319 boundary vector does not fill the Link MDU exactly")
    if (
        len(packet_raw) != header_length + len(packet_ciphertext)
        or len(packet_raw) > RNS.Reticulum.MTU
    ):
        fail("319 boundary vector has an invalid Link packet wire length")
    if packet_raw[:2] != bytes([RNS.Destination.LINK << 2, 0]):
        fail("DIRECT PACKET vector flags/hops are not HEADER_1 DATA LINK")
    if packet_raw[2:18] != link_id or packet_raw[18] != RNS.Packet.NONE:
        fail("DIRECT PACKET vector link_id/context mismatch")
    if packet_raw[header_length:] != packet_ciphertext:
        fail("DIRECT PACKET raw body differs from recorded ciphertext")
    if token.decrypt(packet_ciphertext) != packet_packed:
        fail("DIRECT PACKET did not decrypt to the complete LXMF body")
    parse_and_check(packet_packed, 319)

    # ---- Resource vector (content_size 320) -------------------------------
    resource_expected = resource_vector["expected"]
    resource_packed = bytes.fromhex(resource_expected["lxmf_packed_hex"])
    # The recorded parts are concatenated and decrypted as one token stream.
    stream = b"".join(
        bytes.fromhex(part["body_hex"]) for part in resource_expected["resource_parts"]
    )
    decrypted = token.decrypt(stream)
    prefix = bytes.fromhex(resource_vector["inputs"]["throwaway_prefix_hex"])
    if decrypted[: Resource.RANDOM_HASH_SIZE] != prefix:
        fail("DIRECT Resource throwaway prefix mismatch")
    if decrypted[Resource.RANDOM_HASH_SIZE :] != resource_packed:
        fail("DIRECT Resource did not decrypt to the complete LXMF body")
    if content_size(resource_packed) != 320:
        fail("Resource boundary vector computed content_size is not 320")
    parse_and_check(resource_packed, 320)

    adv = ResourceAdvertisement.unpack(
        bytes.fromhex(resource_expected["resource_advertisement_plaintext_hex"])
    )
    if adv.d != len(resource_packed) or adv.h.hex() != resource_expected["resource_hash_hex"]:
        fail("DIRECT Resource advertisement size/hash mismatch")

    # ---- Wrong-key rejection ----------------------------------------------
    # The byte-reversed key stands in for "some other Link key".
    wrong_key = bytes(reversed(derived_key))
    try:
        Token(wrong_key).decrypt(packet_ciphertext)
    except Exception:
        # Deliberately broad: any error raised while decrypting under the
        # wrong key counts as the expected rejection.
        pass
    else:
        fail("DIRECT packet decrypted under the wrong Link key")

    print("PASS S5.2/S10.1 DIRECT PACKET/RESOURCE boundary vectors: 319 -> PACKET, 320 -> RESOURCE")
    print("PASS S3/S5.5/S5.6 DIRECT Link decrypt and signed full-body parse; wrong Link key rejected")
    return packet_packed, resource_packed
def verify_upstream_boundary(alice_identity, bob_identity) -> None:
    """Confirm upstream LXMF picks PACKET at 319 and RESOURCE at 320.

    Builds a DIRECT ``LXMessage`` for each boundary size with a fixed
    timestamp, packs it, and checks both the computed content_size and the
    representation upstream selected. A mismatch is reported through
    ``fail``.
    """
    app_name = "verify_link_lxmf"
    alice = RNS.Destination(
        alice_identity, RNS.Destination.IN, RNS.Destination.SINGLE, app_name, "alice"
    )
    bob = RNS.Destination(
        bob_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, app_name, "bob"
    )

    boundary_cases = {319: LXMessage.PACKET, 320: LXMessage.RESOURCE}
    for size, expected in boundary_cases.items():
        body = b"x" * size
        message = LXMessage(bob, alice, body, b"", {}, LXMessage.DIRECT)
        # Timestamp is pinned before packing so the packed output is fixed.
        message.timestamp = 1700000000.0
        message.pack()
        if not (content_size(message.packed) == size and message.representation == expected):
            fail(f"upstream boundary mismatch at {size}")

    print("PASS S10.1 upstream selection uses computed LXMF content_size at the 319/320 boundary")
def verify_router_direct_dispatch(packet_packed: bytes) -> None:
    """Check that ``LXMRouter.delivery_packet`` dispatches Link data as DIRECT.

    A router object is created without running ``LXMRouter.__init__`` and its
    ``lxmf_delivery`` is replaced by a recorder. ``delivery_packet`` is then
    called with a fake Link packet, and the recorded call must show the data
    unchanged, the LINK destination type, and the DIRECT method. The fake
    packet must also have been proved.

    While ``delivery_packet`` runs, the ``threading`` name inside the
    ``LXMF.LXMRouter`` module is replaced by a shim whose ``Thread`` runs its
    target synchronously. Only that module's binding is replaced; the shared
    ``threading`` module is left untouched, so other threads in the process
    are unaffected.

    Args:
        packet_packed: Packed LXMF body handed to ``delivery_packet``.
    """
    router = object.__new__(LXMRouter)  # skip LXMRouter.__init__
    captured = {}

    def record_delivery(data, destination_type, **kwargs):
        captured.update(
            data=data, destination_type=destination_type, method=kwargs.get("method")
        )

    router.lxmf_delivery = record_delivery

    class ImmediateThread:
        """Stand-in accepting threading.Thread's arguments; start() runs inline."""

        def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
            self._target = target
            self._args = tuple(args)
            self._kwargs = dict(kwargs or {})

        def start(self):
            if self._target is not None:
                self._target(*self._args, **self._kwargs)

    class FakePacket:
        destination_type = RNS.Destination.LINK
        rssi = -80
        snr = 7
        q = 90
        ratchet_id = bytes.fromhex("00112233445566778899aabbccddeeff")
        proved = False

        def prove(self):
            self.proved = True

    packet = FakePacket()
    router_mod = sys.modules["LXMF.LXMRouter"]
    real_threading = router_mod.threading

    class ThreadingShim:
        """Forwards to the real threading module except for ``Thread``."""

        Thread = ImmediateThread

        def __getattr__(self, attribute):
            return getattr(real_threading, attribute)

    router_mod.threading = ThreadingShim()
    try:
        router.delivery_packet(packet_packed, packet)
    finally:
        router_mod.threading = real_threading

    if not packet.proved:
        fail("LXMRouter.delivery_packet did not prove DIRECT Link DATA")
    if captured.get("data") != packet_packed:
        fail("LXMRouter.delivery_packet altered or prepended DIRECT LXMF data")
    if captured.get("destination_type") != RNS.Destination.LINK:
        fail("LXMRouter.delivery_packet lost LINK destination type")
    if captured.get("method") != LXMessage.DIRECT:
        fail("LXMRouter.delivery_packet did not classify LINK data as DIRECT")
    print("PASS S5.2 DIRECT receive dispatch proves DATA and passes full LXMF body unchanged")
def _identity_vector(identities, label):
    """Return the identity vector with the given label, or fail clearly."""
    for vector in identities:
        if vector["label"] == label:
            return vector
    fail(f"{os.path.basename(IDS_PATH)} has no identity vector labelled {label!r}")


def main() -> None:
    """Run every DIRECT link-delivery check and print ``ALL PASS`` on success.

    Starts a minimal Reticulum instance, loads the identity and link
    fixtures, then runs the vector, upstream-boundary and router-dispatch
    checks in that order. Reticulum's exit handler is invoked on the way out
    whether the checks pass or not.
    """
    print(f"verify_link_lxmf.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")
    init_minimal_rns()
    try:
        identities = load_json(IDS_PATH)["vectors"]
        link_vectors = load_json(LINKS_PATH)["vectors"]
        if not link_vectors:
            fail(f"{os.path.basename(LINKS_PATH)} contains no link vectors")
        link_vector = link_vectors[0]

        alice = _identity_vector(identities, "alice")
        bob = _identity_vector(identities, "bob")
        alice_identity = RNS.Identity.from_bytes(bytes.fromhex(alice["inputs"]["private_key_hex"]))
        bob_identity = RNS.Identity.from_bytes(bytes.fromhex(bob["inputs"]["private_key_hex"]))
        RNS.Identity.remember(
            bytes(32),
            bytes.fromhex(alice["expected"]["destination_hash_hex"]),
            alice_identity.get_public_key(),
            None,
        )

        derived_key = bytes.fromhex(link_vector["expected"]["derived_key_hex"])
        link_id = bytes.fromhex(link_vector["expected"]["link_id_hex"])

        packet_packed, _ = verify_vectors(derived_key, link_id)
        verify_upstream_boundary(alice_identity, bob_identity)
        verify_router_direct_dispatch(packet_packed)
        print("ALL PASS")
    finally:
        try:
            RNS.Reticulum.exit_handler()
        except Exception:
            # Best-effort shutdown: a teardown error must not replace the
            # verification result (or a pending SystemExit from fail()).
            pass


if __name__ == "__main__":
    main()