reticiulum-specification/tools/verify_token_crypto.py
Rob 75169b0631 Add three high-value verifiers: token crypto, announce, LXMF opportunistic
tools/verify_token_crypto.py — locks in §3:
    - Opportunistic Token encrypt/decrypt round-trip with full
      ephemeral_pub(32) || iv(16) || aes(...) || hmac(32) layout check.
    - HKDF salt = recipient.identity_hash verified by re-deriving
      the key by hand and confirming decrypt succeeds.
    - Link-derived Token form (no eph_pub prefix) round-trip.
    - HMAC-then-AES order proven by tampering each region: HMAC
      mismatch raises before AES decrypt.
    - PKCS#7 padding boundaries (1B and 16B plaintexts).

  tools/verify_announce_roundtrip.py — locks in §4 + §4.5:
    - Build via upstream Destination.announce(send=False).
    - Body layout walk with context_flag branching for the optional
      ratchet slot.
    - signed_data reconstruction per §4.2 with empty-bytes-not-absent
      ratchet rule.
    - dest_hash recompute per §1.2.
    - random_hash[5:10] is a recent unix_seconds timestamp per §4.1
      (corrected — confirms upstream emits the timestamp half).
    - Upstream validate_announce accepts.
    - Tamper detection: bit-flips in signature, public_key, name_hash,
      random_hash, app_data are all rejected.

  tools/verify_lxmf_opportunistic.py — locks in §5.1, §5.2, §5.5, §5.6
  plus §3 layered correctly:
    - Two identities (Alice, Bob) with mutual discovery.
    - LXMessage build with title, content, fields.
    - Body layout: dest(16) || src(16) || sig(64) || msgpack.
    - Opportunistic-form strip of leading dest_hash before encryption.
    - Encrypt to Bob via Token, decrypt as Bob, byte-identical
      round-trip.
    - Re-prepend dest_hash and run unpack_from_bytes; confirms
      signature_validated=True and title/content/fields preserved.

All three pass against RNS 1.2.0 / LXMF 0.9.6.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 12:41:20 -04:00

206 lines
7.9 KiB
Python

