reticiulum-specification/tools/verify_request_response.py
John Poole cd851dab87 Completed the REQUEST/RESPONSE three-tier work unit.
Added:

Tier 1 audit: audits/request-response-tier1-rns-1.2.4.md
Deterministic test-vectors/request-response.json
tools/regen_request_response.py
tools/verify_request_response.py
Corrected two material specification errors:

ALLOW_ALL = 0x01, ALLOW_LIST = 0x02; prior values were reversed.
Resource file metadata is carried inside Resource plaintext with flag x; advertisement m remains the hashmap.
Also verified packet/Resource request-ID formulas, response correlation, wrong-ID behavior, receipt states, and malformed envelopes.

Verification:

Deterministic vector SHA-256: 5cdad638…56ab2
git diff --check: pass
Portable-path scan: pass
Full pinned suite: 18 passed, 0 failed
2026-06-08 14:03:50 -07:00

174 lines
7.3 KiB
Python

"""
Verifier for SPEC.md S11 generic Link REQUEST/RESPONSE protocol.
Checks packet and Resource wire forms, distinct request-ID formulas,
response correlation behavior, malformed/wrong-ID handling, path hashes, and
authorization policy constants.
"""
from __future__ import annotations
import json
import os
import sys
import RNS
from RNS.Cryptography.Token import Token
from RNS.Link import RequestReceipt
from RNS.Resource import Resource, ResourceAdvertisement
from RNS.vendor import umsgpack
# Directory that holds this script, resolved to an absolute path.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# The repository root is the parent of the script directory.
REPO_ROOT = os.path.dirname(_SCRIPT_DIR)
# Both fixture files live side by side under <repo>/test-vectors.
_VECTORS_DIR = os.path.join(REPO_ROOT, "test-vectors")
VECTORS_PATH = os.path.join(_VECTORS_DIR, "request-response.json")
LINKS_PATH = os.path.join(_VECTORS_DIR, "links.json")
def fail(message: str) -> None:
    """Report a verification failure and stop the program.

    Writes ``FAIL: <message>`` to standard output and then raises
    ``SystemExit`` with status code 1, so control never returns to the
    caller.
    """
    report = f"FAIL: {message}"
    print(report)
    raise SystemExit(1)
def load_json(path: str):
    """Return the parsed JSON document stored in the UTF-8 file at ``path``."""
    with open(path, "r", encoding="utf-8") as handle:
        text = handle.read()
    return json.loads(text)
def verify_packet_forms(vector: dict, token: Token, link_id: bytes) -> None:
    """Check the single-packet REQUEST and RESPONSE fixtures (S11.1/S11.2).

    Args:
        vector: Parsed request/response fixture; the ``packet_request``,
            ``packet_response`` and ``inputs`` entries are read.
        token: Token used to decrypt the packet payloads.
        link_id: Link identifier the REQUEST packet must be addressed to.

    Any mismatch is reported through ``fail``, which terminates the
    process; on success a single PASS line is printed.
    """
    req_fixture = vector["packet_request"]
    req_packet = RNS.Packet(None, bytes.fromhex(req_fixture["raw_hex"]))
    unpacked = req_packet.unpack()
    if not unpacked:
        fail("S11.1 packet REQUEST did not unpack")

    # Header: REQUEST context, addressed to the link.
    is_request = req_packet.context == RNS.Packet.REQUEST
    if not (is_request and req_packet.destination_hash == link_id):
        fail("S11.1 packet REQUEST header mismatch")

    # Payload must decrypt to exactly the recorded plaintext.
    req_plain = token.decrypt(req_packet.data)
    if req_plain.hex() != req_fixture["plaintext_hex"]:
        fail("S11.1 packet REQUEST decrypt mismatch")

    # Envelope element 1 is compared to the path hash; element 2 must be a dict.
    envelope = umsgpack.unpackb(req_plain)
    path_hash_ok = envelope[1].hex() == vector["inputs"]["path_hash_hex"]
    if not (path_hash_ok and isinstance(envelope[2], dict)):
        fail("S11.1 packet REQUEST path hash or single-packed data mismatch")

    # The request_id must be the truncated packet hash ...
    packet_hash_hex = req_packet.getTruncatedHash().hex()
    if packet_hash_hex != req_fixture["request_id_hex"]:
        fail("S11.1 packet request_id is not the truncated packet hash")

    # ... and the fixture must make that distinguishable from the plaintext hash.
    plaintext_hash_hex = RNS.Identity.truncated_hash(req_plain).hex()
    if plaintext_hash_hex == req_fixture["request_id_hex"]:
        fail("S11.1 fixture did not distinguish packet hash from plaintext hash")

    if len(req_plain) > RNS.Link.MDU:
        fail("S11.1 packet REQUEST fixture exceeds Link MDU")

    resp_fixture = vector["packet_response"]
    resp_packet = RNS.Packet(None, bytes.fromhex(resp_fixture["raw_hex"]))
    if not (resp_packet.unpack() and resp_packet.context == RNS.Packet.RESPONSE):
        fail("S11.2 packet RESPONSE header mismatch")

    # Envelope element 0 of the response must echo the packet request_id.
    resp_envelope = umsgpack.unpackb(token.decrypt(resp_packet.data))
    carried_id_hex = resp_envelope[0].hex()
    if not (
        carried_id_hex == req_fixture["request_id_hex"]
        and carried_id_hex == resp_fixture["correlation_request_id_hex"]
    ):
        fail("S11.2 packet RESPONSE did not carry packet request_id")

    print("PASS S11.1/S11.2 packet REQUEST/RESPONSE wire forms and packet-hash request_id")
