tools/verify_token_crypto.py — locks in §3:
- Opportunistic Token encrypt/decrypt round-trip with full
ephemeral_pub(32) || iv(16) || aes(...) || hmac(32) layout check.
- HKDF salt = recipient.identity_hash verified by re-deriving
the key by hand and confirming decrypt succeeds.
- Link-derived Token form (no eph_pub prefix) round-trip.
- HMAC-then-AES order proven by tampering each region: HMAC
mismatch raises before AES decrypt.
- PKCS#7 padding boundaries (1B and 16B plaintexts).
tools/verify_announce_roundtrip.py — locks in §4 + §4.5:
- Build via upstream Destination.announce(send=False).
- Body layout walk with context_flag branching for the optional
ratchet slot.
- signed_data reconstruction per §4.2 with empty-bytes-not-absent
ratchet rule.
- dest_hash recompute per §1.2.
- random_hash[5:10] is a recent unix_seconds timestamp per §4.1
(corrected — confirms upstream emits the timestamp half).
- Upstream validate_announce accepts.
- Tamper detection: bit-flips in signature, public_key, name_hash,
random_hash, app_data are all rejected.
tools/verify_lxmf_opportunistic.py — locks in §5.1, §5.2, §5.5, §5.6
plus §3 layered correctly:
- Two identities (Alice, Bob) with mutual discovery.
- LXMessage build with title, content, fields.
- Body layout: dest(16) || src(16) || sig(64) || msgpack.
- Opportunistic-form strip of leading dest_hash before encryption.
- Encrypt to Bob via Token, decrypt as Bob, byte-identical
round-trip.
- Re-prepend dest_hash and run unpack_from_bytes; confirms
signature_validated=True and title/content/fields preserved.
All three pass against RNS 1.2.0 / LXMF 0.9.6.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
205 lines
7.8 KiB
Python
205 lines
7.8 KiB
Python
"""
|
|
Verifier for SPEC.md S4 (announce wire format) and S4.5 (validation rules).
|
|
|
|
Builds an announce via upstream `Destination.announce(send=False)` against a
|
|
fresh identity, then walks the validation pipeline by hand:
|
|
|
|
1. Slice the announce body per S4.1, branched on context_flag.
|
|
2. Reconstruct signed_data per S4.2 (including the trailing app_data
|
|
and ratchet pub when present).
|
|
3. Verify the Ed25519 signature.
|
|
4. Recompute dest_hash = SHA256(name_hash || identity_hash)[:16] and
|
|
confirm it matches the outer packet header.
|
|
5. Confirm random_hash[5:10] is a recent unix-seconds timestamp per
|
|
S4.1 (corrected) — the upstream Python form, not the
|
|
microReticulum random-only form.
|
|
6. Run upstream RNS.Identity.validate_announce(packet) and confirm
|
|
it accepts.
|
|
7. Tamper with each of: signature, public_key, name_hash, random_hash,
|
|
app_data — and confirm validate_announce rejects each (ratchet_pub tampering is not exercised by this script).
|
|
|
|
Exit code 0 on PASS, non-zero on FAIL.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
import RNS
|
|
|
|
|
|
def fail(msg: str) -> None:
    """Report a verification failure and terminate the process.

    Prints ``FAIL: <msg>`` on stdout, then exits with status 1; control never
    returns to the caller.
    """
    report = f"FAIL: {msg}"
    print(report)
    raise SystemExit(1)
|
|
|
|
|
|
def init_minimal_rns():
    """Start a Reticulum instance backed by a throwaway config directory.

    A fresh temporary directory is created, a minimal ``config`` file that
    disables transport and instance sharing is written into it, and an
    ``RNS.Reticulum`` object constructed from that directory (``loglevel=0``)
    is returned.

    The temporary directory is removed at interpreter exit so repeated runs
    do not accumulate ``rns-verify-announce-*`` directories.
    """
    import atexit
    import shutil

    cfg_dir = tempfile.mkdtemp(prefix="rns-verify-announce-")
    # Register the cleanup before Reticulum is constructed: atexit handlers
    # run last-in-first-out, so any exit hook registered during Reticulum
    # start-up runs while the directory still exists.
    # NOTE(review): assumes RNS does not need the directory after its own
    # exit hooks have run - confirm against upstream.
    atexit.register(shutil.rmtree, cfg_dir, ignore_errors=True)

    cfg_path = os.path.join(cfg_dir, "config")
    with open(cfg_path, "w", encoding="utf-8") as f:
        f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
    return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
|
|
|
|
|
|
def build_announce(identity, app_data=None):
    """Build an unsent announce via upstream and return ``(dest, pkt)``.

    An inbound SINGLE destination is created for ``identity`` using the name
    parts ``"verify_announce"`` and ``"test"``. Its announce packet is
    produced with ``send=False`` and packed, so ``pkt.raw`` is populated when
    this function returns.
    """
    destination = RNS.Destination(
        identity,
        RNS.Destination.IN,
        RNS.Destination.SINGLE,
        "verify_announce",
        "test",
    )
    packet = destination.announce(app_data=app_data, send=False)
    packet.pack()
    return destination, packet
|
|
|
|
|
|
def verify_body_layout_and_signature(dest, pkt):
    """S4.1 + S4.2 + S4.5 by-hand walk of a packed announce.

    Slices ``pkt.raw`` into its announce fields, rebuilds ``signed_data``,
    checks the signature with the announced public key, recomputes the
    destination hash, and checks that ``random_hash[5:10]`` decodes to a
    unix-seconds value within 10 seconds of the local clock.

    Any mismatch is reported through ``fail``, which terminates the process.

    Args:
        dest: The destination the announce was built from; ``dest.hash`` is
            compared with the header and the recomputed hash.
        pkt: The packed announce packet; only ``pkt.raw`` is read.

    Returns:
        ``(public_key, name_hash, random_hash, ratchet, signature, app_data)``
        as byte strings. ``ratchet`` is ``b""`` when the context flag is 0.
    """
    import time

    # Outer header as sliced here: flags(1) | hops(1) | dest_hash(16) | context(1).
    HEADER_LEN = 19
    KEYSIZE = 64
    NAME_HASH = 10
    RANDOM_HASH = 10
    RATCHET = 32
    SIG = 64
    MAX_TS_SKEW_S = 10

    raw = pkt.raw
    flags = raw[0]
    dest_hash_in_header = raw[2:18]

    # Validate that the outer header's dest_hash matches our destination
    if dest_hash_in_header != dest.hash:
        fail(f"dest_hash in header != Destination.hash:\n"
             f" header: {dest_hash_in_header.hex()}\n"
             f" dest: {dest.hash.hex()}")

    context_flag = (flags >> 5) & 0x01
    # The ratchet slot is only present on the wire when context_flag is set.
    ratchet_len = RATCHET if context_flag == 1 else 0

    body = raw[HEADER_LEN:]

    # Slicing never raises on short input, so check the length up front;
    # otherwise a truncated body shows up as a misleading signature failure.
    min_body = KEYSIZE + NAME_HASH + RANDOM_HASH + ratchet_len + SIG
    if len(body) < min_body:
        fail(f"S4.1 announce body too short: {len(body)}B, need at least "
             f"{min_body}B for context_flag={context_flag}")

    key_end = KEYSIZE
    name_end = key_end + NAME_HASH
    random_end = name_end + RANDOM_HASH
    sig_start = random_end + ratchet_len
    sig_end = sig_start + SIG

    public_key = body[:key_end]
    name_hash = body[key_end:name_end]
    random_hash = body[name_end:random_end]
    # Empty bytes (not absent) when there is no ratchet slot.
    ratchet = body[random_end:sig_start]
    signature = body[sig_start:sig_end]
    app_data = body[sig_end:]

    # signed_data per S4.2: dest_hash || pub || name_hash || random_hash || ratchet || app_data
    signed_data = dest.hash + public_key + name_hash + random_hash + ratchet + app_data

    # Verify Ed25519 signature using the announced identity's pub key
    announced = RNS.Identity(create_keys=False)
    announced.load_public_key(public_key)
    if not announced.validate(signature, signed_data):
        fail("S4.2 hand-rebuilt signed_data did not validate signature — "
             "spec layout doesn't match upstream emission")

    # S1.2: dest_hash recompute
    expected = hashlib.sha256(name_hash + announced.hash).digest()[:16]
    if expected != dest.hash:
        fail(f"S1.2 dest_hash recompute mismatch:\n"
             f" expected: {expected.hex()}\n"
             f" actual: {dest.hash.hex()}")

    # S4.1 corrected: random_hash[5:10] is BE-uint40 unix_seconds
    ts = int.from_bytes(random_hash[5:10], "big")
    now = int(time.time())
    if abs(now - ts) > MAX_TS_SKEW_S:
        fail(f"S4.1 random_hash timestamp half is not a recent unix_seconds value:\n"
             f" random_hash[5:10] -> {ts}\n"
             f" now -> {now}")

    print("PASS S4.1/4.2 announce body layout + signature + dest_hash recompute")
    print(f" context_flag={context_flag} ratchet={'yes' if ratchet else 'no'} "
          f"app_data={len(app_data)}B ts_skew={now-ts}s")

    return public_key, name_hash, random_hash, ratchet, signature, app_data
|
|
|
|
|
|
def verify_upstream_validate_announce(pkt):
    """S4.5: check that upstream accepts an untouched, freshly built announce.

    The packet is unpacked and then handed to
    ``RNS.Identity.validate_announce``; a falsy result from either step is
    reported through ``fail``.
    """
    unpacked = pkt.unpack()
    if not unpacked:
        fail("Packet.unpack returned False on a fresh announce")

    accepted = RNS.Identity.validate_announce(pkt)
    if not accepted:
        fail("RNS.Identity.validate_announce rejected a freshly built announce")

    print("PASS S4.5 RNS.Identity.validate_announce accepts upstream-built announce")
|
|
|
|
|
|
def verify_tamper_detection(dest, app_data=b"some_app_data"):
    """S4.5: confirm validate_announce rejects bit-flips in each field.

    For every field a fresh announce is built from ``dest``, one bit of one
    byte inside that field is flipped, and the result is re-parsed as a new
    packet. The announce counts as rejected when either ``unpack`` or
    ``RNS.Identity.validate_announce`` returns a falsy value; an accepted
    tampered announce is reported through ``fail``.

    Field offsets are computed from the same layout that
    ``verify_body_layout_and_signature`` walks (19-byte header, then
    pub(64) | name_hash(10) | random_hash(10) | [ratchet(32)] | sig(64) |
    app_data), so the signature flip really lands inside the signature
    whatever the app_data length or ratchet presence.

    Args:
        dest: Destination used to build each announce.
        app_data: Application data placed in each announce. When the packed
            announce carries no app_data bytes, the app_data case is skipped.
    """
    HEADER_LEN = 19
    KEYSIZE = 64
    NAME_HASH = 10
    RANDOM_HASH = 10
    RATCHET = 32
    SIG = 64

    def signature_span(raw):
        """Return ``(sig_start, sig_end)`` offsets into ``raw``."""
        # The ratchet slot is present only when the context flag (bit 5) is set.
        ratchet_len = RATCHET if (raw[0] >> 5) & 0x01 else 0
        start = HEADER_LEN + KEYSIZE + NAME_HASH + RANDOM_HASH + ratchet_len
        return start, start + SIG

    # Each picker maps the packed bytes to the index of the byte to flip, or
    # None when the field is not present in this announce.
    def pick_signature(raw):
        start, _ = signature_span(raw)
        return start + SIG // 2

    def pick_pubkey(raw):
        return HEADER_LEN

    def pick_namehash(raw):
        return HEADER_LEN + KEYSIZE

    def pick_randomhash(raw):
        return HEADER_LEN + KEYSIZE + NAME_HASH

    def pick_appdata(raw):
        _, end = signature_span(raw)
        return len(raw) - 1 if len(raw) > end else None

    def tampered_validate(pick_index, label):
        """Flip one bit in a fresh announce; return False if the field is absent."""
        # Each call builds a fresh announce packet from the same Destination.
        pkt = dest.announce(app_data=app_data, send=False)
        pkt.pack()
        index = pick_index(pkt.raw)
        if index is None:
            return False

        mutated = bytearray(pkt.raw)
        mutated[index] ^= 0x01

        # Build a new Packet from the tampered raw bytes via Transport.inbound's
        # pattern — Packet(None, raw) + unpack
        tpkt = RNS.Packet(None, bytes(mutated))
        if not tpkt.unpack():
            # Some tampering corrupts framing enough that unpack returns False.
            # Either way, the announce is rejected — that's the desired outcome.
            return True
        if RNS.Identity.validate_announce(tpkt):
            fail(f"S4.5 tampered announce ({label}) was accepted")
        return True

    cases = (
        ("signature", pick_signature, "signature byte flipped"),
        ("pub", pick_pubkey, "public_key first byte flipped"),
        ("name_hash", pick_namehash, "name_hash first byte flipped"),
        ("random_hash", pick_randomhash, "random_hash first byte flipped"),
        ("app_data", pick_appdata, "app_data last byte flipped"),
    )
    checked = [
        short_name
        for short_name, picker, label in cases
        if tampered_validate(picker, label)
    ]

    print("PASS S4.5 validate_announce rejects tampered " + " / ".join(checked))
|
|
|
|
|
|
def main():
    """Run every announce check in order and print ``ALL PASS`` on success.

    A minimal Reticulum instance is started, an announce carrying
    ``b"hello-app-data"`` is built for a fresh identity, and the three
    verification stages run against it. Any failing stage exits the process
    through ``fail``, so ``ALL PASS`` is only printed when all stages succeed.
    The Reticulum exit handler is invoked afterwards whether or not the
    checks passed.
    """
    print(f"verify_announce_roundtrip.py against RNS {RNS.__version__}")
    # Bound to a name but not otherwise used in this function.
    rns_instance = init_minimal_rns()
    try:
        identity = RNS.Identity()
        dest, pkt = build_announce(identity, app_data=b"hello-app-data")
        verify_body_layout_and_signature(dest, pkt)
        verify_upstream_validate_announce(pkt)
        verify_tamper_detection(dest)
    finally:
        try:
            RNS.Reticulum.exit_handler()
        except Exception as exc:
            # Shutdown stays best-effort (it must not mask the verification
            # result), but report the problem instead of hiding it.
            print(f"WARN: RNS.Reticulum.exit_handler raised {exc!r}",
                  file=sys.stderr)
    print("ALL PASS")
|
|
|
|
|
|
# Script entry point: run the verifier only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
|