reticiulum-specification/tools/verify_token_crypto.py
Rob cfd0d8249b Re-anchor against RNS 1.2.4 / LXMF 0.9.7 + track upstream distribution shift
Upstream RNS 1.2.4 (2026-05-07) announces it is "probably the last
release that is also published to GitHub" — pip continues until rnpkg
is complete and RNS is self-hosting. All 13 verifiers pass against
1.2.4 / 0.9.7; no wire-format, signing, or protocol behavior changed
between 1.2.0 and 1.2.4, so the changes here are purely currency:

- Pin tools/requirements.txt to rns==1.2.4 / lxmf==0.9.7 so the
  verifier stays reproducible if upstream stops mirroring to PyPI
  before the migration is ready.
- Add an "Upstream distribution shift" watch-list to todo.md (local
  Reticulum node, repo destination hash, rnpkg install/upgrade
  commands, rsg signature verification, mirroring source citations).
- Bump SPEC.md frontmatter and re-anchor ~50 line citations across
  Identity.py, Transport.py, Resource.py, Link.py, Reticulum.py,
  Packet.py, and LXMF/* (Identity.py drift was the heaviest at +13
  to +31 lines; Transport.py was variable). Fix one numeric
  (MAX_RANDOM_BLOBS = 32 → 64) and one semantic (§6.6.3 LRPROOF MTU
  clamp citation pointed at the wrong location — corrected to point
  at the transit-relay clamp at Transport.py:1539-1556).
- Update §10.4 decompression-bomb hazard to note upstream's 1.1.9 cap
  adoption, with citations to Resource.py:686-691 and Buffer.py:95-97
  plus a "do not use one-shot bz2.decompress()" warning.
- Re-anchor 11 flows/ files (version pins + ~30 line citations).
- Bump version labels in tools/README.md, test-vectors/README.md, and
  4 verifier docstrings + 2 hardcoded print strings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 07:42:25 -04:00

206 lines
7.9 KiB
Python

"""
Verifier for SPEC.md S3 (Token cryptography).
Exercises the modified-Fernet Token construction in two directions
against upstream RNS 1.2.4:
1. Identity-style encrypt (with ephemeral X25519 prefix) per S3.1
opportunistic form. Round-trips a known plaintext through
RNS.Identity.encrypt -> RNS.Identity.decrypt.
2. Symmetric Token encrypt/decrypt (no ephemeral prefix) per S3.1
link-derived form. Builds a fresh symmetric key, encrypts a
known plaintext, validates the wire layout against the spec,
and round-trips back through Token.decrypt.
3. HMAC-then-AES order check (S3.3): a tampered HMAC byte is
detected before AES decryption is attempted, so the function
raises on HMAC failure rather than returning a malformed
plaintext.
4. HKDF salt = identity_hash check (S3.2): re-derive the
encryption key by hand using HKDF over the ECDH shared secret
with salt = recipient identity_hash, and confirm the resulting
key matches the one upstream uses to encrypt.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import sys
import RNS
from RNS.Cryptography.Token import Token
from RNS.Cryptography.HKDF import hkdf
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def verify_opportunistic_encrypt_decrypt():
"""S3.2 / S3.3: identity-style encrypt with ephemeral pub prefix,
HKDF derived from ECDH(ephemeral, recipient.X25519_pub) with
salt = recipient.identity_hash."""
recipient = RNS.Identity()
plaintext = b"hello, reticulum"
# Encrypt to recipient's identity. This builds an ephemeral X25519
# keypair internally, does ECDH, derives the Token key, and emits
# ephemeral_pub(32) || iv(16) || aes_ciphertext(...) || hmac(32)
ciphertext = recipient.encrypt(plaintext)
# Wire-layout sanity:
if len(ciphertext) < 32 + 16 + 16 + 32:
fail(f"opportunistic ciphertext too short: {len(ciphertext)} bytes")
eph_pub_bytes = ciphertext[:32]
iv = ciphertext[32:48]
hmac = ciphertext[-32:]
aes_body = ciphertext[48:-32]
if len(aes_body) % 16 != 0:
fail(f"AES body not block-aligned: {len(aes_body)} bytes (must be multiple of 16)")
if len(iv) != 16:
fail(f"IV is {len(iv)} bytes, want 16")
if len(hmac) != 32:
fail(f"HMAC is {len(hmac)} bytes, want 32")
# Round-trip through decrypt — uses recipient's long-term X25519
# private key (no ratchets configured on this fresh identity).
decrypted = recipient.decrypt(ciphertext)
if decrypted != plaintext:
fail(f"opportunistic round-trip mismatch:\n"
f" plaintext: {plaintext!r}\n"
f" decrypted: {decrypted!r}")
print("PASS S3.1/3.2/3.3 opportunistic Token encrypt/decrypt round-trip")
return recipient, eph_pub_bytes, iv, hmac, aes_body
def verify_hkdf_salt_is_identity_hash(recipient, eph_pub_bytes, iv, hmac_bytes, aes_body):
"""S3.2: confirm HKDF salt is the recipient's 16-byte identity_hash,
not the dest_hash or ratchet_pub or anything else."""
# We can't observe the exact ephemeral private key (it was generated
# inside RNS.Identity.encrypt). But we CAN take the recipient's
# private key and the captured ephemeral_pub, perform ECDH from the
# recipient's side, derive the Token key under salt = identity_hash,
# and confirm decrypt succeeds — equivalent to asserting the salt.
# Use the RNS-level X25519 dispatcher so the resulting public-key
# object matches whichever provider (proxy vs fallback) the recipient's
# private key uses — exchange() requires both sides be the same kind.
from RNS.Cryptography import X25519PublicKey
eph_pub = X25519PublicKey.from_public_bytes(eph_pub_bytes)
shared = recipient.prv.exchange(eph_pub)
derived = hkdf(
length=64,
derive_from=shared,
salt=recipient.hash, # <-- the 16-byte identity_hash
context=None,
)
# Reconstruct the Token from the derived key and verify HMAC + decrypt
token = Token(derived)
body = iv + aes_body + hmac_bytes
plaintext = token.decrypt(body)
if plaintext != b"hello, reticulum":
fail(f"HKDF-salt-by-hand decrypt mismatch: {plaintext!r}")
print("PASS S3.2 HKDF salt is recipient.identity_hash (decrypt succeeds with hand-derived key)")
def verify_symmetric_token_form(plaintext=b"link DATA payload"):
"""S3.1 link-derived form: no ephemeral prefix, just iv || ciphertext || hmac."""
key = Token.generate_key() # 64 bytes for AES-256-CBC
if len(key) != 64:
fail(f"Token.generate_key returned {len(key)} bytes, want 64")
token = Token(key)
wire = token.encrypt(plaintext)
# Layout: iv(16) || ciphertext(N*16) || hmac(32)
if len(wire) < 16 + 16 + 32:
fail(f"link-derived ciphertext too short: {len(wire)}")
iv = wire[:16]
ciphertext = wire[16:-32]
hmac_bytes = wire[-32:]
if len(ciphertext) % 16 != 0:
fail(f"link-derived ciphertext body not block-aligned: {len(ciphertext)}")
decrypted = token.decrypt(wire)
if decrypted != plaintext:
fail(f"link-derived round-trip mismatch:\n in: {plaintext!r}\n out: {decrypted!r}")
print("PASS S3.1 link-derived Token form (no ephemeral prefix, iv||ct||hmac)")
return key, wire
def verify_hmac_before_aes(key, wire):
"""S3.3: HMAC verification MUST run before AES decryption.
A tampered HMAC byte should raise rather than produce malformed plaintext."""
token = Token(key)
# Flip a single bit in the HMAC region — the last 32 bytes
tampered = wire[:-1] + bytes([wire[-1] ^ 0x01])
try:
token.decrypt(tampered)
fail("S3.3 tampered HMAC was accepted — encrypt-then-MAC verification missing")
except ValueError as e:
if "HMAC" not in str(e):
fail(f"S3.3 decrypt raised but with wrong error: {e}")
# Good: HMAC mismatch raised before AES decrypt could run
# Also flip a byte in the ciphertext (HMAC stays intact in shape but
# the HMAC wouldn't match the corrupted body). Same expected outcome.
if len(wire) > 64:
tampered2 = wire[:32] + bytes([wire[32] ^ 0x01]) + wire[33:]
try:
token.decrypt(tampered2)
fail("S3.3 tampered ciphertext was accepted — HMAC-then-AES order broken")
except ValueError:
pass
print("PASS S3.3 HMAC-then-AES order (tampered ciphertext rejected at HMAC stage)")
def verify_pkcs7_padding_handled():
"""S3.2 step 6: AES-CBC PKCS#7 padding is applied automatically by the
Token; clients must NOT pad manually (would produce double padding)."""
# 1-byte plaintext: PKCS#7 will pad with 15 bytes of 0x0F
one_byte = b"x"
key = Token.generate_key()
token = Token(key)
wire = token.encrypt(one_byte)
out = token.decrypt(wire)
if out != one_byte:
fail(f"S3.2 step 6 PKCS#7 round-trip on 1B plaintext failed: {out!r}")
# 16-byte plaintext (one full block): PKCS#7 adds a full block of 0x10
sixteen = b"sixteen ABCDEFGH"
assert len(sixteen) == 16
wire = Token(key).encrypt(sixteen)
if (len(wire) - 16 - 32) != 32:
fail(f"S3.2 step 6 16B plaintext should produce 32B AES body (one + full pad block), "
f"got {len(wire) - 16 - 32}")
out = Token(key).decrypt(wire)
if out != sixteen:
fail(f"S3.2 step 6 PKCS#7 round-trip on 16B plaintext failed: {out!r}")
print("PASS S3.2 step 6 PKCS#7 padding (1B and 16B boundaries)")
def main():
print(f"verify_token_crypto.py against RNS {RNS.__version__}")
recipient, eph_pub_bytes, iv, hmac_bytes, aes_body = verify_opportunistic_encrypt_decrypt()
verify_hkdf_salt_is_identity_hash(recipient, eph_pub_bytes, iv, hmac_bytes, aes_body)
key, wire = verify_symmetric_token_form()
verify_hmac_before_aes(key, wire)
verify_pkcs7_padding_handled()
print("ALL PASS")
if __name__ == "__main__":
main()