def verify_resource_form(vector: dict, key: str, token: Token, expect_response: bool) -> None:
    """Check one Resource-carried REQUEST or RESPONSE fixture.

    Args:
        vector: Parsed request/response fixture.
        key: Name of the fixture entry to check (also used in failure text).
        token: Token used to decrypt the concatenated part bodies.
        expect_response: True when the entry is a response, False when it
            is a request; selects which advertisement flags are required.

    Any mismatch is reported through ``fail``, which terminates the process.
    """
    fixture = vector[key]
    expected_plaintext = bytes.fromhex(fixture["plaintext_hex"])
    expected_request_id = bytes.fromhex(fixture["request_id_hex"])

    # Reassemble the encrypted stream from the part bodies, in fixture order.
    part_bodies = [bytes.fromhex(part["body_hex"]) for part in fixture["parts"]]
    resource_plaintext = token.decrypt(b"".join(part_bodies))

    # The first Resource.RANDOM_HASH_SIZE bytes are skipped before comparing.
    payload = resource_plaintext[Resource.RANDOM_HASH_SIZE:]
    if payload != expected_plaintext:
        fail(f"S11 Resource {key} did not decrypt to packed plaintext")

    advertisement = ResourceAdvertisement.unpack(
        bytes.fromhex(fixture["advertisement_plaintext_hex"])
    )
    if advertisement.q != expected_request_id:
        fail(f"S11 Resource {key} ADV q mismatch")

    if expect_response:
        # A response must have p set and u clear.
        if not advertisement.p or advertisement.u:
            fail("S11.2 Resource response flags mismatch")
    else:
        # A request must have u set and p clear ...
        if not advertisement.u or advertisement.p:
            fail("S11.1 Resource request flags mismatch")
        # ... and its request_id must be the truncated hash of the plaintext.
        if RNS.Identity.truncated_hash(expected_plaintext) != expected_request_id:
            fail("S11.1 Resource request_id is not truncated plaintext hash")

    # The fixture is only meaningful if it is too large for a single packet.
    if len(expected_plaintext) <= RNS.Link.MDU:
        fail(f"S11 Resource {key} fixture does not exceed Link MDU")
def verify_correlation_behavior(vector: dict) -> None:
    """Check that responses are matched to pending requests by request_id.

    Drives ``RNS.Link.handle_response`` against a minimal stand-in link that
    holds one pending request whose ID is the fixture's packet request_id:

    1. A response carrying a different ID must leave the pending request in
       place and must not be delivered to it.
    2. A response carrying the expected ID must remove the pending request.

    Args:
        vector: Parsed request/response fixture; only
            ``packet_request.request_id_hex`` is read.

    Any violation is reported through ``fail``, which terminates the process.
    """
    expected_id = bytes.fromhex(vector["packet_request"]["request_id_hex"])

    class Pending:
        """Stand-in pending request that records every delivered response."""

        def __init__(self, request_id):
            self.request_id = request_id
            self.response_size = None
            self.response_transfer_size = None
            self.received = []

        def response_received(self, response, metadata=None):
            self.received.append((response, metadata))

    class FakeLink:
        """Stand-in link exposing only ``status`` and ``pending_requests``."""

        status = RNS.Link.ACTIVE

        def __init__(self):
            self.pending_requests = [Pending(expected_id)]

    link = FakeLink()

    # Invert every byte so the wrong ID differs from the expected one at every
    # position. Reversing the bytes would yield the *same* ID for a
    # palindromic value and silently turn the negative test into a no-op.
    wrong_id = bytes(byte ^ 0xFF for byte in expected_id)
    if wrong_id == expected_id:
        # Only possible for an empty request_id in the fixture.
        fail("S11.2 fixture request_id is empty; cannot derive a wrong request_id")

    # The trailing 5, 5 match the 5-byte payloads; presumably the response
    # size and transfer size -- confirm against RNS.Link.handle_response.
    RNS.Link.handle_response(link, wrong_id, b"wrong", 5, 5)
    if len(link.pending_requests) != 1 or link.pending_requests[0].received:
        fail("S11.2 wrong request_id response was not ignored")

    RNS.Link.handle_response(link, expected_id, b"right", 5, 5)
    # Anything other than an empty list means the request was not resolved.
    if link.pending_requests != []:
        fail("S11.2 matched response did not remove pending request")

    print("PASS S11.2 wrong-ID response remains unmatched; matching response resolves pending request")
