Tier 1 audit: `link-lxmf-tier1-rns-1.2.4-lxmf-0.9.7.md`. Tier 2 vectors/verifier: link-lxmf.json, regen_link_lxmf.py, and verify_link_lxmf.py. Tier 3 promotion: updated SPEC.md, flows, status, and documentation. Key correction: the 319/320 boundary uses upstream's computed LXMF content_size, not simply the raw message content length. Also corrected stale flow descriptions for KEEPALIVE (0xFA) and encrypted LINKCLOSE teardown (0xFC). Verification: deterministic vector regeneration produced an identical SHA-256; portable-path and formatting checks pass; full pinned suite: 17 passed, 0 failed.
205 lines
8.6 KiB
Python
205 lines
8.6 KiB
Python
"""
|
|
Verifier for DIRECT LXMF delivery over an established Reticulum Link.

Validates deterministic PACKET and Resource vectors, the exact upstream
content_size boundary (319/320), full-body DIRECT parsing, wrong-link-key
rejection, and LXMRouter's DIRECT receive dispatch.
|
|
"""
|
|
|
|
from __future__ import annotations

import atexit
import json
import os
import shutil
import sys
import tempfile

import LXMF
import RNS
from LXMF.LXMessage import LXMessage
from LXMF.LXMRouter import LXMRouter
from RNS.Cryptography.Token import Token
from RNS.Resource import Resource, ResourceAdvertisement


# Locations of the recorded test vectors, resolved relative to this script:
# the repository root is taken to be the parent of the directory holding it.
_SCRIPT_PATH = os.path.abspath(__file__)
REPO_ROOT = os.path.dirname(os.path.dirname(_SCRIPT_PATH))
_VECTOR_DIR = os.path.join(REPO_ROOT, "test-vectors")
VECTORS_PATH = os.path.join(_VECTOR_DIR, "link-lxmf.json")
IDS_PATH = os.path.join(_VECTOR_DIR, "identities.json")
LINKS_PATH = os.path.join(_VECTOR_DIR, "links.json")


def fail(message: str) -> None:
    """Report a verification failure and terminate the process.

    Prints ``FAIL: <message>`` on standard output and then raises
    ``SystemExit`` with status 1, so control never returns to the caller.
    """
    report = f"FAIL: {message}"
    print(report)
    raise SystemExit(1)


def init_minimal_rns():
    """Start an isolated Reticulum instance backed by a throwaway config.

    A fresh temporary directory receives a minimal ``config`` file that turns
    off transport and instance sharing. The function returns the
    ``RNS.Reticulum`` object constructed with ``configdir`` pointing at that
    directory and ``loglevel=0``.

    The temporary directory is deleted at interpreter exit; previously it was
    left behind after every run because ``tempfile.mkdtemp`` never removes
    what it creates.
    """
    config_dir = tempfile.mkdtemp(prefix="rns-verify-link-lxmf-")
    # Register the cleanup before Reticulum is constructed: atexit handlers
    # run in reverse registration order, so any exit hook that RNS may
    # register during start-up still finds the directory in place.
    atexit.register(shutil.rmtree, config_dir, ignore_errors=True)

    config_path = os.path.join(config_dir, "config")
    with open(config_path, "w", encoding="utf-8") as config:
        config.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
    return RNS.Reticulum(configdir=config_dir, loglevel=0)


def load_json(path: str):
    """Read the UTF-8 encoded file at *path* and return its parsed JSON value."""
    with open(path, encoding="utf-8") as handle:
        document = handle.read()
    return json.loads(document)


def content_size(packed: bytes) -> int:
    """Compute the content_size figure for a packed LXMF message.

    The bytes following the two destination hashes and the signature form the
    payload; its length, minus the ``LXMessage`` timestamp size and struct
    overhead constants, is returned.
    """
    header_length = 2 * LXMessage.DESTINATION_LENGTH + LXMessage.SIGNATURE_LENGTH
    fixed_overhead = LXMessage.TIMESTAMP_SIZE + LXMessage.STRUCT_OVERHEAD
    body = packed[header_length:]
    return len(body) - fixed_overhead