"""
Verifier for SPEC.md S3 (Token cryptography).
Exercises the modified-Fernet Token construction in two directions
against upstream RNS 1.2.0:
1. Identity-style encrypt (with ephemeral X25519 prefix) per S3.1
opportunistic form. Round-trips a known plaintext through
RNS.Identity.encrypt -> RNS.Identity.decrypt.
2. Symmetric Token encrypt/decrypt (no ephemeral prefix) per S3.1
link-derived form. Builds a fresh symmetric key, encrypts a
known plaintext, validates the wire layout against the spec,
and round-trips back through Token.decrypt.
3. HMAC-then-AES order check (S3.3): a tampered HMAC byte is
detected before AES decryption is attempted, so the function
raises on HMAC failure rather than returning a malformed
plaintext.
4. HKDF salt = identity_hash check (S3.2): re-derive the
encryption key by hand using HKDF over the ECDH shared secret
with salt = recipient identity_hash, and confirm the resulting
key matches the one upstream uses to encrypt.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import sys
import RNS
from RNS.Cryptography.Token import Token
from RNS.Cryptography.HKDF import hkdf
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def verify_opportunistic_encrypt_decrypt():
"""S3.2 / S3.3: identity-style encrypt with ephemeral pub prefix,
HKDF derived from ECDH(ephemeral, recipient.X25519_pub) with
salt = recipient.identity_hash."""
recipient = RNS.Identity()
plaintext = b"hello, reticulum"
# Encrypt to recipient's identity. This builds an ephemeral X25519
# keypair internally, does ECDH, derives the Token key, and emits
# ephemeral_pub(32) || iv(16) || aes_ciphertext(...) || hmac(32)
ciphertext = recipient.encrypt(plaintext)
# Wire-layout sanity:
if len(ciphertext) < 32 + 16 + 16 + 32:
fail(f"opportunistic ciphertext too short: {len(ciphertext)} bytes")
eph_pub_bytes = ciphertext[:32]
iv = ciphertext[32:48]
hmac = ciphertext[-32:]
aes_body = ciphertext[48:-32]
if len(aes_body) % 16 != 0:
fail(f"AES body not block-aligned: {len(aes_body)} bytes (must be multiple of 16)")
if len(iv) != 16:
fail(f"IV is {len(iv)} bytes, want 16")
if len(hmac) != 32:
fail(f"HMAC is {len(hmac)} bytes, want 32")
# Round-trip through decrypt — uses recipient's long-term X25519
# private key (no ratchets configured on this fresh identity).
decrypted = recipient.decrypt(ciphertext)
if decrypted != plaintext:
fail(f"opportunistic round-trip mismatch:\n"
f" plaintext: {plaintext!r}\n"
f" decrypted: {decrypted!r}")
print("PASS S3.1/3.2/3.3 opportunistic Token encrypt/decrypt round-trip")
return recipient, eph_pub_bytes, iv, hmac, aes_body
def verify_hkdf_salt_is_identity_hash(recipient, eph_pub_bytes, iv, hmac_bytes, aes_body):
"""S3.2: confirm HKDF salt is the recipient's 16-byte identity_hash,
not the dest_hash or ratchet_pub or anything else."""
# We can't observe the exact ephemeral private key (it was generated
# inside RNS.Identity.encrypt). But we CAN take the recipient's
# private key and the captured ephemeral_pub, perform ECDH from the
# recipient's side, derive the Token key under salt = identity_hash,
# and confirm decrypt succeeds — equivalent to asserting the salt.
# Use the RNS-level X25519 dispatcher so the resulting public-key
# object matches whichever provider (proxy vs fallback) the recipient's
# private key uses — exchange() requires both sides be the same kind.
from RNS.Cryptography import X25519PublicKey
eph_pub = X25519PublicKey.from_public_bytes(eph_pub_bytes)
shared = recipient.prv.exchange(eph_pub)
derived = hkdf(
length=64,
derive_from=shared,
salt=recipient.hash, # <-- the 16-byte identity_hash
context=None,
)
# Reconstruct the Token from the derived key and verify HMAC + decrypt
token = Token(derived)
body = iv + aes_body + hmac_bytes
plaintext = token.decrypt(body)
if plaintext != b"hello, reticulum":
fail(f"HKDF-salt-by-hand decrypt mismatch: {plaintext!r}")
print("PASS S3.2 HKDF salt is recipient.identity_hash (decrypt succeeds with hand-derived key)")
def verify_symmetric_token_form(plaintext=b"link DATA payload"):
"""S3.1 link-derived form: no ephemeral prefix, just iv || ciphertext || hmac."""
key = Token.generate_key() # 64 bytes for AES-256-CBC
if len(key) != 64:
fail(f"Token.generate_key returned {len(key)} bytes, want 64")
token = Token(key)
wire = token.encrypt(plaintext)
# Layout: iv(16) || ciphertext(N*16) || hmac(32)
if len(wire) < 16 + 16 + 32:
fail(f"link-derived ciphertext too short: {len(wire)}")
iv = wire[:16]
ciphertext = wire[16:-32]
hmac_bytes = wire[-32:]
if len(ciphertext) % 16 != 0:
fail(f"link-derived ciphertext body not block-aligned: {len(ciphertext)}")
decrypted = token.decrypt(wire)
if decrypted != plaintext:
fail(f"link-derived round-trip mismatch:\n in: {plaintext!r}\n out: {decrypted!r}")
print("PASS S3.1 link-derived Token form (no ephemeral prefix, iv||ct||hmac)")
return key, wire
def verify_hmac_before_aes(key, wire):
"""S3.3: HMAC verification MUST run before AES decryption.
A tampered HMAC byte should raise rather than produce malformed plaintext."""
token = Token(key)
# Flip a single bit in the HMAC region — the last 32 bytes
tampered = wire[:-1] + bytes([wire[-1] ^ 0x01])
try:
token.decrypt(tampered)
fail("S3.3 tampered HMAC was accepted — encrypt-then-MAC verification missing")
except ValueError as e:
if "HMAC" not in str(e):
fail(f"S3.3 decrypt raised but with wrong error: {e}")
# Good: HMAC mismatch raised before AES decrypt could run
# Also flip a byte in the ciphertext (HMAC stays intact in shape but
# the HMAC wouldn't match the corrupted body). Same expected outcome.
if len(wire) > 64:
tampered2 = wire[:32] + bytes([wire[32] ^ 0x01]) + wire[33:]
try:
token.decrypt(tampered2)
fail("S3.3 tampered ciphertext was accepted — HMAC-then-AES order broken")
except ValueError:
pass
print("PASS S3.3 HMAC-then-AES order (tampered ciphertext rejected at HMAC stage)")
def verify_pkcs7_padding_handled():
"""S3.2 step 6: AES-CBC PKCS#7 padding is applied automatically by the
Token; clients must NOT pad manually (would produce double padding)."""
# 1-byte plaintext: PKCS#7 will pad with 15 bytes of 0x0F
one_byte = b"x"
key = Token.generate_key()
token = Token(key)
wire = token.encrypt(one_byte)
out = token.decrypt(wire)
if out != one_byte:
fail(f"S3.2 step 6 PKCS#7 round-trip on 1B plaintext failed: {out!r}")
# 16-byte plaintext (one full block): PKCS#7 adds a full block of 0x10
sixteen = b"sixteen ABCDEFGH"
assert len(sixteen) == 16
wire = Token(key).encrypt(sixteen)
if (len(wire) - 16 - 32) != 32:
fail(f"S3.2 step 6 16B plaintext should produce 32B AES body (one + full pad block), "
f"got {len(wire) - 16 - 32}")
out = Token(key).decrypt(wire)
if out != sixteen:
fail(f"S3.2 step 6 PKCS#7 round-trip on 16B plaintext failed: {out!r}")
print("PASS S3.2 step 6 PKCS#7 padding (1B and 16B boundaries)")
def main():
print(f"verify_token_crypto.py against RNS {RNS.__version__}")
recipient, eph_pub_bytes, iv, hmac_bytes, aes_body = verify_opportunistic_encrypt_decrypt()
verify_hkdf_salt_is_identity_hash(recipient, eph_pub_bytes, iv, hmac_bytes, aes_body)
key, wire = verify_symmetric_token_form()
verify_hmac_before_aes(key, wire)
verify_pkcs7_padding_handled()
print("ALL PASS")
if __name__ == "__main__":
main()