def verify_constants_and_receipt_states(vector: dict) -> None:
    """Check the S11.3 path hash, S11.4 policy constants and S11.5 states.

    Args:
        vector: Parsed request/response fixture; ``inputs.path`` and
            ``inputs.path_hash_hex`` are read.

    Any mismatch is reported through ``fail``, which terminates the
    process; on success a single PASS line is printed.
    """
    # S11.4: ALLOW_NONE / ALLOW_ALL / ALLOW_LIST must be 0 / 1 / 2.
    policy_values = (
        RNS.Destination.ALLOW_NONE,
        RNS.Destination.ALLOW_ALL,
        RNS.Destination.ALLOW_LIST,
    )
    if policy_values != (0, 1, 2):
        fail("S11.4 authorization constants changed")

    # S11.3: the path hash is the truncated hash of the UTF-8 encoded path.
    inputs = vector["inputs"]
    path_bytes = inputs["path"].encode("utf-8")
    computed_hash_hex = RNS.Identity.truncated_hash(path_bytes).hex()
    if computed_hash_hex != inputs["path_hash_hex"]:
        fail("S11.3 path hash mismatch")

    # S11.5: receipt states must keep their numeric values 0..4 in this order.
    receipt_states = (
        RequestReceipt.FAILED,
        RequestReceipt.SENT,
        RequestReceipt.DELIVERED,
        RequestReceipt.RECEIVING,
        RequestReceipt.READY,
    )
    if receipt_states != (0, 1, 2, 3, 4):
        fail("S11.5 RequestReceipt states changed")

    print("PASS S11.3/S11.4/S11.5 path hash, authorization constants, and receipt states")
def verify_malformed_envelopes() -> None:
    """Check that malformed REQUEST and RESPONSE envelopes cannot be handled.

    * Request side: ``RNS.Link.handle_request`` is called with an empty
      unpacked envelope and must raise ``IndexError`` or ``TypeError``.
    * Response side: each malformed byte string must either fail to decode
      or fail to yield both a request_id (element 0) and response data
      (element 1).

    Only decode and indexing errors count as a rejection; any other exception
    propagates so that an unrelated defect is not mistaken for a PASS.

    Any violation is reported through ``fail``, which terminates the process.
    """

    class FakeLink:
        """Stand-in link exposing only the ``status`` attribute."""

        status = RNS.Link.ACTIVE

    try:
        RNS.Link.handle_request(FakeLink(), bytes(16), [])
    except (IndexError, TypeError):
        pass
    else:
        fail("S11.1 malformed request envelope reached request handling")

    # b"" is an empty buffer; b"\x91\xc0" is a msgpack one-element array
    # holding nil, i.e. an envelope with no response-data element.
    for malformed in (b"", b"\x91\xc0"):
        try:
            decoded = umsgpack.unpackb(malformed)
            # Both envelope elements must be reachable for a usable response.
            _ = (decoded[0], decoded[1])
        except (umsgpack.UnpackException, IndexError, TypeError):
            continue
        fail("S11.2 malformed response envelope exposed request_id and response")

    print("PASS S11.1/S11.2 malformed request and response envelopes cannot be handled")
def main() -> None:
    """Run every S11 REQUEST/RESPONSE check against the installed RNS.

    Loads the request/response fixture and the first link vector, builds
    the decryption token and link ID from that link vector's expected
    values, then runs each verifier in turn. Any failing check terminates
    the process via ``fail``; otherwise ``ALL PASS`` is printed last.
    """
    print(f"verify_request_response.py against RNS {RNS.__version__}")

    vector = load_json(VECTORS_PATH)
    link_expected = load_json(LINKS_PATH)["vectors"][0]["expected"]
    token = Token(bytes.fromhex(link_expected["derived_key_hex"]))
    link_id = bytes.fromhex(link_expected["link_id_hex"])

    verify_packet_forms(vector, token, link_id)

    # Request first, then response; the flag tells the verifier which is which.
    resource_cases = (("resource_request", False), ("resource_response", True))
    for fixture_key, is_response in resource_cases:
        verify_resource_form(vector, fixture_key, token, is_response)
    print("PASS S11.1/S11.2 Resource REQUEST/RESPONSE ADV correlation and plaintext-hash request_id")

    verify_correlation_behavior(vector)
    verify_constants_and_receipt_states(vector)
    verify_malformed_envelopes()
    print("ALL PASS")


# Run the verifier only when executed as a script, not when imported.
if __name__ == "__main__":
    main()