Add three high-value verifiers: token crypto, announce, LXMF opportunistic
tools/verify_token_crypto.py — locks in §3:
- Opportunistic Token encrypt/decrypt round-trip with full
ephemeral_pub(32) || iv(16) || aes(...) || hmac(32) layout check.
- HKDF salt = recipient.identity_hash verified by re-deriving
the key by hand and confirming decrypt succeeds.
- Link-derived Token form (no eph_pub prefix) round-trip.
- HMAC-then-AES order proven by tampering each region: HMAC
mismatch raises before AES decrypt.
- PKCS#7 padding boundaries (1B and 16B plaintexts).
tools/verify_announce_roundtrip.py — locks in §4 + §4.5:
- Build via upstream Destination.announce(send=False).
- Body layout walk with context_flag branching for the optional
ratchet slot.
- signed_data reconstruction per §4.2 with empty-bytes-not-absent
ratchet rule.
- dest_hash recompute per §1.2.
- random_hash[5:10] is a recent unix_seconds timestamp per §4.1
(corrected — confirms upstream emits the timestamp half).
- Upstream validate_announce accepts.
- Tamper detection: bit-flips in signature, public_key, name_hash,
random_hash, app_data are all rejected.
tools/verify_lxmf_opportunistic.py — locks in §5.1, §5.2, §5.5, §5.6
plus §3 layered correctly:
- Two identities (Alice, Bob) with mutual discovery.
- LXMessage build with title, content, fields.
- Body layout: dest(16) || src(16) || sig(64) || msgpack.
- Opportunistic-form strip of leading dest_hash before encryption.
- Encrypt to Bob via Token, decrypt as Bob, byte-identical
round-trip.
- Re-prepend dest_hash and run unpack_from_bytes; confirms
signature_validated=True and title/content/fields preserved.
All three pass against RNS 1.2.0 / LXMF 0.9.6.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
282d5d59eb
commit
75169b0631
3 changed files with 577 additions and 0 deletions
205
tools/verify_announce_roundtrip.py
Normal file
205
tools/verify_announce_roundtrip.py
Normal file
|
|
@ -0,0 +1,205 @@
|
|||
"""
|
||||
Verifier for SPEC.md S4 (announce wire format) and S4.5 (validation rules).
|
||||
|
||||
Builds an announce via upstream `Destination.announce(send=False)` against a
|
||||
fresh identity, then walks the validation pipeline by hand:
|
||||
|
||||
1. Slice the announce body per S4.1, branched on context_flag.
|
||||
2. Reconstruct signed_data per S4.2 (including the trailing app_data
|
||||
and ratchet pub when present).
|
||||
3. Verify the Ed25519 signature.
|
||||
4. Recompute dest_hash = SHA256(name_hash || identity_hash)[:16] and
|
||||
confirm it matches the outer packet header.
|
||||
5. Confirm random_hash[5:10] is a recent unix-seconds timestamp per
|
||||
S4.1 (corrected) — the upstream Python form, not the
|
||||
microReticulum random-only form.
|
||||
6. Run upstream RNS.Identity.validate_announce(packet) and confirm
|
||||
it accepts.
|
||||
7. Tamper with each of: signature, public_key, name_hash, random_hash,
|
||||
ratchet_pub, app_data — and confirm validate_announce rejects each.
|
||||
|
||||
Exit code 0 on PASS, non-zero on FAIL.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import RNS
|
||||
|
||||
|
||||
def fail(msg: str) -> None:
|
||||
print(f"FAIL: {msg}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def init_minimal_rns():
|
||||
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-announce-")
|
||||
cfg_path = os.path.join(cfg_dir, "config")
|
||||
with open(cfg_path, "w", encoding="utf-8") as f:
|
||||
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
|
||||
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
|
||||
|
||||
|
||||
def build_announce(identity, app_data=None):
|
||||
"""Build an announce via upstream and return (packet, raw bytes after pack)."""
|
||||
dest = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||
"verify_announce", "test")
|
||||
pkt = dest.announce(app_data=app_data, send=False)
|
||||
pkt.pack()
|
||||
return dest, pkt
|
||||
|
||||
|
||||
def verify_body_layout_and_signature(dest, pkt):
|
||||
"""S4.1 + S4.2 + S4.5 by-hand walk."""
|
||||
raw = pkt.raw
|
||||
flags = raw[0]
|
||||
hops = raw[1]
|
||||
dest_hash_in_header = raw[2:18]
|
||||
context = raw[18]
|
||||
|
||||
# Validate that the outer header's dest_hash matches our destination
|
||||
if dest_hash_in_header != dest.hash:
|
||||
fail(f"dest_hash in header != Destination.hash:\n"
|
||||
f" header: {dest_hash_in_header.hex()}\n"
|
||||
f" dest: {dest.hash.hex()}")
|
||||
|
||||
context_flag = (flags >> 5) & 0x01
|
||||
|
||||
body = raw[19:]
|
||||
KEYSIZE = 64
|
||||
NAME_HASH = 10
|
||||
RANDOM_HASH = 10
|
||||
RATCHET = 32
|
||||
SIG = 64
|
||||
|
||||
public_key = body[0:KEYSIZE]
|
||||
name_hash = body[KEYSIZE:KEYSIZE+NAME_HASH]
|
||||
random_hash = body[KEYSIZE+NAME_HASH:KEYSIZE+NAME_HASH+RANDOM_HASH]
|
||||
|
||||
# Branch on context_flag for ratchet slot
|
||||
if context_flag == 1:
|
||||
ratchet = body[KEYSIZE+NAME_HASH+RANDOM_HASH:KEYSIZE+NAME_HASH+RANDOM_HASH+RATCHET]
|
||||
sig_start = KEYSIZE+NAME_HASH+RANDOM_HASH+RATCHET
|
||||
else:
|
||||
ratchet = b""
|
||||
sig_start = KEYSIZE+NAME_HASH+RANDOM_HASH
|
||||
|
||||
signature = body[sig_start:sig_start+SIG]
|
||||
app_data = body[sig_start+SIG:]
|
||||
|
||||
# signed_data per S4.2: dest_hash || pub || name_hash || random_hash || ratchet || app_data
|
||||
signed_data = dest.hash + public_key + name_hash + random_hash + ratchet + app_data
|
||||
|
||||
# Verify Ed25519 signature using the announced identity's pub key
|
||||
announced = RNS.Identity(create_keys=False)
|
||||
announced.load_public_key(public_key)
|
||||
if not announced.validate(signature, signed_data):
|
||||
fail("S4.2 hand-rebuilt signed_data did not validate signature — "
|
||||
"spec layout doesn't match upstream emission")
|
||||
|
||||
# S1.2: dest_hash recompute
|
||||
expected = hashlib.sha256(name_hash + announced.hash).digest()[:16]
|
||||
if expected != dest.hash:
|
||||
fail(f"S1.2 dest_hash recompute mismatch:\n"
|
||||
f" expected: {expected.hex()}\n"
|
||||
f" actual: {dest.hash.hex()}")
|
||||
|
||||
# S4.1 corrected: random_hash[5:10] is BE-uint40 unix_seconds
|
||||
import time
|
||||
ts = int.from_bytes(random_hash[5:10], "big")
|
||||
now = int(time.time())
|
||||
if abs(now - ts) > 10:
|
||||
fail(f"S4.1 random_hash timestamp half is not a recent unix_seconds value:\n"
|
||||
f" random_hash[5:10] -> {ts}\n"
|
||||
f" now -> {now}")
|
||||
|
||||
print("PASS S4.1/4.2 announce body layout + signature + dest_hash recompute")
|
||||
print(f" context_flag={context_flag} ratchet={'yes' if ratchet else 'no'} "
|
||||
f"app_data={len(app_data)}B ts_skew={now-ts}s")
|
||||
|
||||
return public_key, name_hash, random_hash, ratchet, signature, app_data
|
||||
|
||||
|
||||
def verify_upstream_validate_announce(pkt):
|
||||
"""S4.5: upstream validate_announce accepts a freshly built announce."""
|
||||
if not pkt.unpack():
|
||||
fail("Packet.unpack returned False on a fresh announce")
|
||||
if not RNS.Identity.validate_announce(pkt):
|
||||
fail("RNS.Identity.validate_announce rejected a freshly built announce")
|
||||
print("PASS S4.5 RNS.Identity.validate_announce accepts upstream-built announce")
|
||||
|
||||
|
||||
def verify_tamper_detection(dest, app_data=b"some_app_data"):
|
||||
"""S4.5: confirm validate_announce rejects bit-flips in each field."""
|
||||
# Helper to build a fresh announce, tamper, re-unpack, and validate.
|
||||
# Each call builds a fresh announce_packet from the same Destination,
|
||||
# which is fine — Destination.announce(send=False) doesn't re-register.
|
||||
def tampered_validate(mutator, label):
|
||||
pkt = dest.announce(app_data=app_data, send=False)
|
||||
pkt.pack()
|
||||
original_raw = pkt.raw
|
||||
new_raw = mutator(original_raw)
|
||||
# Build a new Packet from the tampered raw bytes via Transport.inbound's
|
||||
# pattern — Packet(None, raw) + unpack
|
||||
tpkt = RNS.Packet(None, new_raw)
|
||||
if not tpkt.unpack():
|
||||
# Some tampering corrupts framing enough that unpack returns False.
|
||||
# Either way, the announce is rejected — that's the desired outcome.
|
||||
return
|
||||
if RNS.Identity.validate_announce(tpkt):
|
||||
fail(f"S4.5 tampered announce ({label}) was accepted")
|
||||
|
||||
# 1. Flip a signature byte (signature lives near the end of the body)
|
||||
def tamper_signature(raw):
|
||||
# signature = body[..., -64-len(app_data):-len(app_data)] in the
|
||||
# ratchet-present case. Easier: flip 1 byte 100 from end (well inside sig).
|
||||
return raw[:-100] + bytes([raw[-100] ^ 0x01]) + raw[-99:]
|
||||
tampered_validate(tamper_signature, "signature byte flipped")
|
||||
|
||||
# 2. Flip a public_key byte (offset 19 = first byte of pub in HEADER_1 announce)
|
||||
def tamper_pubkey(raw):
|
||||
return raw[:19] + bytes([raw[19] ^ 0x01]) + raw[20:]
|
||||
tampered_validate(tamper_pubkey, "public_key first byte flipped")
|
||||
|
||||
# 3. Flip a name_hash byte (offset 19 + 64 = 83, first name_hash byte)
|
||||
def tamper_namehash(raw):
|
||||
return raw[:83] + bytes([raw[83] ^ 0x01]) + raw[84:]
|
||||
tampered_validate(tamper_namehash, "name_hash first byte flipped")
|
||||
|
||||
# 4. Flip a random_hash byte (offset 19 + 64 + 10 = 93, first random_hash byte)
|
||||
def tamper_randomhash(raw):
|
||||
return raw[:93] + bytes([raw[93] ^ 0x01]) + raw[94:]
|
||||
tampered_validate(tamper_randomhash, "random_hash first byte flipped")
|
||||
|
||||
# 5. Flip an app_data byte (last byte)
|
||||
def tamper_appdata(raw):
|
||||
return raw[:-1] + bytes([raw[-1] ^ 0x01])
|
||||
tampered_validate(tamper_appdata, "app_data last byte flipped")
|
||||
|
||||
print("PASS S4.5 validate_announce rejects tampered signature / pub / name_hash / "
|
||||
"random_hash / app_data")
|
||||
|
||||
|
||||
def main():
|
||||
print(f"verify_announce_roundtrip.py against RNS {RNS.__version__}")
|
||||
rns_instance = init_minimal_rns()
|
||||
try:
|
||||
identity = RNS.Identity()
|
||||
dest, pkt = build_announce(identity, app_data=b"hello-app-data")
|
||||
verify_body_layout_and_signature(dest, pkt)
|
||||
verify_upstream_validate_announce(pkt)
|
||||
verify_tamper_detection(dest)
|
||||
finally:
|
||||
try:
|
||||
RNS.Reticulum.exit_handler()
|
||||
except Exception:
|
||||
pass
|
||||
print("ALL PASS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
166
tools/verify_lxmf_opportunistic.py
Normal file
166
tools/verify_lxmf_opportunistic.py
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
"""
|
||||
Verifier for SPEC.md S5 (LXMF wire format) — opportunistic delivery path.
|
||||
|
||||
Full end-to-end round-trip:
|
||||
|
||||
1. Build two identities (Alice, Bob) and their `lxmf.delivery`
|
||||
destinations.
|
||||
2. Make Bob's identity discoverable to Alice by simulating an
|
||||
announce reception (RNS.Identity.remember).
|
||||
3. Construct an LXMF message from Alice to Bob with method =
|
||||
OPPORTUNISTIC.
|
||||
4. Pack the LXMF body per S5.1: dest_hash || src_hash ||
|
||||
signature(64) || msgpack_payload, then strip the leading
|
||||
dest_hash for opportunistic wire form.
|
||||
5. Confirm the wire body length and structure match S5.1.
|
||||
6. Encrypt to Bob via Token (S3.1 opportunistic form).
|
||||
7. Decrypt as Bob and confirm the plaintext matches the
|
||||
pre-encryption packed bytes.
|
||||
8. Re-prepend Bob's dest_hash to obtain the canonical Link-form
|
||||
LXMF body (S5.2), then parse via LXMessage.unpack_from_bytes.
|
||||
9. Confirm the parsed message has signature_validated == True,
|
||||
correct source_hash and destination_hash, and that
|
||||
content / title / fields round-trip.
|
||||
|
||||
Exit code 0 on PASS, non-zero on FAIL.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import RNS
|
||||
import LXMF
|
||||
from LXMF.LXMessage import LXMessage
|
||||
|
||||
|
||||
def fail(msg: str) -> None:
|
||||
print(f"FAIL: {msg}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def init_minimal_rns():
|
||||
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-lxmf-")
|
||||
cfg_path = os.path.join(cfg_dir, "config")
|
||||
with open(cfg_path, "w", encoding="utf-8") as f:
|
||||
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
|
||||
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
|
||||
|
||||
|
||||
def main():
|
||||
print(f"verify_lxmf_opportunistic.py against RNS {RNS.__version__} / "
|
||||
f"LXMF {LXMF.__version__}")
|
||||
init_minimal_rns()
|
||||
try:
|
||||
# --- 1, 2: identities, destinations, mutual discovery
|
||||
alice_id = RNS.Identity()
|
||||
bob_id = RNS.Identity()
|
||||
|
||||
alice_dest = RNS.Destination(alice_id, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||
"lxmf", "delivery")
|
||||
bob_dest = RNS.Destination(bob_id, RNS.Destination.IN, RNS.Destination.SINGLE,
|
||||
"lxmf", "delivery")
|
||||
|
||||
# Make Bob's identity recallable to Alice
|
||||
RNS.Identity.remember(b"\x00"*32, bob_dest.hash, bob_id.get_public_key(), None)
|
||||
|
||||
# Alice's outbound destination to Bob (this is what LXMessage uses
|
||||
# for encryption; it must hold Bob's public key).
|
||||
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE,
|
||||
"lxmf", "delivery")
|
||||
|
||||
# --- 3: construct LXMessage
|
||||
title = "test title"
|
||||
content = "hello reticulum from alice"
|
||||
fields = {0xFF: "diag-marker"}
|
||||
lxm = LXMessage(
|
||||
destination = bob_dest_out,
|
||||
source = alice_dest,
|
||||
content = content.encode("utf-8"),
|
||||
title = title.encode("utf-8"),
|
||||
fields = fields,
|
||||
desired_method = LXMessage.OPPORTUNISTIC,
|
||||
)
|
||||
|
||||
# --- 4: pack
|
||||
lxm.pack()
|
||||
|
||||
# S5.5 message_id = SHA256(dest_hash || src_hash || msgpack_payload)
|
||||
# which lxm.hash already exposes.
|
||||
if lxm.hash is None or len(lxm.hash) != 32:
|
||||
fail(f"S5.5 message_id missing or wrong length: {len(lxm.hash) if lxm.hash else None}")
|
||||
|
||||
# S5.1/5.2: lxm.packed = dest(16) || src(16) || sig(64) || msgpack
|
||||
packed = lxm.packed
|
||||
if packed[:16] != bob_dest.hash:
|
||||
fail(f"S5.2 dest_hash slot mismatch:\n got: {packed[:16].hex()}\n"
|
||||
f" want: {bob_dest.hash.hex()}")
|
||||
if packed[16:32] != alice_dest.hash:
|
||||
fail(f"S5.2 src_hash slot mismatch:\n got: {packed[16:32].hex()}\n"
|
||||
f" want: {alice_dest.hash.hex()}")
|
||||
if len(packed[32:96]) != 64:
|
||||
fail(f"S5.2 signature slot is {len(packed[32:96])} bytes, want 64")
|
||||
|
||||
print("PASS S5.1/5.2 LXMF body layout: dest(16) || src(16) || sig(64) || msgpack")
|
||||
|
||||
# --- 5: opportunistic wire form strips the leading dest_hash
|
||||
# See S5.1 and LXMessage.__as_packet line 631:
|
||||
# RNS.Packet(dest, self.packed[16:]) for OPPORTUNISTIC
|
||||
opp_wire_plaintext = packed[16:] # what gets fed to Token encrypt
|
||||
if opp_wire_plaintext[:16] != alice_dest.hash:
|
||||
fail("S5.1 opportunistic-stripped form must start with src_hash")
|
||||
|
||||
# --- 6: encrypt to Bob via Token (this is what Packet.pack does)
|
||||
ciphertext = bob_dest_out.encrypt(opp_wire_plaintext)
|
||||
# Wire-layout sanity: ephemeral_pub(32) || iv(16) || aes(...) || hmac(32)
|
||||
if len(ciphertext) < 32 + 16 + 16 + 32:
|
||||
fail(f"opportunistic ciphertext too short: {len(ciphertext)} bytes")
|
||||
if (len(ciphertext) - 32 - 16 - 32) % 16 != 0:
|
||||
fail("opportunistic ciphertext AES body not block-aligned")
|
||||
|
||||
# --- 7: decrypt as Bob
|
||||
# bob_dest is the IN-direction destination for Bob; it can decrypt.
|
||||
decrypted = bob_dest.decrypt(ciphertext)
|
||||
if decrypted != opp_wire_plaintext:
|
||||
fail("S3 round-trip mismatch on opportunistic LXMF body")
|
||||
print("PASS S3 + S5.1 opportunistic encrypt/decrypt round-trip")
|
||||
|
||||
# --- 8: re-prepend dest_hash to feed unpack_from_bytes
|
||||
full_lxmf = bob_dest.hash + decrypted
|
||||
|
||||
# --- 9: parse and validate
|
||||
# Make Alice's identity recallable to Bob too so signature
|
||||
# validation can find her public key.
|
||||
RNS.Identity.remember(b"\x00"*32, alice_dest.hash, alice_id.get_public_key(), None)
|
||||
|
||||
parsed = LXMessage.unpack_from_bytes(full_lxmf)
|
||||
if not parsed.signature_validated:
|
||||
fail(f"S5.5/5.6 unpack_from_bytes did not validate signature: "
|
||||
f"unverified_reason={getattr(parsed, 'unverified_reason', None)}")
|
||||
if parsed.destination_hash != bob_dest.hash:
|
||||
fail(f"parsed dest_hash mismatch: {parsed.destination_hash.hex()} vs {bob_dest.hash.hex()}")
|
||||
if parsed.source_hash != alice_dest.hash:
|
||||
fail(f"parsed src_hash mismatch: {parsed.source_hash.hex()} vs {alice_dest.hash.hex()}")
|
||||
if parsed.title_as_string() != title:
|
||||
fail(f"parsed title mismatch: {parsed.title_as_string()!r} vs {title!r}")
|
||||
if parsed.content_as_string() != content:
|
||||
fail(f"parsed content mismatch: {parsed.content_as_string()!r} vs {content!r}")
|
||||
if parsed.fields != fields:
|
||||
fail(f"parsed fields mismatch: {parsed.fields!r} vs {fields!r}")
|
||||
|
||||
print(f"PASS S5.5/5.6 unpack_from_bytes round-trip: title/content/fields preserved, "
|
||||
f"signature_validated=True")
|
||||
|
||||
finally:
|
||||
try:
|
||||
RNS.Reticulum.exit_handler()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print("ALL PASS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
206
tools/verify_token_crypto.py
Normal file
206
tools/verify_token_crypto.py
Normal file
|
|
@ -0,0 +1,206 @@
|
|||
"""
|
||||
Verifier for SPEC.md S3 (Token cryptography).
|
||||
|
||||
Exercises the modified-Fernet Token construction in two directions
|
||||
against upstream RNS 1.2.0:
|
||||
|
||||
1. Identity-style encrypt (with ephemeral X25519 prefix) per S3.1
|
||||
opportunistic form. Round-trips a known plaintext through
|
||||
RNS.Identity.encrypt -> RNS.Identity.decrypt.
|
||||
|
||||
2. Symmetric Token encrypt/decrypt (no ephemeral prefix) per S3.1
|
||||
link-derived form. Builds a fresh symmetric key, encrypts a
|
||||
known plaintext, validates the wire layout against the spec,
|
||||
and round-trips back through Token.decrypt.
|
||||
|
||||
3. HMAC-then-AES order check (S3.3): a tampered HMAC byte is
|
||||
detected before AES decryption is attempted, so the function
|
||||
raises on HMAC failure rather than returning a malformed
|
||||
plaintext.
|
||||
|
||||
4. HKDF salt = identity_hash check (S3.2): re-derive the
|
||||
encryption key by hand using HKDF over the ECDH shared secret
|
||||
with salt = recipient identity_hash, and confirm the resulting
|
||||
key matches the one upstream uses to encrypt.
|
||||
|
||||
Exit code 0 on PASS, non-zero on FAIL.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
import RNS
|
||||
from RNS.Cryptography.Token import Token
|
||||
from RNS.Cryptography.HKDF import hkdf
|
||||
|
||||
|
||||
def fail(msg: str) -> None:
|
||||
print(f"FAIL: {msg}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def verify_opportunistic_encrypt_decrypt():
|
||||
"""S3.2 / S3.3: identity-style encrypt with ephemeral pub prefix,
|
||||
HKDF derived from ECDH(ephemeral, recipient.X25519_pub) with
|
||||
salt = recipient.identity_hash."""
|
||||
recipient = RNS.Identity()
|
||||
plaintext = b"hello, reticulum"
|
||||
|
||||
# Encrypt to recipient's identity. This builds an ephemeral X25519
|
||||
# keypair internally, does ECDH, derives the Token key, and emits
|
||||
# ephemeral_pub(32) || iv(16) || aes_ciphertext(...) || hmac(32)
|
||||
ciphertext = recipient.encrypt(plaintext)
|
||||
|
||||
# Wire-layout sanity:
|
||||
if len(ciphertext) < 32 + 16 + 16 + 32:
|
||||
fail(f"opportunistic ciphertext too short: {len(ciphertext)} bytes")
|
||||
eph_pub_bytes = ciphertext[:32]
|
||||
iv = ciphertext[32:48]
|
||||
hmac = ciphertext[-32:]
|
||||
aes_body = ciphertext[48:-32]
|
||||
if len(aes_body) % 16 != 0:
|
||||
fail(f"AES body not block-aligned: {len(aes_body)} bytes (must be multiple of 16)")
|
||||
if len(iv) != 16:
|
||||
fail(f"IV is {len(iv)} bytes, want 16")
|
||||
if len(hmac) != 32:
|
||||
fail(f"HMAC is {len(hmac)} bytes, want 32")
|
||||
|
||||
# Round-trip through decrypt — uses recipient's long-term X25519
|
||||
# private key (no ratchets configured on this fresh identity).
|
||||
decrypted = recipient.decrypt(ciphertext)
|
||||
if decrypted != plaintext:
|
||||
fail(f"opportunistic round-trip mismatch:\n"
|
||||
f" plaintext: {plaintext!r}\n"
|
||||
f" decrypted: {decrypted!r}")
|
||||
|
||||
print("PASS S3.1/3.2/3.3 opportunistic Token encrypt/decrypt round-trip")
|
||||
return recipient, eph_pub_bytes, iv, hmac, aes_body
|
||||
|
||||
|
||||
def verify_hkdf_salt_is_identity_hash(recipient, eph_pub_bytes, iv, hmac_bytes, aes_body):
|
||||
"""S3.2: confirm HKDF salt is the recipient's 16-byte identity_hash,
|
||||
not the dest_hash or ratchet_pub or anything else."""
|
||||
# We can't observe the exact ephemeral private key (it was generated
|
||||
# inside RNS.Identity.encrypt). But we CAN take the recipient's
|
||||
# private key and the captured ephemeral_pub, perform ECDH from the
|
||||
# recipient's side, derive the Token key under salt = identity_hash,
|
||||
# and confirm decrypt succeeds — equivalent to asserting the salt.
|
||||
# Use the RNS-level X25519 dispatcher so the resulting public-key
|
||||
# object matches whichever provider (proxy vs fallback) the recipient's
|
||||
# private key uses — exchange() requires both sides be the same kind.
|
||||
from RNS.Cryptography import X25519PublicKey
|
||||
|
||||
eph_pub = X25519PublicKey.from_public_bytes(eph_pub_bytes)
|
||||
shared = recipient.prv.exchange(eph_pub)
|
||||
|
||||
derived = hkdf(
|
||||
length=64,
|
||||
derive_from=shared,
|
||||
salt=recipient.hash, # <-- the 16-byte identity_hash
|
||||
context=None,
|
||||
)
|
||||
|
||||
# Reconstruct the Token from the derived key and verify HMAC + decrypt
|
||||
token = Token(derived)
|
||||
body = iv + aes_body + hmac_bytes
|
||||
plaintext = token.decrypt(body)
|
||||
if plaintext != b"hello, reticulum":
|
||||
fail(f"HKDF-salt-by-hand decrypt mismatch: {plaintext!r}")
|
||||
|
||||
print("PASS S3.2 HKDF salt is recipient.identity_hash (decrypt succeeds with hand-derived key)")
|
||||
|
||||
|
||||
def verify_symmetric_token_form(plaintext=b"link DATA payload"):
|
||||
"""S3.1 link-derived form: no ephemeral prefix, just iv || ciphertext || hmac."""
|
||||
key = Token.generate_key() # 64 bytes for AES-256-CBC
|
||||
if len(key) != 64:
|
||||
fail(f"Token.generate_key returned {len(key)} bytes, want 64")
|
||||
|
||||
token = Token(key)
|
||||
wire = token.encrypt(plaintext)
|
||||
|
||||
# Layout: iv(16) || ciphertext(N*16) || hmac(32)
|
||||
if len(wire) < 16 + 16 + 32:
|
||||
fail(f"link-derived ciphertext too short: {len(wire)}")
|
||||
iv = wire[:16]
|
||||
ciphertext = wire[16:-32]
|
||||
hmac_bytes = wire[-32:]
|
||||
if len(ciphertext) % 16 != 0:
|
||||
fail(f"link-derived ciphertext body not block-aligned: {len(ciphertext)}")
|
||||
|
||||
decrypted = token.decrypt(wire)
|
||||
if decrypted != plaintext:
|
||||
fail(f"link-derived round-trip mismatch:\n in: {plaintext!r}\n out: {decrypted!r}")
|
||||
|
||||
print("PASS S3.1 link-derived Token form (no ephemeral prefix, iv||ct||hmac)")
|
||||
return key, wire
|
||||
|
||||
|
||||
def verify_hmac_before_aes(key, wire):
|
||||
"""S3.3: HMAC verification MUST run before AES decryption.
|
||||
A tampered HMAC byte should raise rather than produce malformed plaintext."""
|
||||
token = Token(key)
|
||||
|
||||
# Flip a single bit in the HMAC region — the last 32 bytes
|
||||
tampered = wire[:-1] + bytes([wire[-1] ^ 0x01])
|
||||
|
||||
try:
|
||||
token.decrypt(tampered)
|
||||
fail("S3.3 tampered HMAC was accepted — encrypt-then-MAC verification missing")
|
||||
except ValueError as e:
|
||||
if "HMAC" not in str(e):
|
||||
fail(f"S3.3 decrypt raised but with wrong error: {e}")
|
||||
# Good: HMAC mismatch raised before AES decrypt could run
|
||||
|
||||
# Also flip a byte in the ciphertext (HMAC stays intact in shape but
|
||||
# the HMAC wouldn't match the corrupted body). Same expected outcome.
|
||||
if len(wire) > 64:
|
||||
tampered2 = wire[:32] + bytes([wire[32] ^ 0x01]) + wire[33:]
|
||||
try:
|
||||
token.decrypt(tampered2)
|
||||
fail("S3.3 tampered ciphertext was accepted — HMAC-then-AES order broken")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
print("PASS S3.3 HMAC-then-AES order (tampered ciphertext rejected at HMAC stage)")
|
||||
|
||||
|
||||
def verify_pkcs7_padding_handled():
|
||||
"""S3.2 step 6: AES-CBC PKCS#7 padding is applied automatically by the
|
||||
Token; clients must NOT pad manually (would produce double padding)."""
|
||||
# 1-byte plaintext: PKCS#7 will pad with 15 bytes of 0x0F
|
||||
one_byte = b"x"
|
||||
key = Token.generate_key()
|
||||
token = Token(key)
|
||||
wire = token.encrypt(one_byte)
|
||||
out = token.decrypt(wire)
|
||||
if out != one_byte:
|
||||
fail(f"S3.2 step 6 PKCS#7 round-trip on 1B plaintext failed: {out!r}")
|
||||
|
||||
# 16-byte plaintext (one full block): PKCS#7 adds a full block of 0x10
|
||||
sixteen = b"sixteen ABCDEFGH"
|
||||
assert len(sixteen) == 16
|
||||
wire = Token(key).encrypt(sixteen)
|
||||
if (len(wire) - 16 - 32) != 32:
|
||||
fail(f"S3.2 step 6 16B plaintext should produce 32B AES body (one + full pad block), "
|
||||
f"got {len(wire) - 16 - 32}")
|
||||
out = Token(key).decrypt(wire)
|
||||
if out != sixteen:
|
||||
fail(f"S3.2 step 6 PKCS#7 round-trip on 16B plaintext failed: {out!r}")
|
||||
|
||||
print("PASS S3.2 step 6 PKCS#7 padding (1B and 16B boundaries)")
|
||||
|
||||
|
||||
def main():
|
||||
print(f"verify_token_crypto.py against RNS {RNS.__version__}")
|
||||
recipient, eph_pub_bytes, iv, hmac_bytes, aes_body = verify_opportunistic_encrypt_decrypt()
|
||||
verify_hkdf_salt_is_identity_hash(recipient, eph_pub_bytes, iv, hmac_bytes, aes_body)
|
||||
key, wire = verify_symmetric_token_form()
|
||||
verify_hmac_before_aes(key, wire)
|
||||
verify_pkcs7_padding_handled()
|
||||
print("ALL PASS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue