reticiulum-specification/tools/verify_resource.py

253 lines
9.1 KiB
Python

"""
Verifier for SPEC.md S10 (Resource fragmentation protocol).
Exercises upstream RNS 1.2.4 Resource construction with a deterministic fake
Link and verifies:
1. Default direct-LXMF single-packet content limit is 319 bytes.
2. Resource encrypts one complete stream, then slices that ciphertext into
RESOURCE packet bodies without packet-level re-encryption.
3. Resource hash, expected proof, and map-hash formulas.
4. RESOURCE_ADV fields and flags.
5. Multi-segment advertisement `d` is total logical-resource size.
6. An exhausted RESOURCE_REQ may carry part requests; upstream fulfils the
requested parts and emits RESOURCE_HMU.
7. RESOURCE_RCL is emitted for advertisement rejection, while an ordinary
receiver-side cancel is local-only.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations

import os
import sys
import time
from unittest import mock

import LXMF
import RNS
from LXMF.LXMessage import LXMessage
from RNS.Cryptography.Token import Token
from RNS.Resource import Resource, ResourceAdvertisement
# Fixed 64-byte key (byte values 0..63) that FakeLink passes to Token, keeping
# the fake link's key material the same on every run.
FIXED_TOKEN_KEY = bytes(range(64))
def fail(msg: str) -> None:
    """Print ``FAIL: <msg>`` to stdout and terminate with exit status 1.

    Never returns normally: ``SystemExit`` is raised once the message has
    been printed.
    """
    report = f"FAIL: {msg}"
    print(report)
    raise SystemExit(1)
class FakeLink:
    """Deterministic stand-in exposing only the Link surface this verifier uses.

    Instances are handed to Resource (and, through it, Packet) construction in
    place of a live link.  Encryption and decryption are delegated to a Token
    built from FIXED_TOKEN_KEY, and cancel notifications are appended to lists
    so the verifier can inspect them afterwards.
    """

    def __init__(self, mtu: int = RNS.Reticulum.MTU):
        link_hash = bytes.fromhex("00112233445566778899aabbccddeeff")

        # Identity: the same fixed 16-byte value serves as hash and link_id.
        self.hash = link_hash
        self.link_id = link_hash
        self.type = RNS.Destination.LINK
        self.status = RNS.Link.ACTIVE

        # Size limits.
        self.mtu = mtu
        self.mdu = RNS.Link.MDU

        # Timing and traffic counters.
        # NOTE(review): presumably read/updated by upstream Resource and
        # Packet code; the consumers are not visible in this file.
        self.rtt = 0.1
        self.traffic_timeout_factor = 1
        self.last_outbound = 0
        self.tx = 0
        self.txbytes = 0

        # Fixed-key cipher backing encrypt() and decrypt().
        self._token = Token(FIXED_TOKEN_KEY)

        # Resources reported through the cancel_* hooks, in call order.
        self.cancelled_incoming = []
        self.cancelled_outgoing = []

    def encrypt(self, data: bytes) -> bytes:
        """Return *data* encrypted with the fixed-key token."""
        ciphertext = self._token.encrypt(data)
        return ciphertext

    def decrypt(self, data: bytes) -> bytes:
        """Return *data* decrypted with the fixed-key token."""
        cleartext = self._token.decrypt(data)
        return cleartext

    def cancel_incoming_resource(self, resource) -> None:
        """Record *resource* as a cancelled incoming transfer."""
        self.cancelled_incoming.append(resource)

    def cancel_outgoing_resource(self, resource) -> None:
        """Record *resource* as a cancelled outgoing transfer."""
        self.cancelled_outgoing.append(resource)

    def resource_concluded(self, resource) -> None:
        """Accept a conclusion notification and do nothing with it."""
        return None
def verify_default_lxmf_threshold() -> None:
    """Check the default direct-LXMF single-packet content limit.

    Fails unless ``LXMessage.LINK_PACKET_MAX_CONTENT`` equals
    ``RNS.Link.MDU - LXMessage.LXMF_OVERHEAD`` and that value is exactly 319.
    Prints a PASS line when both conditions hold.
    """
    derived_limit = RNS.Link.MDU - LXMessage.LXMF_OVERHEAD
    declared_limit = LXMessage.LINK_PACKET_MAX_CONTENT

    # First make sure the library constant follows the documented formula ...
    if declared_limit != derived_limit:
        fail(
            "S10 direct-LXMF threshold formula mismatch: "
            f"{declared_limit} != {derived_limit}"
        )

    # ... then pin the concrete default the specification quotes.
    if derived_limit != 319:
        fail(f"S10 default direct-LXMF threshold changed: got {derived_limit}, want 319")

    print("PASS S10 default direct-LXMF Resource threshold: content > 319 bytes")
def verify_preparation_and_advertisement() -> None:
    """Check Resource preparation and the advertisement built from it.

    Builds an uncompressed, unadvertised Resource over a FakeLink and fails
    unless:

    * decrypting the concatenated part bodies as one stream yields a
      ``Resource.RANDOM_HASH_SIZE``-byte prefix followed by the plaintext;
    * every part has RESOURCE context and its raw packet, past the first
      19 bytes, is exactly the part body;
    * the resource hash, expected proof and per-part map hashes match the
      formulas recomputed here;
    * a packed-then-unpacked ResourceAdvertisement reports matching ``t``,
      ``d``, ``n``, ``h`` and ``r`` values, with ``e`` set and ``c``, ``s``,
      ``u``, ``p``, ``x`` all clear.
    """
    fake_link = FakeLink()
    plaintext = (b"resource-verifier-" * 70) + bytes(range(64))
    resource = Resource(plaintext, fake_link, advertise=False, auto_compress=False)
    parts = resource.parts

    # Whole-stream decryption: join every part body and decrypt in one call.
    stream = b"".join(part.data for part in parts)
    cleartext = fake_link.decrypt(stream)
    prefix_len = Resource.RANDOM_HASH_SIZE
    prefix, recovered = cleartext[:prefix_len], cleartext[prefix_len:]
    if recovered != plaintext:
        fail("S10.2 whole-stream decrypt did not recover original plaintext")
    if len(prefix) != prefix_len:
        fail("S10.2 throwaway prefix has wrong length")

    # NOTE(review): 19 is presumably the length of the packed header that
    # precedes the payload in part.raw - confirm against the RNS packet layout.
    header_len = 19
    for index, part in enumerate(parts):
        if part.context != RNS.Packet.RESOURCE:
            fail(f"S10.3 part {index} has context {part.context:#x}, want RESOURCE")
        payload = part.raw[header_len:]
        if payload != part.data:
            fail(f"S10.6 part {index} was packet-level encrypted or altered")

    # Hash formulas, recomputed independently of the Resource object.
    random_hash = resource.random_hash
    want_hash = RNS.Identity.full_hash(plaintext + random_hash)
    want_proof = RNS.Identity.full_hash(plaintext + want_hash)
    if resource.hash != want_hash:
        fail("S10.2 resource hash formula mismatch")
    if resource.expected_proof != want_proof:
        fail("S10.2 expected proof formula mismatch")

    map_hash_len = Resource.MAPHASH_LEN
    for index, part in enumerate(parts):
        digest = RNS.Identity.full_hash(part.data + resource.random_hash)
        if part.map_hash != digest[:map_hash_len]:
            fail(f"S10.2 map-hash formula mismatch for part {index}")

    # Round-trip the advertisement through pack()/unpack() before inspecting it.
    packed_adv = ResourceAdvertisement(resource).pack()
    adv = ResourceAdvertisement.unpack(packed_adv)
    if adv.t != len(stream):
        fail(f"S10.4 ADV t mismatch: {adv.t} != {len(stream)}")
    if adv.d != len(plaintext):
        fail(f"S10.4 ADV d mismatch for single segment: {adv.d} != {len(plaintext)}")
    if adv.n != len(parts):
        fail(f"S10.4 ADV n mismatch: {adv.n} != {len(parts)}")
    if not (adv.h == resource.hash and adv.r == resource.random_hash):
        fail("S10.4 ADV hash or random-hash field mismatch")

    # Only the `e` flag may be set for this fixture.
    flags_ok = adv.e and not (adv.c or adv.s or adv.u or adv.p or adv.x)
    if not flags_ok:
        fail(f"S10.4 ADV flags unexpected: {adv.f:#x}")

    print(
        "PASS S10.2/S10.4/S10.6 whole-stream encryption, ciphertext slicing, "
        "hash formulas, and ADV fields"
    )
def verify_multisegment_total_size() -> None:
    """Check that a split Resource advertises the total logical size in ``d``.

    The fixture is ``Resource.MAX_EFFICIENT_SIZE + 257`` bytes, which must
    produce exactly two segments.  Fails unless the unpacked advertisement's
    ``d`` equals that total and the Resource's own ``uncompressed_size`` is
    strictly smaller than it.
    """
    fake_link = FakeLink()
    total_size = Resource.MAX_EFFICIENT_SIZE + 257
    payload = b"M" * total_size
    resource = Resource(payload, fake_link, advertise=False, auto_compress=False)

    # Inspect the advertisement as a receiver would: after a pack/unpack trip.
    packed_adv = ResourceAdvertisement(resource).pack()
    adv = ResourceAdvertisement.unpack(packed_adv)

    is_two_segment_split = resource.total_segments == 2 and resource.split
    if not is_two_segment_split:
        fail("S10.11 expected a two-segment Resource")
    if adv.d != total_size:
        fail(f"S10.4 multi-segment ADV d is {adv.d}, want total logical size {total_size}")
    if not resource.uncompressed_size < total_size:
        fail("S10.4 first-segment plaintext unexpectedly equals total logical size")

    print(
        "PASS S10.4/S10.11 multi-segment ADV d is total logical-resource size, "
        "not current-segment plaintext size"
    )
def verify_exhausted_request_with_parts() -> None:
    """Check that a hashmap-exhausted request carrying a part hash is served.

    Builds a Resource with more parts than ``ResourceAdvertisement.HASHMAP_MAX_LEN``
    and feeds ``resource.request()`` a hand-built request made of:

    * the ``Resource.HASHMAP_IS_EXHAUSTED`` flag byte,
    * the map hash of the part at index ``HASHMAP_MAX_LEN - 1``,
    * the resource hash,
    * the map hash of part 0 (the bundled part request).

    Outbound packets are captured instead of transmitted.  Fails unless both a
    RESOURCE packet and a RESOURCE_HMU packet were emitted.
    """
    link = FakeLink()

    # Size the payload so the part count exceeds one hashmap window.
    per_part_bytes = link.mtu - RNS.Reticulum.HEADER_MAXSIZE - RNS.Reticulum.IFAC_MIN_SIZE
    payload_size = ResourceAdvertisement.HASHMAP_MAX_LEN * per_part_bytes
    resource = Resource(b"R" * payload_size, link, advertise=False, auto_compress=False)
    if len(resource.parts) <= ResourceAdvertisement.HASHMAP_MAX_LEN:
        fail("S10.7 fixture did not produce a multi-segment hashmap")

    # NOTE(review): presumably the sender-side state request() expects once an
    # advertisement has gone out; the fixture never advertises, so it is set
    # by hand - confirm against upstream Resource.request().
    resource.status = Resource.TRANSFERRING
    resource.adv_sent = time.time()
    resource.rtt = 0.1

    requested_part = resource.parts[0]
    last_known_index = ResourceAdvertisement.HASHMAP_MAX_LEN - 1
    last_known_hash = resource.parts[last_known_index].map_hash
    request_data = (
        bytes([Resource.HASHMAP_IS_EXHAUSTED])
        + last_known_hash
        + resource.hash
        + requested_part.map_hash
    )

    captured: list[RNS.Packet] = []

    def fake_outbound(packet):
        captured.append(packet)
        return True

    # patch.object saves the raw class attribute and puts that exact object
    # back on exit.  Saving it via `RNS.Transport.outbound` would read through
    # the descriptor protocol, so a staticmethod would be restored as a bare
    # function.
    with mock.patch.object(RNS.Transport, "outbound", staticmethod(fake_outbound)):
        resource.request(request_data)

    contexts = [packet.context for packet in captured]
    if RNS.Packet.RESOURCE not in contexts:
        fail("S10.7 exhausted RESOURCE_REQ did not fulfil bundled part request")
    if RNS.Packet.RESOURCE_HMU not in contexts:
        fail("S10.7 exhausted RESOURCE_REQ did not emit RESOURCE_HMU")
    print("PASS S10.7 exhausted RESOURCE_REQ fulfils bundled parts and emits RESOURCE_HMU")
def verify_receiver_reject_vs_cancel() -> None:
    """Check that rejection emits RESOURCE_RCL while a receiver cancel stays local.

    With outbound packets captured instead of transmitted:

    1. ``Resource.reject()`` is given a stand-in advertisement packet and must
       emit exactly one packet, with RESOURCE_RCL context.
    2. A receiver-side Resource (``initiator = False``) in TRANSFERRING state
       is cancelled; it must emit no packet and must be reported to the link
       through ``cancel_incoming_resource``.
    """
    link = FakeLink()
    outgoing = Resource(b"reject-me", link, advertise=False, auto_compress=False)
    advertisement_plaintext = ResourceAdvertisement(outgoing).pack()

    class AdvertisementPacket:
        """Stand-in exposing the ``plaintext`` and ``link`` attributes set here."""

        plaintext = advertisement_plaintext
        link = None

    # Assigned after the class body: inside it, `link = link` would not see
    # the enclosing function's local once `link` is also a class-level name.
    AdvertisementPacket.link = link

    captured: list[RNS.Packet] = []

    def fake_outbound(packet):
        captured.append(packet)
        return True

    # patch.object saves the raw class attribute and puts that exact object
    # back on exit, including when fail() raises SystemExit inside the block.
    # Saving it via `RNS.Transport.outbound` would read through the descriptor
    # protocol, so a staticmethod would be restored as a bare function.
    with mock.patch.object(RNS.Transport, "outbound", staticmethod(fake_outbound)):
        Resource.reject(AdvertisementPacket)
        if [packet.context for packet in captured] != [RNS.Packet.RESOURCE_RCL]:
            fail("S10.9 advertisement rejection did not emit exactly one RESOURCE_RCL")

        # Start the second scenario from an empty capture list.
        captured.clear()

        incoming = Resource(None, link)
        incoming.status = Resource.TRANSFERRING
        incoming.initiator = False
        incoming.callback = None
        incoming.cancel()
        if captured:
            fail("S10.9 ordinary receiver-side cancel unexpectedly emitted a packet")
        if incoming not in link.cancelled_incoming:
            fail("S10.9 ordinary receiver-side cancel did not remove incoming Resource")

    print("PASS S10.9 RESOURCE_RCL rejects advertisements; ordinary receiver cancel is local-only")
def main() -> None:
    """Print the RNS/LXMF versions under test, run every check, report ALL PASS.

    Checks run in a fixed order; the first failing check terminates the
    process through ``fail()``, so ``ALL PASS`` is printed only when every
    check returned.
    """
    print(f"verify_resource.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")

    checks = (
        verify_default_lxmf_threshold,
        verify_preparation_and_advertisement,
        verify_multisegment_total_size,
        verify_exhausted_request_with_parts,
        verify_receiver_reject_vs_cancel,
    )
    for check in checks:
        check()

    print("ALL PASS")


# Run the verifier only when executed as a script, not when imported.
if __name__ == "__main__":
    main()