reticiulum-specification/tools/regen_lxmf.py
Rob 038e39401f Bootstrap test-vectors/{announces,lxmf,links}.json + regenerators
Three deterministic vector files complete the test-vectors/ bootstrap.
Each regenerator pins every random source so output is byte-identical
across runs against a fixed upstream RNS / LXMF version.

- announces.json: two vectors (no-ratchet + with-ratchet) signed by
  Alice. Determinism via patched Identity.get_random_hash + module-
  local time.time shim inside RNS.Destination.

- lxmf.json: two opportunistic-LXMF vectors Alice -> Bob, captures
  full plaintext (S5.2 layout) plus Token-encrypted ciphertext (S3).
  Determinism via fixed LXMessage.timestamp, ephemeral X25519 priv,
  and Token CBC IV.

- links.json: full Link handshake — LINKREQUEST + LRPROOF wire bytes,
  derived link_id, ECDH shared secret, and HKDF-derived session key
  that both initiator and responder MUST agree on. Determinism via
  three queued ephemeral priv-key blobs (initiator X25519, initiator
  Ed25519, responder X25519) consumed in source-call order at
  RNS/Link.py:285, :286, :278.

Status table in test-vectors/README.md and tools/README.md updated to
reflect the completed bootstrap. todo.md cleaned up to reflect actual
state (the previous "Open ⚠️ items needing a runtime verifier" section
was stale — all three verifiers were completed earlier).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 21:56:44 -04:00

250 lines
9.9 KiB
Python