def parse_and_check(packed: bytes, expected_size: int) -> LXMessage:
    """Unpack a DIRECT LXMF body and confirm it is the expected test message.

    The message must carry a validated signature, its content must be exactly
    ``expected_size`` bytes of ``b"x"``, and unpacking must leave the delivery
    method at ``LXMessage.UNKNOWN``. Any violation ends the run via ``fail``.

    Returns the unpacked ``LXMessage``.
    """
    message = LXMessage.unpack_from_bytes(packed)

    if not message.signature_validated:
        fail(f"DIRECT LXMF signature invalid at content_size {expected_size}")

    wanted_content = b"x" * expected_size
    length_matches = len(message.content) == expected_size
    if not (length_matches and message.content == wanted_content):
        fail(f"DIRECT LXMF content mismatch at content_size {expected_size}")

    # Parsing raw bytes alone must not decide how the message was delivered.
    if message.method != LXMessage.UNKNOWN:
        fail("unpack_from_bytes unexpectedly assigned a delivery method")

    return message


def _select_vector(vectors, representation, label: str):
    """Return the first vector recorded with *representation*, or fail."""
    match = next(
        (vector for vector in vectors if vector["expected"]["representation"] == representation),
        None,
    )
    if match is None:
        # Report a missing vector like every other check instead of letting a
        # bare StopIteration traceback escape.
        fail(f"no DIRECT {label} vector found in {VECTORS_PATH}")
    return match


def verify_vectors(derived_key: bytes, link_id: bytes) -> tuple[bytes, bytes]:
    """Check the recorded DIRECT PACKET and Resource vectors.

    Loads the vectors from ``VECTORS_PATH`` and verifies, in order:

    * the PACKET vector has content_size 319, fills ``RNS.Link.MDU`` exactly,
      has a consistent raw Link packet (header fields, ``link_id``, context,
      ciphertext) and decrypts under the Link key to the full LXMF body;
    * the Resource vector's concatenated parts decrypt to the throwaway prefix
      followed by the full LXMF body, whose content_size is 320;
    * the Resource advertisement agrees on data size and resource hash;
    * the PACKET ciphertext does not decrypt under a different key.

    Args:
        derived_key: Link key used to build the decryption ``Token``.
        link_id: Link identifier expected in the raw packet header.

    Returns:
        ``(packet_packed, resource_packed)``: the packed LXMF bodies of the
        PACKET and Resource vectors.

    Any failed check terminates the process through ``fail``.
    """
    vectors = load_json(VECTORS_PATH)["vectors"]
    packet_vector = _select_vector(vectors, LXMessage.PACKET, "PACKET")
    resource_vector = _select_vector(vectors, LXMessage.RESOURCE, "RESOURCE")
    token = Token(derived_key)

    # Header length implied by the slices checked below: two flags/hops
    # bytes, a 16-byte link_id and one context byte.
    link_header_size = 19

    # --- 319-byte boundary: single Link packet -------------------------------
    packet_packed = bytes.fromhex(packet_vector["expected"]["lxmf_packed_hex"])
    packet_raw = bytes.fromhex(packet_vector["expected"]["link_packet_raw_hex"])
    packet_ciphertext = bytes.fromhex(packet_vector["expected"]["link_packet_ciphertext_hex"])
    if content_size(packet_packed) != 319 or len(packet_packed) != RNS.Link.MDU:
        fail("319 boundary vector does not fill the Link MDU exactly")
    if len(packet_raw) != link_header_size + len(packet_ciphertext) or len(packet_raw) > RNS.Reticulum.MTU:
        fail("319 boundary vector has an invalid Link packet wire length")
    if packet_raw[:2] != bytes([RNS.Destination.LINK << 2, 0]):
        fail("DIRECT PACKET vector flags/hops are not HEADER_1 DATA LINK")
    if packet_raw[2:18] != link_id or packet_raw[18] != RNS.Packet.NONE:
        fail("DIRECT PACKET vector link_id/context mismatch")
    if packet_raw[link_header_size:] != packet_ciphertext:
        fail("DIRECT PACKET raw body differs from recorded ciphertext")
    if token.decrypt(packet_ciphertext) != packet_packed:
        fail("DIRECT PACKET did not decrypt to the complete LXMF body")
    parse_and_check(packet_packed, 319)

    # --- 320-byte boundary: Resource transfer --------------------------------
    resource_packed = bytes.fromhex(resource_vector["expected"]["lxmf_packed_hex"])
    parts = [bytes.fromhex(part["body_hex"]) for part in resource_vector["expected"]["resource_parts"]]
    stream = b"".join(parts)
    decrypted = token.decrypt(stream)
    prefix = bytes.fromhex(resource_vector["inputs"]["throwaway_prefix_hex"])
    if decrypted[: Resource.RANDOM_HASH_SIZE] != prefix:
        fail("DIRECT Resource throwaway prefix mismatch")
    if decrypted[Resource.RANDOM_HASH_SIZE :] != resource_packed:
        fail("DIRECT Resource did not decrypt to the complete LXMF body")
    if content_size(resource_packed) != 320:
        fail("Resource boundary vector computed content_size is not 320")
    parse_and_check(resource_packed, 320)

    adv = ResourceAdvertisement.unpack(
        bytes.fromhex(resource_vector["expected"]["resource_advertisement_plaintext_hex"])
    )
    if adv.d != len(resource_packed) or adv.h.hex() != resource_vector["expected"]["resource_hash_hex"]:
        fail("DIRECT Resource advertisement size/hash mismatch")

    # --- Wrong-key rejection -------------------------------------------------
    # Any exception from decrypt counts as the expected rejection; only a
    # clean return under the byte-reversed key is a failure.
    try:
        Token(bytes(reversed(derived_key))).decrypt(packet_ciphertext)
    except Exception:
        pass
    else:
        fail("DIRECT packet decrypted under the wrong Link key")

    print("PASS S5.2/S10.1 DIRECT PACKET/RESOURCE boundary vectors: 319 -> PACKET, 320 -> RESOURCE")
    print("PASS S3/S5.5/S5.6 DIRECT Link decrypt and signed full-body parse; wrong Link key rejected")
    return packet_packed, resource_packed


