reticiulum-specification/tools/regen_request_response.py

183 lines
6.6 KiB
Python
Raw Normal View History

"""
Regenerator for test-vectors/request-response.json.
Builds deterministic packet and Resource forms for the generic RNS Link
REQUEST/RESPONSE protocol using the Link session key from links.json.
"""
from __future__ import annotations
import json
import os
import sys
import RNS
from RNS.Cryptography.Token import Token
from RNS.Resource import Resource, ResourceAdvertisement
from RNS.vendor import umsgpack
# Repository root: the parent of the directory that contains this script.
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Vector file that main() writes.
OUT_PATH = os.path.join(REPO_ROOT, "test-vectors", "request-response.json")
# Existing link vectors; main() reads the derived key and link id from here.
LINKS_PATH = os.path.join(REPO_ROOT, "test-vectors", "links.json")
# Constant timestamp placed first in each packed request built by main().
FIXED_TIME = 1700000000.0
# Fixed 16-byte values, one per encrypted artifact, handed to with_fixed_iv()
# so that 16-byte os.urandom requests return them instead of random bytes.
REQUEST_PACKET_IV = bytes.fromhex("9192939495969798999a9b9c9d9e9fa0")
RESPONSE_PACKET_IV = bytes.fromhex("a1a2a3a4a5a6a7a8a9aaabacadaeafb0")
REQUEST_RESOURCE_IV = bytes.fromhex("b1b2b3b4b5b6b7b8b9babbbcbdbebfc0")
RESPONSE_RESOURCE_IV = bytes.fromhex("c1c2c3c4c5c6c7c8c9cacbcccdcecfd0")
class FakeLink:
    """Minimal in-memory stand-in for an active RNS Link.

    It carries only the attributes assigned in ``__init__`` plus Token-backed
    ``encrypt`` / ``decrypt`` methods, so packets and Resources can be built
    against a known session key without any networking.
    """

    def __init__(self, derived_key: bytes, link_id: bytes):
        """Set up the stand-in from a derived session key and a link id."""
        # Cipher keyed with the supplied session key.
        self._token = Token(derived_key)

        # The link id is exposed under both names.
        self.link_id = link_id
        self.hash = link_id

        # Destination kind and link state.
        self.type = RNS.Destination.LINK
        self.status = RNS.Link.ACTIVE

        # Size limits come straight from the RNS constants.
        self.mtu = RNS.Reticulum.MTU
        self.mdu = RNS.Link.MDU

        # Fixed timing values.
        self.rtt = 0.1
        self.traffic_timeout_factor = 1

        # Traffic bookkeeping starts at zero.
        self.last_outbound = 0
        self.tx = 0
        self.txbytes = 0

    def encrypt(self, data: bytes) -> bytes:
        """Return ``data`` encrypted with the link's Token."""
        ciphertext = self._token.encrypt(data)
        return ciphertext

    def decrypt(self, data: bytes) -> bytes:
        """Return ``data`` decrypted with the link's Token."""
        plaintext = self._token.decrypt(data)
        return plaintext
def with_fixed_iv(iv: bytes, callback):
    """Run ``callback`` while 16-byte ``os.urandom`` requests return ``iv``.

    The ``urandom`` attribute of the ``os`` module referenced by the
    ``RNS.Cryptography.Token`` module is replaced by a shim for the duration
    of the call and put back afterwards, even if ``callback`` raises.
    Requests for any other length are forwarded to the real function.

    Args:
        iv: Bytes returned for every 16-byte request (presumably the Token
            IV draw -- confirm against the RNS Token implementation).
        callback: Zero-argument callable executed while the shim is active.

    Returns:
        Whatever ``callback`` returns.
    """
    token_module = sys.modules["RNS.Cryptography.Token"]
    original_urandom = token_module.os.urandom

    def pinned_urandom(length):
        # Only 16-byte requests are pinned; everything else stays random.
        if length == 16:
            return iv
        return original_urandom(length)

    token_module.os.urandom = pinned_urandom
    try:
        result = callback()
    finally:
        token_module.os.urandom = original_urandom
    return result