"""
Regenerator for test-vectors/lxmf.json.
Builds two deterministic opportunistic-LXMF wire-byte vectors using
Alice and Bob from `identities.json`:
1. `alice_to_bob_simple` — title b"hello", content b"hi bob",
no fields
2. `alice_to_bob_with_fields` — content + title + a small msgpack-
serialisable fields dict
For each vector we record:
- `lxmf_packed_hex` — the post-`LXMessage.pack()` plaintext body
(`dest(16) || src(16) || sig(64) || msgpack`)
per S5.1 / S5.2.
- `opportunistic_plaintext_hex` — the same body with the leading 16
dest_hash stripped (S5.1 opportunistic form
fed to Token).
- `token_ciphertext_hex` — the Token-encrypted ciphertext that goes
on the wire as the opportunistic packet body
per S3.
Determinism inputs:
- Alice + Bob private keys (from identities.json). Ed25519 sign is
deterministic so the LXMF signature is reproducible.
- `LXMessage.timestamp` pre-set to a fixed value (overrides the
`time.time()` default at `LXMF/LXMessage.py:354`).
- The Token ephemeral X25519 priv (patched X25519PrivateKey.generate
in RNS.Identity).
- The Token CBC IV (patched os.urandom in RNS.Cryptography.Token).
After generation we decrypt with Bob's identity, re-prepend Bob's
dest_hash, parse via `LXMessage.unpack_from_bytes`, and assert
signature_validated + title/content/fields round-trip.
Run from repo root:
python tools/regen_lxmf.py
Updates `test-vectors/lxmf.json` in place. Exit 0 on success.
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
import RNS
import LXMF
from LXMF.LXMessage import LXMessage
from RNS.vendor import umsgpack
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
OUT_PATH = os.path.join(REPO_ROOT, "test-vectors", "lxmf.json")
IDS_PATH = os.path.join(REPO_ROOT, "test-vectors", "identities.json")
FIXED_LXMF_TIMESTAMP = 1700000000.0
FIXED_EPH_X25519_PRIV = bytes.fromhex("d1d2d3d4d5d6d7d8d9dadbdcdddedfe0e1e2e3e4e5e6e7e8e9eaebecedeeeff0")
FIXED_TOKEN_IV = bytes.fromhex("11223344556677889900aabbccddeeff") # 16 B for AES-CBC
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-regen-lxmf-")
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def load_identities():
with open(IDS_PATH, "r", encoding="utf-8") as f:
ids = json.load(f)
alice = next(v for v in ids["vectors"] if v["label"] == "alice")
bob = next(v for v in ids["vectors"] if v["label"] == "bob")
alice_id = RNS.Identity.from_bytes(bytes.fromhex(alice["inputs"]["private_key_hex"]))
bob_id = RNS.Identity.from_bytes(bytes.fromhex(bob ["inputs"]["private_key_hex"]))
if alice_id is None or bob_id is None:
fail("Identity.from_bytes returned None for one of alice/bob")
return alice_id, bob_id
class _StaticX25519Priv:
"""Drop-in replacement for X25519PrivateKey whose .generate()
classmethod returns a private key seeded with FIXED_EPH_X25519_PRIV."""
_real = None
@classmethod
def generate(cls):
# cls._real is the original RNS dispatcher; from_private_bytes
# builds an X25519 priv with the same interface as generate().
return cls._real.from_private_bytes(FIXED_EPH_X25519_PRIV)
def build_vector(alice_dest, bob_dest_in, bob_dest_out, label, title, content, fields):
import sys as _sys
id_mod = _sys.modules["RNS.Identity"]
token_mod = _sys.modules["RNS.Cryptography.Token"]
lxm = LXMessage(
destination = bob_dest_out,
source = alice_dest,
content = content,
title = title,
fields = fields,
desired_method = LXMessage.OPPORTUNISTIC,
)
# Fix the timestamp before pack() so the msgpack payload is stable
lxm.timestamp = FIXED_LXMF_TIMESTAMP
lxm.pack()
packed = lxm.packed
if packed[:16] != bob_dest_out.hash:
fail(f"S5.2 dest_hash slot mismatch ({label})")
if packed[16:32] != alice_dest.hash:
fail(f"S5.2 src_hash slot mismatch ({label})")
opp_plaintext = packed[16:] # opportunistic-form input to Token
# --- Patch ephemeral X25519 generate + Token IV for deterministic CT
real_X25519 = id_mod.X25519PrivateKey
_StaticX25519Priv._real = real_X25519
id_mod.X25519PrivateKey = _StaticX25519Priv
real_urandom = token_mod.os.urandom
def patched_urandom(n):
if n == 16: return FIXED_TOKEN_IV
return real_urandom(n)
token_mod.os.urandom = patched_urandom
try:
ciphertext = bob_dest_out.encrypt(opp_plaintext)
finally:
id_mod.X25519PrivateKey = real_X25519
token_mod.os.urandom = real_urandom
# --- Round-trip: Bob can decrypt and re-parse the plaintext
decrypted = bob_dest_in.decrypt(ciphertext)
if decrypted != opp_plaintext:
fail(f"Token round-trip mismatch ({label})")
full_lxmf = bob_dest_in.hash + decrypted
parsed = LXMessage.unpack_from_bytes(full_lxmf)
if not parsed.signature_validated:
fail(f"S5.5/5.6 unpack_from_bytes did not validate signature ({label})")
if parsed.title_as_string() != title.decode("utf-8"):
fail(f"title round-trip mismatch ({label})")
if parsed.content_as_string() != content.decode("utf-8"):
fail(f"content round-trip mismatch ({label})")
if parsed.fields != fields:
fail(f"fields round-trip mismatch ({label})")
return {
"label": label,
"inputs": {
"src_identity_label": "alice",
"dst_identity_label": "bob",
"title_utf8": title.decode("utf-8"),
"content_utf8": content.decode("utf-8"),
"fields": fields,
"lxmf_timestamp": FIXED_LXMF_TIMESTAMP,
"ephemeral_x25519_priv_hex": FIXED_EPH_X25519_PRIV.hex(),
"token_iv_hex": FIXED_TOKEN_IV.hex(),
},
"expected": {
"lxmf_packed_hex": packed.hex(),
"opportunistic_plaintext_hex": opp_plaintext.hex(),
"token_ciphertext_hex": ciphertext.hex(),
"fields_layout": {
"destination_hash_hex": packed[0:16].hex(),
"source_hash_hex": packed[16:32].hex(),
"signature_hex": packed[32:96].hex(),
"msgpack_payload_hex": packed[96:].hex(),
},
},
"rns_version_at_generation": RNS.__version__,
"lxmf_version_at_generation": LXMF.__version__,
"generator_script": "tools/regen_lxmf.py",
"verifies_spec_sections": ["3", "5.1", "5.2", "5.5", "5.6"],
}
def main():
print(f"regen_lxmf.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")
init_minimal_rns()
try:
alice_id, bob_id = load_identities()
# Register once; subsequent registrations of same (identity, dir,
# type, aspects) raise. Reuse across vectors.
alice_dest = RNS.Destination(alice_id, RNS.Destination.IN, RNS.Destination.SINGLE,
"lxmf", "delivery")
bob_dest_in = RNS.Destination(bob_id, RNS.Destination.IN, RNS.Destination.SINGLE,
"lxmf", "delivery")
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE,
"lxmf", "delivery")
RNS.Identity.remember(b"\x00"*32, alice_dest.hash, alice_id.get_public_key(), None)
RNS.Identity.remember(b"\x00"*32, bob_dest_in.hash, bob_id .get_public_key(), None)
vectors = [
build_vector(alice_dest, bob_dest_in, bob_dest_out,
label = "alice_to_bob_simple",
title = b"hello",
content = b"hi bob",
fields = {}),
build_vector(alice_dest, bob_dest_in, bob_dest_out,
label = "alice_to_bob_with_fields",
title = b"meeting",
content = b"see attached",
fields = {0x01: "k1", 0x02: 42}),
]
payload = {
"_about": (
"Opportunistic LXMF test vectors. "
"`expected.lxmf_packed_hex` is the full plaintext body "
"per SPEC.md S5.2: dest(16) || src(16) || sig(64) || msgpack. "
"`expected.opportunistic_plaintext_hex` is the same with "
"the leading dest_hash stripped (S5.1 wire form). "
"`expected.token_ciphertext_hex` is the deterministic Token "
"encryption of the opportunistic plaintext (S3) using the "
"fixed ephemeral X25519 priv + IV. To verify against "
"upstream: decrypt with Bob's identity, re-prepend Bob's "
"dest_hash, then call LXMessage.unpack_from_bytes — it MUST "
"succeed with signature_validated == True and title/content/"
"fields matching the `inputs` block. Regenerate with "
"`generator_script`."
),
"vectors": vectors,
}
with open(OUT_PATH, "w", encoding="utf-8", newline="\n") as f:
json.dump(payload, f, indent=2, sort_keys=False)
f.write("\n")
print(f"Wrote {OUT_PATH} with {len(vectors)} vectors")
print("ALL PASS")
finally:
try: RNS.Reticulum.exit_handler()
except Exception: pass
if __name__ == "__main__":
main()