def verify_upstream_boundary(alice_identity, bob_identity) -> None:
    """Confirm that upstream LXMF switches representation between 319 and 320.

    Packs one DIRECT message of ``b"x"`` content per boundary size, from an
    IN destination for Alice to an OUT destination for Bob, with a fixed
    timestamp. Each packed message must report the requested content_size and
    the expected representation (PACKET at 319, RESOURCE at 320); otherwise
    the run ends through ``fail``.
    """
    app_name = "verify_link_lxmf"
    single = RNS.Destination.SINGLE
    alice = RNS.Destination(alice_identity, RNS.Destination.IN, single, app_name, "alice")
    bob = RNS.Destination(bob_identity, RNS.Destination.OUT, single, app_name, "bob")

    boundary = {319: LXMessage.PACKET, 320: LXMessage.RESOURCE}
    for size, expected in boundary.items():
        message = LXMessage(bob, alice, b"x" * size, b"", {}, LXMessage.DIRECT)
        # Pin the timestamp so the packed bytes do not depend on the clock.
        message.timestamp = 1700000000.0
        message.pack()

        size_matches = content_size(message.packed) == size
        if not (size_matches and message.representation == expected):
            fail(f"upstream boundary mismatch at {size}")

    print("PASS S10.1 upstream selection uses computed LXMF content_size at the 319/320 boundary")


