reticiulum-specification/tools/verify_announce_roundtrip.py
Rob 75169b0631 Add three high-value verifiers: token crypto, announce, LXMF opportunistic
tools/verify_token_crypto.py — locks in §3:
    - Opportunistic Token encrypt/decrypt round-trip with full
      ephemeral_pub(32) || iv(16) || aes(...) || hmac(32) layout check.
    - HKDF salt = recipient.identity_hash verified by re-deriving
      the key by hand and confirming decrypt succeeds.
    - Link-derived Token form (no eph_pub prefix) round-trip.
    - HMAC-then-AES order proven by tampering each region: HMAC
      mismatch raises before AES decrypt.
    - PKCS#7 padding boundaries (1B and 16B plaintexts).

  tools/verify_announce_roundtrip.py — locks in §4 + §4.5:
    - Build via upstream Destination.announce(send=False).
    - Body layout walk with context_flag branching for the optional
      ratchet slot.
    - signed_data reconstruction per §4.2 with empty-bytes-not-absent
      ratchet rule.
    - dest_hash recompute per §1.2.
    - random_hash[5:10] is a recent unix_seconds timestamp per §4.1
      (corrected — confirms upstream emits the timestamp half).
    - Upstream validate_announce accepts.
    - Tamper detection: bit-flips in signature, public_key, name_hash,
      random_hash, app_data are all rejected.

  tools/verify_lxmf_opportunistic.py — locks in §5.1, §5.2, §5.5, §5.6
  plus §3 layered correctly:
    - Two identities (Alice, Bob) with mutual discovery.
    - LXMessage build with title, content, fields.
    - Body layout: dest(16) || src(16) || sig(64) || msgpack.
    - Opportunistic-form strip of leading dest_hash before encryption.
    - Encrypt to Bob via Token, decrypt as Bob, byte-identical
      round-trip.
    - Re-prepend dest_hash and run unpack_from_bytes; confirms
      signature_validated=True and title/content/fields preserved.

All three pass against RNS 1.2.0 / LXMF 0.9.6.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:41:20 -04:00

205 lines
7.8 KiB
Python

