Added:
- Tier 1 audit: audits/request-response-tier1-rns-1.2.4.md
- Deterministic test-vectors/request-response.json
- tools/regen_request_response.py
- tools/verify_request_response.py

Corrected two material specification errors:
- ALLOW_ALL = 0x01 and ALLOW_LIST = 0x02; the prior values were reversed.
- Resource file metadata is carried inside the Resource plaintext with flag x; advertisement field m remains the hashmap.

Also verified packet/Resource request-ID formulas, response correlation, wrong-ID behavior, receipt states, and malformed envelopes.

Verification:
- Deterministic vector SHA-256: 5cdad638…56ab2
- git diff --check: pass
- Portable-path scan: pass
- Full pinned suite: 18 passed, 0 failed
174 lines
7.3 KiB
Python
174 lines
7.3 KiB
Python
"""
Verifier for SPEC.md S11 generic Link REQUEST/RESPONSE protocol.

Checks packet and Resource wire forms, distinct request-ID formulas,
response correlation behavior, malformed/wrong-ID handling, path hashes,
authorization policy constants, and RequestReceipt states.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
import RNS
|
|
from RNS.Cryptography.Token import Token
|
|
from RNS.Link import RequestReceipt
|
|
from RNS.Resource import Resource, ResourceAdvertisement
|
|
from RNS.vendor import umsgpack
|
|
|
|
|
|
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
VECTORS_PATH = os.path.join(REPO_ROOT, "test-vectors", "request-response.json")
|
|
LINKS_PATH = os.path.join(REPO_ROOT, "test-vectors", "links.json")
|
|
|
|
|
|
def fail(message: str) -> None:
    """Report a failed check and abort the run.

    Prints ``FAIL: <message>`` to standard output and then terminates the
    interpreter with exit status 1. This function never returns normally;
    it always raises ``SystemExit``.
    """
    report = f"FAIL: {message}"
    print(report)
    # Equivalent to sys.exit(1): both raise SystemExit carrying code 1.
    raise SystemExit(1)
|
|
|
|
|
|
def load_json(path: str):
    """Return the Python object decoded from the UTF-8 JSON file at *path*.

    Errors raised by ``open`` or by the JSON decoder are not handled here
    and propagate to the caller.
    """
    with open(path, "r", encoding="utf-8") as handle:
        text = handle.read()
    return json.loads(text)
|
|
|
|
|
|
def verify_packet_forms(vector: dict, token: Token, link_id: bytes) -> None:
    """Check the single-packet REQUEST and RESPONSE fixtures (S11.1 / S11.2).

    Args:
        vector: Parsed request/response fixture; this function reads its
            ``packet_request``, ``packet_response`` and ``inputs`` entries.
        token: Token used to decrypt the packet payloads.
        link_id: Value the REQUEST packet's destination hash must equal.

    The checks run in a fixed order and the first one that does not hold
    ends the process through ``fail``. On success a single PASS line is
    printed.
    """
    req_fixture = vector["packet_request"]
    req_packet = RNS.Packet(None, bytes.fromhex(req_fixture["raw_hex"]))
    if not req_packet.unpack():
        fail("S11.1 packet REQUEST did not unpack")

    header_ok = (
        req_packet.context == RNS.Packet.REQUEST
        and req_packet.destination_hash == link_id
    )
    if not header_ok:
        fail("S11.1 packet REQUEST header mismatch")

    req_plain = token.decrypt(req_packet.data)
    if req_plain.hex() != req_fixture["plaintext_hex"]:
        fail("S11.1 packet REQUEST decrypt mismatch")

    # Envelope element 1 is compared with the fixture path hash and element 2
    # must be a dict; the second test is skipped when the first already fails.
    envelope = umsgpack.unpackb(req_plain)
    if (
        envelope[1].hex() != vector["inputs"]["path_hash_hex"]
        or not isinstance(envelope[2], dict)
    ):
        fail("S11.1 packet REQUEST path hash or single-packed data mismatch")

    # The request_id must be the truncated packet hash ...
    if req_packet.getTruncatedHash().hex() != req_fixture["request_id_hex"]:
        fail("S11.1 packet request_id is not the truncated packet hash")

    # ... and the fixture must make that distinguishable from the truncated
    # hash of the plaintext.
    if RNS.Identity.truncated_hash(req_plain).hex() == req_fixture["request_id_hex"]:
        fail("S11.1 fixture did not distinguish packet hash from plaintext hash")

    if len(req_plain) > RNS.Link.MDU:
        fail("S11.1 packet REQUEST fixture exceeds Link MDU")

    resp_fixture = vector["packet_response"]
    resp_packet = RNS.Packet(None, bytes.fromhex(resp_fixture["raw_hex"]))
    if not resp_packet.unpack() or resp_packet.context != RNS.Packet.RESPONSE:
        fail("S11.2 packet RESPONSE header mismatch")

    resp_envelope = umsgpack.unpackb(token.decrypt(resp_packet.data))
    carried_id_hex = resp_envelope[0].hex()
    # The carried ID has to agree with both the request fixture and the
    # response fixture's own correlation field.
    if (
        carried_id_hex != req_fixture["request_id_hex"]
        or carried_id_hex != resp_fixture["correlation_request_id_hex"]
    ):
        fail("S11.2 packet RESPONSE did not carry packet request_id")

    print("PASS S11.1/S11.2 packet REQUEST/RESPONSE wire forms and packet-hash request_id")
|
|
|
|
|
|
def verify_resource_form(vector: dict, key: str, token: Token, expect_response: bool) -> None:
    """Check one Resource-carried REQUEST or RESPONSE fixture (S11).

    Args:
        vector: Parsed request/response fixture.
        key: Name of the fixture entry to check; also interpolated into
            failure messages.
        token: Token used to decrypt the concatenated part bodies.
        expect_response: ``True`` to apply the response flag rule, ``False``
            to apply the request flag rule and the request-ID hash rule.

    The first check that does not hold ends the process through ``fail``.
    Nothing is printed on success.
    """
    fixture = vector[key]
    expected_plain = bytes.fromhex(fixture["plaintext_hex"])
    expected_id = bytes.fromhex(fixture["request_id_hex"])

    # Reassemble the encrypted stream from the parts in fixture order.
    bodies = []
    for part in fixture["parts"]:
        bodies.append(bytes.fromhex(part["body_hex"]))
    ciphertext = b"".join(bodies)

    # The first Resource.RANDOM_HASH_SIZE bytes of the decrypted stream are
    # dropped before comparing with the fixture plaintext.
    recovered = token.decrypt(ciphertext)
    if recovered[Resource.RANDOM_HASH_SIZE:] != expected_plain:
        fail(f"S11 Resource {key} did not decrypt to packed plaintext")

    adv = ResourceAdvertisement.unpack(bytes.fromhex(fixture["advertisement_plaintext_hex"]))
    if adv.q != expected_id:
        fail(f"S11 Resource {key} ADV q mismatch")

    if expect_response:
        # A response must have p set and u clear.
        if not adv.p or adv.u:
            fail("S11.2 Resource response flags mismatch")
    else:
        # A request must have u set and p clear ...
        if not adv.u or adv.p:
            fail("S11.1 Resource request flags mismatch")
        # ... and its ID must be the truncated hash of the plaintext.
        if RNS.Identity.truncated_hash(expected_plain) != expected_id:
            fail("S11.1 Resource request_id is not truncated plaintext hash")

    # The fixture only exercises the Resource path if it is too large for
    # a single Link packet.
    if not len(expected_plain) > RNS.Link.MDU:
        fail(f"S11 Resource {key} fixture does not exceed Link MDU")
|
|
|
|
|
|
def verify_correlation_behavior(vector: dict) -> None:
    """Check that responses are correlated to pending requests by ID (S11.2).

    A stub link holding one stub pending request is passed to
    ``RNS.Link.handle_response`` twice: first with an ID that differs from
    the fixture's packet request ID, then with the matching ID.

    Args:
        vector: Parsed request/response fixture; only
            ``vector["packet_request"]["request_id_hex"]`` is read.

    The process is ended through ``fail`` when:
      * the fixture request ID is empty,
      * the wrong-ID response removes or reaches the pending request,
      * the matching response leaves the pending list non-empty, or
      * the matching response payload was not delivered to the pending
        request's ``response_received`` exactly once.

    On success a single PASS line is printed.
    """
    expected_id = bytes.fromhex(vector["packet_request"]["request_id_hex"])
    if not expected_id:
        # Without any bytes there is no way to construct a differing ID.
        fail("S11.2 fixture request_id is empty")

    class Pending:
        """Stub pending request that records every delivered response."""

        def __init__(self, request_id):
            self.request_id = request_id
            self.response_size = None
            self.response_transfer_size = None
            self.received = []

        def response_received(self, response, metadata=None):
            self.received.append((response, metadata))

    class FakeLink:
        """Stub link exposing only the attributes this check sets up."""

        status = RNS.Link.ACTIVE

        def __init__(self, pending_requests):
            self.pending_requests = pending_requests

    # Keep our own reference: once the request is removed from the link it
    # is the only way left to inspect what was delivered.
    pending = Pending(expected_id)
    link = FakeLink([pending])

    # Inverting every byte guarantees a different ID of the same length,
    # which reversing the bytes does not (a palindromic ID is unchanged).
    wrong_id = bytes(value ^ 0xFF for value in expected_id)
    RNS.Link.handle_response(link, wrong_id, b"wrong", 5, 5)
    if link.pending_requests != [pending] or pending.received:
        fail("S11.2 wrong request_id response was not ignored")

    RNS.Link.handle_response(link, expected_id, b"right", 5, 5)
    if link.pending_requests != []:
        fail("S11.2 matched response did not remove pending request")
    # NOTE(review): removal alone does not prove delivery if handle_response
    # tolerates errors raised by response_received - confirm against the
    # pinned RNS version. The recorded payload is checked explicitly.
    if len(pending.received) != 1 or pending.received[0][0] != b"right":
        fail("S11.2 matched response was not delivered to pending request")

    print("PASS S11.2 wrong-ID response remains unmatched; matching response resolves pending request")
|
|
|
|
|
|
def verify_constants_and_receipt_states(vector: dict) -> None:
    """Check policy constants, the path hash and receipt states (S11.3-S11.5).

    Args:
        vector: Parsed request/response fixture; only its ``inputs`` entry
            (``path`` and ``path_hash_hex``) is read.

    Verifies, in this order, that
      * ``ALLOW_NONE``, ``ALLOW_ALL``, ``ALLOW_LIST`` on ``RNS.Destination``
        equal 0, 1, 2;
      * the truncated hash of the UTF-8 encoded path equals the fixture's
        path hash;
      * ``FAILED``, ``SENT``, ``DELIVERED``, ``RECEIVING``, ``READY`` on
        ``RequestReceipt`` equal 0, 1, 2, 3, 4.

    The first check that does not hold ends the process through ``fail``;
    on success a single PASS line is printed.
    """
    policy_values = (
        RNS.Destination.ALLOW_NONE,
        RNS.Destination.ALLOW_ALL,
        RNS.Destination.ALLOW_LIST,
    )
    if policy_values != (0, 1, 2):
        fail("S11.4 authorization constants changed")

    inputs = vector["inputs"]
    computed_hash_hex = RNS.Identity.truncated_hash(inputs["path"].encode("utf-8")).hex()
    if computed_hash_hex != inputs["path_hash_hex"]:
        fail("S11.3 path hash mismatch")

    receipt_states = (
        RequestReceipt.FAILED,
        RequestReceipt.SENT,
        RequestReceipt.DELIVERED,
        RequestReceipt.RECEIVING,
        RequestReceipt.READY,
    )
    if receipt_states != (0, 1, 2, 3, 4):
        fail("S11.5 RequestReceipt states changed")

    print("PASS S11.3/S11.4/S11.5 path hash, authorization constants, and receipt states")
|
|
|
|
|
|
def verify_malformed_envelopes() -> None:
    """Check that malformed request/response envelopes are rejected (S11.1 / S11.2).

    Two things are verified:
      * ``RNS.Link.handle_request`` called on a stub link with an empty
        list as the unpacked request raises ``IndexError`` or ``TypeError``;
      * for each malformed response payload, decoding it and reading
        elements 0 and 1 raises some exception.

    If either expectation does not hold the process is ended through
    ``fail``; otherwise a single PASS line is printed.
    """

    class FakeLink:
        """Stub link carrying only the ``status`` attribute."""

        status = RNS.Link.ACTIVE

    rejected = False
    try:
        RNS.Link.handle_request(FakeLink(), bytes(16), [])
    except (IndexError, TypeError):
        rejected = True
    if not rejected:
        fail("S11.1 malformed request envelope reached request handling")

    # An empty payload and a two-byte payload.
    # NOTE(review): b"\x91\xc0" presumably decodes to a one-element array
    # holding nil, so only the index-1 read fails - confirm.
    malformed_payloads = (b"", b"\x91\xc0")
    for payload in malformed_payloads:
        try:
            envelope = umsgpack.unpackb(payload)
            request_id, response_data = envelope[0], envelope[1]
            _ = (request_id, response_data)
        except Exception:
            # Any failure while decoding or indexing counts as a rejection.
            pass
        else:
            fail("S11.2 malformed response envelope exposed request_id and response")

    print("PASS S11.1/S11.2 malformed request and response envelopes cannot be handled")
|
|
|
|
|
|
def main() -> None:
    """Run every request/response check in order and print ``ALL PASS``.

    Loads the request/response fixture from ``VECTORS_PATH`` and takes the
    decryption key and link ID from the first vector in ``LINKS_PATH``.
    Any failing check ends the process inside the check itself, so
    ``ALL PASS`` is printed only when every check returned.
    """
    print(f"verify_request_response.py against RNS {RNS.__version__}")

    vector = load_json(VECTORS_PATH)
    link_expected = load_json(LINKS_PATH)["vectors"][0]["expected"]
    token = Token(bytes.fromhex(link_expected["derived_key_hex"]))
    link_id = bytes.fromhex(link_expected["link_id_hex"])

    verify_packet_forms(vector, token, link_id)

    # The request form is checked before the response form.
    resource_cases = (("resource_request", False), ("resource_response", True))
    for fixture_key, is_response in resource_cases:
        verify_resource_form(vector, fixture_key, token, is_response)
    print("PASS S11.1/S11.2 Resource REQUEST/RESPONSE ADV correlation and plaintext-hash request_id")

    verify_correlation_behavior(vector)
    verify_constants_and_receipt_states(vector)
    verify_malformed_envelopes()

    print("ALL PASS")
|
|
|
|
|
|
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|