def verify_router_direct_dispatch(packet_packed: bytes) -> None:
    """Exercise ``LXMRouter.delivery_packet`` with a fake Link packet.

    An ``LXMRouter`` is allocated without running ``__init__`` and its
    ``lxmf_delivery`` is replaced by a recorder. While ``delivery_packet``
    runs, the ``Thread`` attribute reached through the ``LXMF.LXMRouter``
    module's ``threading`` reference is swapped for a stand-in that runs its
    target synchronously; the real class is restored afterwards.

    The call must prove the packet and hand ``lxmf_delivery`` the unchanged
    data, the LINK destination type and ``method=LXMessage.DIRECT``; any
    deviation ends the run through ``fail``.
    """
    seen = {}

    def record_delivery(data, destination_type, **kwargs):
        seen.update(data=data, destination_type=destination_type, method=kwargs.get("method"))

    class InlineThread:
        """Thread stand-in whose ``start`` runs the target in the caller."""

        def __init__(self, target, daemon=None):
            self.target = target

        def start(self):
            self.target()

    class StubPacket:
        """Minimal packet exposing the attributes set below plus ``prove``."""

        destination_type = RNS.Destination.LINK
        rssi = -80
        snr = 7
        q = 90
        ratchet_id = bytes.fromhex("00112233445566778899aabbccddeeff")
        proved = False

        def prove(self):
            self.proved = True

    router = object.__new__(LXMRouter)
    router.lxmf_delivery = record_delivery
    stub = StubPacket()

    threading_mod = sys.modules["LXMF.LXMRouter"].threading
    original_thread = threading_mod.Thread
    threading_mod.Thread = InlineThread
    try:
        router.delivery_packet(packet_packed, stub)
    finally:
        threading_mod.Thread = original_thread

    # Checked in order; the first failing entry terminates the run.
    problems = (
        (not stub.proved, "LXMRouter.delivery_packet did not prove DIRECT Link DATA"),
        (seen.get("data") != packet_packed, "LXMRouter.delivery_packet altered or prepended DIRECT LXMF data"),
        (
            seen.get("destination_type") != RNS.Destination.LINK,
            "LXMRouter.delivery_packet lost LINK destination type",
        ),
        (
            seen.get("method") != LXMessage.DIRECT,
            "LXMRouter.delivery_packet did not classify LINK data as DIRECT",
        ),
    )
    for failed, reason in problems:
        if failed:
            fail(reason)

    print("PASS S5.2 DIRECT receive dispatch proves DATA and passes full LXMF body unchanged")


def main() -> None:
    """Run every DIRECT LXMF verification step and print the results.

    Prints the RNS/LXMF versions in use, starts the minimal Reticulum
    instance, loads the Alice/Bob identities and the first link vector,
    registers Alice's public key with ``RNS.Identity.remember``, and then runs
    the vector, upstream-boundary and router-dispatch checks. Prints
    ``ALL PASS`` when all of them return.

    ``RNS.Reticulum.exit_handler`` is always attempted on the way out, and any
    exception it raises is ignored.
    """
    print(f"verify_link_lxmf.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")
    init_minimal_rns()
    try:
        identity_vectors = load_json(IDS_PATH)["vectors"]
        link_vector = load_json(LINKS_PATH)["vectors"][0]

        def identity_vector(label):
            return next(entry for entry in identity_vectors if entry["label"] == label)

        alice = identity_vector("alice")
        bob = identity_vector("bob")

        alice_key = bytes.fromhex(alice["inputs"]["private_key_hex"])
        alice_identity = RNS.Identity.from_bytes(alice_key)
        bob_key = bytes.fromhex(bob["inputs"]["private_key_hex"])
        bob_identity = RNS.Identity.from_bytes(bob_key)

        alice_hash = bytes.fromhex(alice["expected"]["destination_hash_hex"])
        RNS.Identity.remember(bytes(32), alice_hash, alice_identity.get_public_key(), None)

        link_expected = link_vector["expected"]
        derived_key = bytes.fromhex(link_expected["derived_key_hex"])
        link_id = bytes.fromhex(link_expected["link_id_hex"])

        packet_packed = verify_vectors(derived_key, link_id)[0]
        verify_upstream_boundary(alice_identity, bob_identity)
        verify_router_direct_dispatch(packet_packed)
        print("ALL PASS")
    finally:
        # Best-effort shutdown: a failing exit handler must not mask the
        # verification outcome.
        try:
            RNS.Reticulum.exit_handler()
        except Exception:
            pass


# Run the verifier only when executed as a script, not when imported.
if __name__ == "__main__":
    main()