"""
Verifier for SPEC.md S4 (announce wire format) and S4.5 (validation rules).
Builds an announce via upstream `Destination.announce(send=False)` against a
fresh identity, then walks the validation pipeline by hand:
1. Slice the announce body per S4.1, branched on context_flag.
2. Reconstruct signed_data per S4.2 (including the trailing app_data
and ratchet pub when present).
3. Verify the Ed25519 signature.
4. Recompute dest_hash = SHA256(name_hash || identity_hash)[:16] and
confirm it matches the outer packet header.
5. Confirm random_hash[5:10] is a recent unix-seconds timestamp per
S4.1 (corrected) — the upstream Python form, not the
microReticulum random-only form.
6. Run upstream RNS.Identity.validate_announce(packet) and confirm
it accepts.
7. Tamper with each of: signature, public_key, name_hash, random_hash,
ratchet_pub, app_data — and confirm validate_announce rejects each.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import hashlib
import os
import sys
import tempfile
import RNS
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-announce-")
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def build_announce(identity, app_data=None):
"""Build an announce via upstream and return (packet, raw bytes after pack)."""
dest = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE,
"verify_announce", "test")
pkt = dest.announce(app_data=app_data, send=False)
pkt.pack()
return dest, pkt
def verify_body_layout_and_signature(dest, pkt):
"""S4.1 + S4.2 + S4.5 by-hand walk."""
raw = pkt.raw
flags = raw[0]
hops = raw[1]
dest_hash_in_header = raw[2:18]
context = raw[18]
# Validate that the outer header's dest_hash matches our destination
if dest_hash_in_header != dest.hash:
fail(f"dest_hash in header != Destination.hash:\n"
f" header: {dest_hash_in_header.hex()}\n"
f" dest: {dest.hash.hex()}")
context_flag = (flags >> 5) & 0x01
body = raw[19:]
KEYSIZE = 64
NAME_HASH = 10
RANDOM_HASH = 10
RATCHET = 32
SIG = 64
public_key = body[0:KEYSIZE]
name_hash = body[KEYSIZE:KEYSIZE+NAME_HASH]
random_hash = body[KEYSIZE+NAME_HASH:KEYSIZE+NAME_HASH+RANDOM_HASH]
# Branch on context_flag for ratchet slot
if context_flag == 1:
ratchet = body[KEYSIZE+NAME_HASH+RANDOM_HASH:KEYSIZE+NAME_HASH+RANDOM_HASH+RATCHET]
sig_start = KEYSIZE+NAME_HASH+RANDOM_HASH+RATCHET
else:
ratchet = b""
sig_start = KEYSIZE+NAME_HASH+RANDOM_HASH
signature = body[sig_start:sig_start+SIG]
app_data = body[sig_start+SIG:]
# signed_data per S4.2: dest_hash || pub || name_hash || random_hash || ratchet || app_data
signed_data = dest.hash + public_key + name_hash + random_hash + ratchet + app_data
# Verify Ed25519 signature using the announced identity's pub key
announced = RNS.Identity(create_keys=False)
announced.load_public_key(public_key)
if not announced.validate(signature, signed_data):
fail("S4.2 hand-rebuilt signed_data did not validate signature — "
"spec layout doesn't match upstream emission")
# S1.2: dest_hash recompute
expected = hashlib.sha256(name_hash + announced.hash).digest()[:16]
if expected != dest.hash:
fail(f"S1.2 dest_hash recompute mismatch:\n"
f" expected: {expected.hex()}\n"
f" actual: {dest.hash.hex()}")
# S4.1 corrected: random_hash[5:10] is BE-uint40 unix_seconds
import time
ts = int.from_bytes(random_hash[5:10], "big")
now = int(time.time())
if abs(now - ts) > 10:
fail(f"S4.1 random_hash timestamp half is not a recent unix_seconds value:\n"
f" random_hash[5:10] -> {ts}\n"
f" now -> {now}")
print("PASS S4.1/4.2 announce body layout + signature + dest_hash recompute")
print(f" context_flag={context_flag} ratchet={'yes' if ratchet else 'no'} "
f"app_data={len(app_data)}B ts_skew={now-ts}s")
return public_key, name_hash, random_hash, ratchet, signature, app_data
def verify_upstream_validate_announce(pkt):
"""S4.5: upstream validate_announce accepts a freshly built announce."""
if not pkt.unpack():
fail("Packet.unpack returned False on a fresh announce")
if not RNS.Identity.validate_announce(pkt):
fail("RNS.Identity.validate_announce rejected a freshly built announce")
print("PASS S4.5 RNS.Identity.validate_announce accepts upstream-built announce")
def verify_tamper_detection(dest, app_data=b"some_app_data"):
"""S4.5: confirm validate_announce rejects bit-flips in each field."""
# Helper to build a fresh announce, tamper, re-unpack, and validate.
# Each call builds a fresh announce_packet from the same Destination,
# which is fine — Destination.announce(send=False) doesn't re-register.
def tampered_validate(mutator, label):
pkt = dest.announce(app_data=app_data, send=False)
pkt.pack()
original_raw = pkt.raw
new_raw = mutator(original_raw)
# Build a new Packet from the tampered raw bytes via Transport.inbound's
# pattern — Packet(None, raw) + unpack
tpkt = RNS.Packet(None, new_raw)
if not tpkt.unpack():
# Some tampering corrupts framing enough that unpack returns False.
# Either way, the announce is rejected — that's the desired outcome.
return
if RNS.Identity.validate_announce(tpkt):
fail(f"S4.5 tampered announce ({label}) was accepted")
# 1. Flip a signature byte (signature lives near the end of the body)
def tamper_signature(raw):
# signature = body[..., -64-len(app_data):-len(app_data)] in the
# ratchet-present case. Easier: flip 1 byte 100 from end (well inside sig).
return raw[:-100] + bytes([raw[-100] ^ 0x01]) + raw[-99:]
tampered_validate(tamper_signature, "signature byte flipped")
# 2. Flip a public_key byte (offset 19 = first byte of pub in HEADER_1 announce)
def tamper_pubkey(raw):
return raw[:19] + bytes([raw[19] ^ 0x01]) + raw[20:]
tampered_validate(tamper_pubkey, "public_key first byte flipped")
# 3. Flip a name_hash byte (offset 19 + 64 = 83, first name_hash byte)
def tamper_namehash(raw):
return raw[:83] + bytes([raw[83] ^ 0x01]) + raw[84:]
tampered_validate(tamper_namehash, "name_hash first byte flipped")
# 4. Flip a random_hash byte (offset 19 + 64 + 10 = 93, first random_hash byte)
def tamper_randomhash(raw):
return raw[:93] + bytes([raw[93] ^ 0x01]) + raw[94:]
tampered_validate(tamper_randomhash, "random_hash first byte flipped")
# 5. Flip an app_data byte (last byte)
def tamper_appdata(raw):
return raw[:-1] + bytes([raw[-1] ^ 0x01])
tampered_validate(tamper_appdata, "app_data last byte flipped")
print("PASS S4.5 validate_announce rejects tampered signature / pub / name_hash / "
"random_hash / app_data")
def main():
print(f"verify_announce_roundtrip.py against RNS {RNS.__version__}")
rns_instance = init_minimal_rns()
try:
identity = RNS.Identity()
dest, pkt = build_announce(identity, app_data=b"hello-app-data")
verify_body_layout_and_signature(dest, pkt)
verify_upstream_validate_announce(pkt)
verify_tamper_detection(dest)
finally:
try:
RNS.Reticulum.exit_handler()
except Exception:
pass
print("ALL PASS")
if __name__ == "__main__":
main()