def deterministic_resource(data: bytes, link: FakeLink, request_id: bytes, is_response: bool, iv: bytes, seed: int):
    """Construct a ``Resource`` with its random inputs pinned.

    While the Resource is being built, ``RNS.Identity.get_random_hash`` is
    replaced by a function that hands out two prepared values in order, and
    16-byte ``os.urandom`` requests return ``iv`` (via ``with_fixed_iv``).
    The original ``get_random_hash`` is reinstated afterwards, even on error.

    Args:
        data: Payload passed to ``Resource``.
        link: Link stand-in the Resource is created on.
        request_id: Passed through as the Resource's ``request_id``.
        is_response: Passed through as the Resource's ``is_response``.
        iv: Fixed value for 16-byte ``os.urandom`` requests.
        seed: Byte value for the first prepared hash; the second uses
            ``seed + 1``.

    Returns:
        The constructed ``Resource`` (not advertised, compression disabled).
    """
    original_get_random_hash = RNS.Identity.get_random_hash

    # Two 32-byte values: the seed byte repeated four times, then 28 zero
    # bytes. Only two are prepared, so a third request raises IndexError.
    queued_hashes = [bytes([value]) * 4 + bytes(28) for value in (seed, seed + 1)]

    def next_queued_hash():
        return queued_hashes.pop(0)

    def build_resource():
        return Resource(
            data,
            link,
            request_id=request_id,
            is_response=is_response,
            advertise=False,
            auto_compress=False,
        )

    RNS.Identity.get_random_hash = staticmethod(next_queued_hash)
    try:
        return with_fixed_iv(iv, build_resource)
    finally:
        RNS.Identity.get_random_hash = staticmethod(original_get_random_hash)
def packet_fields(packet: RNS.Packet) -> dict:
    """Return the hex-encoded vector fields of a packed packet.

    The result maps field names to lowercase hex strings for the packet's
    ``data``, ``ciphertext``, ``raw`` bytes, full hash and truncated hash.
    """
    byte_fields = (
        ("plaintext_hex", packet.data),
        ("ciphertext_hex", packet.ciphertext),
        ("raw_hex", packet.raw),
        ("packet_hash_hex", packet.get_hash()),
        ("packet_truncated_hash_hex", packet.getTruncatedHash()),
    )
    return {name: value.hex() for name, value in byte_fields}
def resource_fields(resource: Resource, plaintext: bytes) -> dict:
    """Return the hex-encoded vector fields of a Resource.

    Args:
        resource: Resource whose request id, hash, advertisement and parts
            are recorded.
        plaintext: The unencrypted payload the Resource was built from.

    Returns:
        A dict with the plaintext, request id, resource hash, the packed
        ``ResourceAdvertisement`` and one entry per part (body, map hash and
        raw bytes), all as hex strings.
    """

    def describe_part(part) -> dict:
        return {
            "body_hex": part.data.hex(),
            "map_hash_hex": part.map_hash.hex(),
            "raw_hex": part.raw.hex(),
        }

    fields = {
        "plaintext_hex": plaintext.hex(),
        "request_id_hex": resource.request_id.hex(),
        "resource_hash_hex": resource.hash.hex(),
    }
    advertisement = ResourceAdvertisement(resource)
    fields["advertisement_plaintext_hex"] = advertisement.pack().hex()
    fields["parts"] = [describe_part(part) for part in resource.parts]
    return fields
def main() -> None:
    """Regenerate ``test-vectors/request-response.json``.

    Reads the derived key and link id of the first vector in ``links.json``,
    builds a small REQUEST/RESPONSE packet pair and a large REQUEST/RESPONSE
    Resource pair on a ``FakeLink`` with fixed IVs, and writes the resulting
    hex fields to ``OUT_PATH`` as indented JSON.

    NOTE(review): the recorded ``link_vector_label`` is hard-coded while the
    link vector is selected by position -- confirm ``vectors[0]`` is that
    entry.
    """
    print(f"regen_request_response.py against RNS {RNS.__version__}")

    with open(LINKS_PATH, "r", encoding="utf-8") as links_file:
        link_vector = json.load(links_file)["vectors"][0]
    expected = link_vector["expected"]
    derived_key = bytes.fromhex(expected["derived_key_hex"])
    link_id = bytes.fromhex(expected["link_id_hex"])
    link = FakeLink(derived_key, link_id)

    def packed_packet(data: bytes, context: int, iv: bytes) -> RNS.Packet:
        """Build and pack a link packet while the IV is pinned to ``iv``."""

        def build():
            packet = RNS.Packet(link, data, context=context)
            # pack() must run inside the pinned-IV scope: RNS.Packet encrypts
            # the payload when it is packed, not when it is constructed, so
            # packing after the patch is removed would draw a random IV and
            # make the vectors differ on every run.
            packet.pack()
            return packet

        return with_fixed_iv(iv, build)

    path = "/vector/echo"
    path_hash = RNS.Identity.truncated_hash(path.encode("utf-8"))

    # Small exchange: carried in single packets; the request id is the
    # truncated hash of the packed request packet.
    small_request = umsgpack.packb([FIXED_TIME, path_hash, {"message": "hello"}])
    packet_request = packed_packet(small_request, RNS.Packet.REQUEST, REQUEST_PACKET_IV)
    packet_request_id = packet_request.getTruncatedHash()

    small_response = umsgpack.packb([packet_request_id, {"answer": "world"}])
    packet_response = packed_packet(small_response, RNS.Packet.RESPONSE, RESPONSE_PACKET_IV)

    # Large exchange: carried as Resources; the request id is the truncated
    # hash of the packed request plaintext.
    large_request = umsgpack.packb([FIXED_TIME, path_hash, bytes(range(256)) * 3])
    resource_request_id = RNS.Identity.truncated_hash(large_request)
    request_resource = deterministic_resource(
        large_request, link, resource_request_id, False, REQUEST_RESOURCE_IV, 0x31
    )

    large_response = umsgpack.packb([resource_request_id, bytes(range(255, -1, -1)) * 3])
    response_resource = deterministic_resource(
        large_response, link, resource_request_id, True, RESPONSE_RESOURCE_IV, 0x41
    )

    payload = {
        "_about": (
            "Deterministic RNS Link REQUEST/RESPONSE vectors. Packet requests use "
            "the truncated packet hash as request_id; Resource requests use the "
            "truncated hash of packed_request plaintext. Resource ADV q/u/p fields "
            "carry request correlation."
        ),
        "inputs": {
            "link_vector_label": "alice_to_bob_aes256cbc",
            "path": path,
            "path_hash_hex": path_hash.hex(),
            "timestamp": FIXED_TIME,
            "packet_request_iv_hex": REQUEST_PACKET_IV.hex(),
            "packet_response_iv_hex": RESPONSE_PACKET_IV.hex(),
            "resource_request_iv_hex": REQUEST_RESOURCE_IV.hex(),
            "resource_response_iv_hex": RESPONSE_RESOURCE_IV.hex(),
        },
        "packet_request": {
            **packet_fields(packet_request),
            "request_id_hex": packet_request_id.hex(),
        },
        "packet_response": {
            **packet_fields(packet_response),
            "correlation_request_id_hex": packet_request_id.hex(),
        },
        "resource_request": resource_fields(request_resource, large_request),
        "resource_response": resource_fields(response_resource, large_response),
        "rns_version_at_generation": RNS.__version__,
        "generator_script": "tools/regen_request_response.py",
        "verifies_spec_sections": ["11.1", "11.2", "11.3", "11.4", "11.5"],
    }

    # newline="\n" keeps the output byte-identical across platforms.
    with open(OUT_PATH, "w", encoding="utf-8", newline="\n") as output:
        json.dump(payload, output, indent=2, sort_keys=False)
        output.write("\n")

    print(f"Wrote {OUT_PATH}")
    print("ALL PASS")
# Run the regenerator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()