Verify §2.3, §4.3, §7.1, §7.4 against upstream RNS 1.2.0 / LXMF 0.9.6

Adds tools/ verifier scripts that exercise upstream RNS / LXMF and confirm
(or correct) the SPEC.md callouts:

- §2.3 HEADER_1→HEADER_2 conversion: verified by stubbing Transport.transmit
  and seeding a multi-hop path_table entry.
- §4.3 app_data 3-element variant: producer in LXMF 0.9.6 actually emits
  2 elements only (supported_functionality at LXMRouter.py:999 is dead
  code); parser tolerates 1/2/3-element + raw UTF-8.
- §7.1 path? always-precedes claim: actually conditional on
  not has_path() AND method==OPPORTUNISTIC.
- §7.4 ratchet ring default 8: actually Destination.RATCHET_COUNT = 512
  at RNS/Destination.py:85.

Also fixes a documentation bug in §1.2: the rnstransport.path.request row
of the well-known-hash table had the dest-hash prefix where the name_hash
should be (correct name_hash is 7926bbe7dd7f9aba88b0).

Seeds test-vectors/identities.json (Alice + Bob) with a regenerator
(tools/regen_identities.py) and verifier (tools/verify_destination_hash.py)
covering §1.1 and §1.2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Rob 2026-05-03 10:14:51 -04:00
commit cf169b2a9e
10 changed files with 788 additions and 61 deletions

55
SPEC.md
View file

@ -46,7 +46,7 @@ Common pre-computed `name_hash` values:
| `0ad8bff9ff75737c058e` | `nomadnetwork.gossip` |
| `9efb9c771eeb5ae90ea6` | `rnstransport.broadcasts` |
| `4848a053c16415bed6c8` | `rnstransport.remote.management` |
| `6b9f66014d9853faab22` | `rnstransport.path.request` (truncated to 16: `6b9f66014d9853faab220fba47d02761`) |
| `7926bbe7dd7f9aba88b0` | `rnstransport.path.request` (resulting `dest_hash` with `identity=None`: `6b9f66014d9853faab220fba47d02761`) |
### 1.3 Private key on-disk format
@ -79,9 +79,7 @@ HEADER_2: flags(1) hops(1) transport_id(16) dest_hash(16) context(1) data(...)
### 2.3 Originator HEADER_1 → HEADER_2 conversion
> ⚠️ **UNVERIFIED:** Source-cited but not yet exercised by a verifier in this repo's `tools/`. The behavior described matches `RNS/Transport.py::outbound` line 1074+ but no end-to-end test has confirmed that an implementation that always emits HEADER_1 fails for multi-hop where one that does the conversion succeeds.
This is non-obvious and matters: when an **originator** (not a relay) sends a packet to a destination known to be more than 1 hop away, the originator MUST also do the HEADER_2 conversion. From `RNS/Transport.py::outbound` (~line 1074):
This is non-obvious and matters: when an **originator** (not a relay) sends a packet to a destination known to be more than 1 hop away, the originator MUST also do the HEADER_2 conversion. From `RNS/Transport.py::outbound` (lines 1074-1083 in RNS 1.2.0; verified by `tools/verify_packet_header.py`):
```python
if path_entry[IDX_PT_HOPS] > 1:
@ -93,7 +91,7 @@ if path_entry[IDX_PT_HOPS] > 1:
new_raw += packet.raw[2:] # original dest_hash + context + payload
```
For destinations 0 or 1 hops away, the originator may stay HEADER_1 — the receiving rnsd auto-fills the transport_id when the destination matches a local client (`for_local_client` branch at line 1485). Implementations that always emit HEADER_1 will silently fail to deliver to multi-hop destinations even with a known path.
For destinations 0 or 1 hops away, the originator may stay HEADER_1 — the receiving rnsd auto-fills the transport_id when the destination matches a local client (`for_local_client` branch at `RNS/Transport.py:1451` in RNS 1.2.0). Implementations that always emit HEADER_1 will silently fail to deliver to multi-hop destinations even with a known path.
### 2.4 Hop count
@ -178,10 +176,11 @@ Note that `dest_hash` is INCLUDED in the signed data even though it's not in the
### 4.3 `app_data` format for LXMF delivery destinations
Upstream `LXMF/LXMRouter.py::get_announce_app_data` produces:
Upstream `LXMF/LXMRouter.py::get_announce_app_data` produces a 2-element msgpack array (verified against LXMF 0.9.6 by `tools/verify_announce_app_data.py`):
```python
peer_data = [display_name_bytes, stamp_cost] # stamp_cost = None unless 1 ≤ N ≤ 254
# LXMF/LXMRouter.py:986-1002 in LXMF 0.9.6
peer_data = [display_name, stamp_cost] # stamp_cost = None unless 1 ≤ N ≤ 254
return msgpack.packb(peer_data)
```
@ -196,9 +195,9 @@ c0 # nil (stamp_cost)
Encoding the display name as msgpack `bin` (`0xc4 NN`) is required for upstream interop — see section 9.3 below. The stamp_cost field can be `int 0` (`0x00`) or `nil` (`0xc0`); upstream's `stamp_cost_from_app_data` doesn't strict-type-check.
A third optional element `[capability_flags]` (e.g. `[SF_COMPRESSION]`) may follow. Older clients may emit a 1-element array (just the name) or a raw UTF-8 string instead of msgpack — see `LXMF/LXMF.py::display_name_from_app_data` for the parser's tolerance branches.
**A third optional `[capability_flags]` element** (e.g. `[SF_COMPRESSION]`, the only flag currently defined at `LXMF/LXMF.py:108`) is **read by the parser** (`compression_support_from_app_data` at `LXMF/LXMF.py:154-167`) but is **not emitted by the LXMF 0.9.6 producer**`LXMRouter.py:999` computes `supported_functionality = [SF_COMPRESSION]` but never appends it to `peer_data`. Implementations should accept the 3-element form on inbound (a future LXMF version may re-enable it; older deployments may emit it) but should not rely on receiving it.
> ⚠️ **UNVERIFIED:** The 3-element `[name, stamp_cost, [capabilities]]` variant is observed in `LXMF/LXMRouter.py::get_announce_app_data` source code (it adds `supported_functionality` when relevant) but the exact emission conditions and the wire-byte form across upstream versions has not been tested in this repo. Older 1-element arrays and raw UTF-8 strings are mentioned in upstream comments but no captured example has been verified.
The parser also tolerates a 1-element msgpack array (just the name) and a raw UTF-8 string ("original announce format" branch at `LXMF/LXMF.py:138-139`) — see `LXMF/LXMF.py::display_name_from_app_data` for all four accepted shapes.
### 4.4 Announce filtering by `name_hash`
@ -343,17 +342,27 @@ After processing each `CTX_NONE` DATA packet on an active link, the receiver MUS
## 7. Transport behavior — the parts that bite
### 7.1 Path requests: peers send `path?` before opportunistic LXMF
### 7.1 Path requests: peers send `path?` before opportunistic LXMF when no path is known
> ⚠️ **UNVERIFIED:** The general claim that path requests precede LXMF DATA is well-supported by `RNS/Transport.py::request_path` source. The specific claim that this *always* happens (vs only when the path entry is stale) has not been verified — observed behavior on BLE was many path-request retransmits without intervening DATA, suggesting peers retry path? until they get a response. Confirm with a runtime test exercising both fresh-path and stale-path cases.
The path-request preamble in upstream LXMF is **conditional, not unconditional** (verified by `tools/verify_path_request.py` against LXMF 0.9.6):
When `RNS.Transport.outbound` doesn't have a fresh path entry for the destination, it issues a path request before sending the actual DATA. A path request is a regular DATA packet with:
```python
# LXMF/LXMRouter.py::handle_outbound, ~line 1672
if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC:
RNS.log("Pre-emptively requesting unknown path for opportunistic ...", RNS.LOG_DEBUG)
RNS.Transport.request_path(destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
```
- `dest_hash = SHA256(SHA256("rnstransport.path.request")[:10] || "")[:16] = 6b9f66014d9853faab220fba47d02761`
- `dest_type = PLAIN`, `transport_type = BROADCAST`, `context = CTX_NONE`
- payload: `target_dest_hash(16) || random_tag(16)` (32 bytes total — the most common non-transport-instance variant)
In other words: a `path?` is sent before the LXM **only when no entry exists in `Transport.path_table`** for the target — `has_path()` is just a key-presence check (`RNS/Transport.py:2570-2576`). Existing-but-stale path entries are NOT replaced by this preamble; LXMF instead leans on the periodic `Transport.jobs` cycle to evict expired path entries (`stale_paths` accumulator at `RNS/Transport.py:747+`), after which the next outbound LXM rediscovers the unknown-path branch and triggers the `request_path`. A second `request_path` is issued from the retry path (`LXMRouter.py:2571+`) once `lxmessage.delivery_attempts >= MAX_PATHLESS_TRIES`, so on a flaky path peers can see multiple `path?` retransmits without intervening DATA — that matches BLE-trace observations.
Transport-enabled originators append their own identity hash (16 more bytes) so the responder can route the proof back. Non-transport clients omit this.
A `path?` request itself is a regular DATA packet (verified by `tools/verify_path_request.py`):
- `dest_hash = SHA256(SHA256("rnstransport.path.request")[:10])[:16] = 6b9f66014d9853faab220fba47d02761`
- `dest_type = PLAIN`, `transport_type = BROADCAST`, `header_type = HEADER_1`, `context = CTX_NONE`
- payload (`RNS/Transport.py::request_path`):
- **leaf clients** (transport disabled): `target_dest_hash(16) || random_tag(16)` — 32 bytes
- **transport-enabled originators**: `target_dest_hash(16) || transport_id(16) || random_tag(16)` — 48 bytes — so the responding announce can be routed back along the request's reverse path
### 7.2 Responding to path requests
@ -378,9 +387,9 @@ The long-term encryption / signing keys and the `identity_hash` / `destination_h
### 7.4 Ratchet ring (inbound decrypt tolerance)
Senders cache the most recent ratchet they've seen for each destination. If you rotate your ratchet faster than relays propagate the announce, in-flight messages may arrive encrypted to your *previous* ratchet. To decrypt these, keep a small ring of recent ratchet privkeys (upstream default: 8) and try each in order during decrypt. The fallback to the long-term identity privkey is the ultimate safety net.
Senders cache the most recent ratchet they've seen for each destination. If you rotate your ratchet faster than relays propagate the announce, in-flight messages may arrive encrypted to your *previous* ratchet. To decrypt these, keep a ring of recent ratchet privkeys and try each in order during decrypt. The fallback to the long-term identity privkey is the ultimate safety net.
> ⚠️ **UNVERIFIED:** The "default 8 ratchets" upstream behavior needs a source citation (search `RNS/Identity.py` for `RATCHET_COUNT` or similar). The reference mobile-app implementation discards old ratchet privkeys on rotation, accepting the in-flight loss window. The minimum-viable client without a ring may still interop usefully — confirm by experiment.
Upstream's default ring size is **`Destination.RATCHET_COUNT = 512`** (`RNS/Destination.py:85` in RNS 1.2.0), with a minimum rotation interval of `RATCHET_INTERVAL = 30*60` seconds (line 90) and per-ratchet `RATCHET_EXPIRY = 60*60*24*30` seconds (`RNS/Identity.py:69`). A new ratchet is generated on each `rotate_ratchets()` call and prepended to the in-memory list; `_clean_ratchets` truncates back to `RATCHET_COUNT`. The 512 figure is generous and not a hard interop requirement — it's an in-memory bound on the inbound-decrypt try-list.
A minimal client may keep just the current ratchet privkey, accepting that the brief window between rotation and announce-propagation will lose some messages. Mention the trade-off in your implementation notes.
@ -510,15 +519,11 @@ logged before any filtering converts hours of "messages aren't arriving" debuggi
## 10. Test vectors
> ⚠️ **UNVERIFIED:** The `test-vectors/` directory is currently a placeholder. See [`agent.md`](agent.md) §5 for the bootstrap task list — populating real test vectors with regenerator scripts is the highest-priority next task for a contributor.
See [`test-vectors/`](test-vectors/). Currently populated:
See [`test-vectors/`](test-vectors/). Each vector includes:
- **`identities.json`** — Alice and Bob private-key inputs plus their derived `public_key`, `identity_hash`, and `lxmf.delivery` `destination_hash`. Verified by `tools/verify_destination_hash.py`; regenerated by `tools/regen_identities.py`. Covers SPEC.md §1.1 and §1.2.
- Identity inputs (private keys hex)
- Derived public material (public_key, identity_hash, destination_hash for `lxmf.delivery`)
- A signed announce packet in full hex
- Encrypted LXMF DATA (sender, recipient, plaintext, expected ciphertext bytes)
- Link handshake (LINKREQUEST + LRPROOF + derived session keys)
> ⚠️ **UNVERIFIED:** The remaining vector categories — signed announce packets, encrypted opportunistic LXMF DATA, and Link handshake (LINKREQUEST + LRPROOF + derived session keys) — are not yet populated. See [`agent.md`](agent.md) §5 and [`todo.md`](todo.md) for the remaining bootstrap work.
An implementation that round-trips every test vector — both directions — should be wire-compatible with upstream Reticulum and LXMF for the covered operations.

View file

@ -4,7 +4,14 @@ Known-good byte sequences that any Reticulum-compatible implementation should be
## Status
Empty placeholder. See [`../agent.md`](../agent.md) §5 for the bootstrap task list.
Partially populated against RNS 1.2.0:
- ✅ `identities.json` — Alice + Bob identity vectors (regenerator: `../tools/regen_identities.py`, verifier: `../tools/verify_destination_hash.py`).
- ⏳ `announces.json` — not yet populated.
- ⏳ `lxmf.json` — not yet populated.
- ⏳ `links.json` — not yet populated.
See [`../agent.md`](../agent.md) §5 and [`../todo.md`](../todo.md) for the remaining bootstrap task list.
## Format (proposed)

View file

@ -0,0 +1,47 @@
{
"_about": "Identity test vectors. Load via RNS.Identity.from_bytes(bytes.fromhex(inputs.private_key_hex)) and confirm the four expected.* fields match. Regenerate by running this script against the upstream RNS shown in rns_version_at_generation.",
"vectors": [
{
"label": "alice",
"destination_full_name": "lxmf.delivery",
"inputs": {
"x25519_priv_hex": "587e730a70d24e971efa8c146e554996d70bff45b2033d336e2c078dc63d3645",
"ed25519_priv_hex": "bef79d95bf6b253827a2e7e81a13ab0b10a908fd158581d1827095b788169e93",
"private_key_hex": "587e730a70d24e971efa8c146e554996d70bff45b2033d336e2c078dc63d3645bef79d95bf6b253827a2e7e81a13ab0b10a908fd158581d1827095b788169e93"
},
"expected": {
"public_key_hex": "76fce269b2356a51b6a832a1a25099155acb20733b453f9538aaa8069e854d5a780708b44424373474ee1607c3f2b4a1cd5643de508e106e6b8cf4a10f00ec7c",
"identity_hash_hex": "28d43a11abc1094301a59ed3b44f127b",
"name_hash_hex": "6ec60bc318e2c0f0d908",
"destination_hash_hex": "c33c40a5b030596d95617dc4ca163aae"
},
"rns_version_at_generation": "1.2.0",
"generator_script": "tools/regen_identities.py",
"verifies_spec_sections": [
"1.1",
"1.2"
]
},
{
"label": "bob",
"destination_full_name": "lxmf.delivery",
"inputs": {
"x25519_priv_hex": "0f453e75d564532f2fa671aea79e9a714e4564e1ff833d1df19986fe8a36aa21",
"ed25519_priv_hex": "9a6acdad966af7d006cfd393ca8278c608978bcaefa5b5f24db867179f83a863",
"private_key_hex": "0f453e75d564532f2fa671aea79e9a714e4564e1ff833d1df19986fe8a36aa219a6acdad966af7d006cfd393ca8278c608978bcaefa5b5f24db867179f83a863"
},
"expected": {
"public_key_hex": "92331490ac7c5db96102f80ffc64d71330907a5aea969b8617b7b2f3e0f8352a274e3172cbb18bdb14ccc1178fd66a8a811be97690d30985c75649a2b07dc76a",
"identity_hash_hex": "c090410e5b5bf8956194c1872dccec3b",
"name_hash_hex": "6ec60bc318e2c0f0d908",
"destination_hash_hex": "9695d17f22fa6e45d2b0cd3439a7ca7e"
},
"rns_version_at_generation": "1.2.0",
"generator_script": "tools/regen_identities.py",
"verifies_spec_sections": [
"1.1",
"1.2"
]
}
]
}

62
todo.md
View file

@ -12,7 +12,11 @@ Outstanding work for the spec repo.
## Test infrastructure
- [ ] **Bootstrap `test-vectors/`** with the existing vectors from
- [x] **Bootstrap `test-vectors/identities.json`** — Alice + Bob
identities populated against RNS 1.2.0. Regenerator at
`tools/regen_identities.py`.
- [ ] **Bootstrap remaining test-vectors files** (`announces.json`,
`lxmf.json`, `links.json`) with the existing vectors from
`reticulum-mobile-app/reference/test-vectors.json`. Convert to
the proposed JSON format documented in `test-vectors/README.md`,
adding the regenerator scripts so future contributors can
@ -20,14 +24,14 @@ Outstanding work for the spec repo.
- [ ] **Write the priority verifier scripts** listed in
`tools/README.md`, in this order (highest interop value first):
1. `verify_destination_hash.py` — pure-function check, no RNS state needed
2. `verify_packet_header.py` — bit layout + HEADER_1/HEADER_2 round-trip
3. `verify_announce_roundtrip.py` — closes the SPEC.md §4 gap
4. `verify_token_crypto.py` — closes SPEC.md §3 gap
5. `verify_lxmf_opportunistic.py` — closes SPEC.md §5 gap
6. `verify_link_handshake.py` — closes SPEC.md §6 gap
7. `verify_path_request.py` — closes SPEC.md §7.1, §7.2 gaps
8. `verify_msgpack_quirk.py` — closes SPEC.md §9.3 gap
1. [x] `verify_destination_hash.py` — pure-function check, no RNS state needed
2. [x] `verify_packet_header.py` — bit layout + HEADER_1/HEADER_2 round-trip + originator HEADER_1→HEADER_2 conversion
3. [ ] `verify_announce_roundtrip.py` — closes the SPEC.md §4 gap (partial coverage in `verify_announce_app_data.py`)
4. [ ] `verify_token_crypto.py` — closes SPEC.md §3 gap
5. [ ] `verify_lxmf_opportunistic.py` — closes SPEC.md §5 gap
6. [ ] `verify_link_handshake.py` — closes SPEC.md §6 gap
7. [x] `verify_path_request.py` — closes SPEC.md §7.1, §7.2 gaps
8. [ ] `verify_msgpack_quirk.py` — closes SPEC.md §9.3 gap
Each verifier should remove its corresponding `⚠️ UNVERIFIED` /
`🔮 SPECULATION` callout in `SPEC.md` (per `agent.md` §1).
@ -37,23 +41,35 @@ Outstanding work for the spec repo.
These need either a runtime test or a stronger upstream source citation
to remove their markers:
- [ ] **§2.3 Originator HEADER_1 → HEADER_2 conversion.** Need a
verifier that exercises `RNS.Transport.outbound` with a known
multi-hop path table and shows the wire bytes change.
- [x] **§2.3 Originator HEADER_1 → HEADER_2 conversion.** Verified
against RNS 1.2.0 by `tools/verify_packet_header.py`, which
seeds `Transport.path_table` with a multi-hop entry and confirms
the converted wire bytes via stubbed `Transport.transmit`.
Citation updated to `RNS/Transport.py:1074-1083`.
- [ ] **§4.3 The 3-element `[name, stamp_cost, [capabilities]]`
app_data variant.** Need a captured upstream emission with the
capabilities field present, ideally with `SF_COMPRESSION` set.
- [x] **§4.3 The 3-element `[name, stamp_cost, [capabilities]]`
app_data variant.** Verified against LXMF 0.9.6 by
`tools/verify_announce_app_data.py`. Finding: in this LXMF
version the producer emits a 2-element form only (the
`supported_functionality` line at `LXMF/LXMRouter.py:999` is
dead code); the parser is prepared for a 3-element form via
`compression_support_from_app_data`. SPEC.md §4.3 updated to
describe the actual current behavior.
- [ ] **§7.1 path? always precedes LXMF DATA.** Verify whether peers
ALWAYS issue path? before sending LXMF, or only when the path
table entry is stale. Run a controlled test with a fresh path
and a known-stale path.
- [x] **§7.1 path? always precedes LXMF DATA.** Verified against
LXMF 0.9.6 by `tools/verify_path_request.py`. Finding: the
preamble fires only when `not has_path()` AND method is
OPPORTUNISTIC; the retry path can fire a second `request_path`
after `MAX_PATHLESS_TRIES` (`LXMRouter.py:2571+`). SPEC.md §7.1
rewritten accordingly. Also fixed a documentation bug in §1.2
(path-request name_hash column).
- [ ] **§7.4 Ratchet ring count default = 8.** Find the upstream
source citation (`RNS/Identity.py`, search for `RATCHET_COUNT`
or similar). Update SPEC.md with the file + line, OR replace
with the actual measured value if different.
- [x] **§7.4 Ratchet ring count default = 8.** False — actual upstream
default is `Destination.RATCHET_COUNT = 512` at
`RNS/Destination.py:85` in RNS 1.2.0, with
`RATCHET_INTERVAL = 30*60` (line 90) and
`RATCHET_EXPIRY = 60*60*24*30` (`RNS/Identity.py:69`).
SPEC.md §7.4 corrected.
## Spec polishing (lower priority)

View file

@ -19,17 +19,19 @@ The scripts read `RNS.__version__` at startup and print it in their output so a
## Status
Empty placeholder. See [`../agent.md`](../agent.md) §5 for the priority order.
Populated against RNS 1.2.0 / LXMF 0.9.6:
Initial scripts to write:
| Script | Verifies SPEC.md section | Status |
|---|---|---|
| `verify_destination_hash.py` | §1.1, §1.2 — identity composition + `dest_hash = SHA256(name_hash \|\| identity_hash)[:16]` | ✅ |
| `verify_packet_header.py` | §2.1, §2.2, §2.3 — flag byte layout, HEADER_1/HEADER_2 form, originator HEADER_1→HEADER_2 conversion via upstream `Transport.outbound` | ✅ |
| `verify_announce_app_data.py` | §4.3 — LXMF announce app_data 2-element form, parser tolerance | ✅ |
| `verify_path_request.py` | §1.2 well-known hashes, §7.1 LXMF path-preamble gating | ✅ |
| `regen_identities.py` | regenerates `test-vectors/identities.json` | ✅ |
| `verify_announce_roundtrip.py` | §4 — announce build matches upstream `Identity().announce()` bytes | ⏳ |
| `verify_token_crypto.py` | §3 — Token encrypt/decrypt against upstream `RNS.Cryptography.Token` | ⏳ |
| `verify_lxmf_opportunistic.py` | §5.1, §5.5 — opportunistic LXMF body bytes match upstream | ⏳ |
| `verify_link_handshake.py` | §6 — LINKREQUEST + LRPROOF + session key match upstream | ⏳ |
| `verify_msgpack_quirk.py` | §9.3 — encoding name as bytes vs str affects upstream parsing | ⏳ |
| Script | Verifies SPEC.md section |
|---|---|
| `verify_destination_hash.py` | §1.2 — `dest_hash = SHA256(name_hash || identity_hash)[:16]` |
| `verify_packet_header.py` | §2.1, §2.2 — flag byte layout + HEADER_1/HEADER_2 round-trip |
| `verify_announce_roundtrip.py` | §4 — announce build matches upstream `Identity().announce()` bytes |
| `verify_token_crypto.py` | §3 — Token encrypt/decrypt against upstream `RNS.Cryptography.Token` |
| `verify_lxmf_opportunistic.py` | §5.1, §5.5 — opportunistic LXMF body bytes match upstream |
| `verify_link_handshake.py` | §6 — LINKREQUEST + LRPROOF + session key match upstream |
| `verify_path_request.py` | §7.1, §7.2 — path-request payload format |
| `verify_msgpack_quirk.py` | §9.3 — encoding name as bytes vs str affects upstream parsing |
See [`../agent.md`](../agent.md) §5 and [`../todo.md`](../todo.md) for the remaining priority order.

126
tools/regen_identities.py Normal file
View file

@ -0,0 +1,126 @@
"""
Regenerator for test-vectors/identities.json.
Takes a list of fixed (X25519_priv || Ed25519_priv) inputs and emits the
expected `public_key`, `identity_hash`, and `destination_hash` (for
`lxmf.delivery`) that upstream RNS derives. Verifies the round-trip:
load -> derive -> reload-from-output and confirms outputs match.
Run from the repo root:
python tools/regen_identities.py
Updates `test-vectors/identities.json` in place. Exit 0 on success.
"""
from __future__ import annotations
import hashlib
import json
import os
import sys
import RNS
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
OUT_PATH = os.path.join(REPO_ROOT, "test-vectors", "identities.json")
# Fixed test-vector inputs. The X25519 and Ed25519 private bytes are 32
# arbitrary but stable bytes; we use SHA-256 of a label so the values are
# reproducible in any environment without a private blob in the repo.
def vector_inputs():
def derive(label: str, suffix: str) -> bytes:
return hashlib.sha256(f"{label}.{suffix}".encode("utf-8")).digest()
return [
{
"label": "alice",
"x25519_priv_hex": derive("alice", "x25519").hex(),
"ed25519_priv_hex": derive("alice", "ed25519").hex(),
"destination_full_name": "lxmf.delivery",
},
{
"label": "bob",
"x25519_priv_hex": derive("bob", "x25519").hex(),
"ed25519_priv_hex": derive("bob", "ed25519").hex(),
"destination_full_name": "lxmf.delivery",
},
]
def derive_with_upstream(spec: dict) -> dict:
prv_bytes = bytes.fromhex(spec["x25519_priv_hex"]) + bytes.fromhex(spec["ed25519_priv_hex"])
if len(prv_bytes) != 64:
raise ValueError(f"prv_bytes for {spec['label']} must be 64 B, got {len(prv_bytes)}")
identity = RNS.Identity.from_bytes(prv_bytes)
if identity is None:
raise RuntimeError(f"Identity.from_bytes returned None for {spec['label']}")
public_key = identity.get_public_key() # 64 B: X25519_pub || Ed25519_pub
identity_hash = identity.hash # 16 B
full_name = spec["destination_full_name"]
name_hash = hashlib.sha256(full_name.encode("utf-8")).digest()[:10]
destination_hash = hashlib.sha256(name_hash + identity_hash).digest()[:16]
# Cross-check: RNS.Destination.hash with bytes-style identity argument
# (no Identity instance — just the raw identity_hash) must agree.
rns_dest_hash = RNS.Destination.hash(identity_hash, *full_name.split("."))
if rns_dest_hash != destination_hash:
raise RuntimeError(
f"destination_hash mismatch for {spec['label']!r}: "
f"hand-computed {destination_hash.hex()} vs RNS.Destination.hash "
f"{rns_dest_hash.hex()}"
)
# Round-trip: rebuild identity from its private-key bytes again.
rebuilt = RNS.Identity.from_bytes(identity.get_private_key())
if rebuilt.hash != identity_hash:
raise RuntimeError(
f"private-key round-trip mismatch for {spec['label']!r}"
)
return {
"label": spec["label"],
"destination_full_name": full_name,
"inputs": {
"x25519_priv_hex": spec["x25519_priv_hex"],
"ed25519_priv_hex": spec["ed25519_priv_hex"],
"private_key_hex": identity.get_private_key().hex(), # X25519 || Ed25519
},
"expected": {
"public_key_hex": public_key.hex(), # X25519_pub || Ed25519_pub
"identity_hash_hex": identity_hash.hex(),
"name_hash_hex": name_hash.hex(),
"destination_hash_hex": destination_hash.hex(),
},
"rns_version_at_generation": RNS.__version__,
"generator_script": "tools/regen_identities.py",
"verifies_spec_sections": ["1.1", "1.2"],
}
def main():
print(f"regen_identities.py against RNS {RNS.__version__}")
vectors = [derive_with_upstream(v) for v in vector_inputs()]
payload = {
"_about": (
"Identity test vectors. Load via RNS.Identity.from_bytes("
"bytes.fromhex(inputs.private_key_hex)) and confirm the four "
"expected.* fields match. Regenerate by running this script "
"against the upstream RNS shown in rns_version_at_generation."
),
"vectors": vectors,
}
os.makedirs(os.path.dirname(OUT_PATH), exist_ok=True)
with open(OUT_PATH, "w", encoding="utf-8", newline="\n") as f:
json.dump(payload, f, indent=2, sort_keys=False)
f.write("\n")
print(f"Wrote {OUT_PATH} with {len(vectors)} vectors")
print("ALL PASS")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,123 @@
"""
Verifier for SPEC.md S4.3 (announce app_data format for LXMF delivery
destinations).
Exercises:
- Upstream LXMF.LXMRouter.get_announce_app_data emits a 2-element msgpack
array [display_name_bytes, stamp_cost] in LXMF 0.9.6. The dead-code
supported_functionality line at LXMF/LXMRouter.py:999 is computed but
never appended.
- The wire-byte form for display_name="Reticulum5", stamp_cost=None matches
the hex documented in SPEC.md S4.3:
92 c4 0a 52 65 74 69 63 75 6c 75 6d 35 c0
- The parsers in LXMF/LXMF.py tolerate:
* raw UTF-8 ("original announce format")
* 1-element msgpack array
* 2-element [name, stamp_cost]
* 3-element [name, stamp_cost, [capability_flags]]
and that the 3-element variant is what flips
compression_support_from_app_data based on SF_COMPRESSION presence.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import sys
import RNS
import LXMF
from LXMF import LXMF as LXMF_helpers
import RNS.vendor.umsgpack as umsgpack
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def verify_two_element_wire_bytes():
# SPEC.md S4.3 hex example: display_name="Reticulum5", stamp_cost=None.
# Spec layout:
# 92 # fixarray, 2 elements
# c4 0a # bin8, length 10
# 52 65 74 69 63 75 6c 75 6d 35 # "Reticulum5"
# c0 # nil (stamp_cost)
expected = bytes.fromhex("92" + "c40a" + "5265746963756c756d35" + "c0")
name = b"Reticulum5"
peer_data = [name, None]
actual = umsgpack.packb(peer_data)
if actual != expected:
fail(f"S4.3 2-element wire bytes mismatch:\n"
f" got: {actual.hex()}\n"
f" want: {expected.hex()}")
print("PASS S4.3 2-element wire bytes (umsgpack.packb([b'Reticulum5', None]))")
def verify_producer_is_two_element_in_this_lxmf():
# Read the LXMRouter source and confirm it appends 2 elements (not 3) in
# the LXMF 0.9.6 producer. We do this by inspecting the function source so
# the verifier breaks loudly if upstream restores the 3-element variant.
import inspect
from LXMF.LXMRouter import LXMRouter
src = inspect.getsource(LXMRouter.get_announce_app_data)
if "peer_data = [display_name, stamp_cost]" not in src:
fail(f"S4.3 producer line not found in LXMRouter.get_announce_app_data. "
f"Upstream may have restored the 3-element variant. Source:\n{src}")
if "peer_data.append(supported_functionality)" in src:
# If upstream re-enables this, the spec needs the 3-element variant
# marked as "current upstream" instead of "parser-tolerated only".
fail("S4.3 producer NOW appends supported_functionality. "
"Update SPEC.md S4.3 to describe the 3-element variant as live.")
print(f"PASS S4.3 LXMF {LXMF.__version__} producer emits 2-element form only "
"(supported_functionality is dead code at LXMF/LXMRouter.py:999)")
def verify_parser_tolerance():
cases = [
# (label, app_data bytes, expected display_name, expected stamp_cost,
# expected compression_support)
("raw UTF-8 (original format)",
"Alice".encode("utf-8"),
"Alice", None, True),
("1-element fixarray [name]",
umsgpack.packb([b"Bob"]),
"Bob", None, True),
("2-element fixarray [name, stamp_cost=int]",
umsgpack.packb([b"Carol", 12]),
"Carol", 12, True),
("2-element fixarray [name, nil]",
umsgpack.packb([b"Dan", None]),
"Dan", None, True),
("3-element fixarray [name, nil, [SF_COMPRESSION]]",
umsgpack.packb([b"Eve", None, [LXMF_helpers.SF_COMPRESSION]]),
"Eve", None, True),
("3-element fixarray [name, nil, []] (no SF flags set)",
umsgpack.packb([b"Faye", None, []]),
"Faye", None, False),
]
for label, blob, want_name, want_cost, want_compress in cases:
got_name = LXMF_helpers.display_name_from_app_data(blob)
got_cost = LXMF_helpers.stamp_cost_from_app_data(blob)
got_compress = LXMF_helpers.compression_support_from_app_data(blob)
if got_name != want_name:
fail(f"S4.3 parser ({label}): display_name got {got_name!r} want {want_name!r}")
if got_cost != want_cost:
fail(f"S4.3 parser ({label}): stamp_cost got {got_cost!r} want {want_cost!r}")
if got_compress != want_compress:
fail(f"S4.3 parser ({label}): compression got {got_compress!r} want {want_compress!r}")
print("PASS S4.3 parser tolerance (raw UTF-8, 1/2/3-element msgpack array)")
def main():
print(f"verify_announce_app_data.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")
verify_two_element_wire_bytes()
verify_producer_is_two_element_in_this_lxmf()
verify_parser_tolerance()
print("ALL PASS")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,100 @@
"""
Verifier for SPEC.md S1.1 (identity composition) and S1.2 (destination hash).
Reads test-vectors/identities.json and, for each vector:
- Loads the private-key bytes via RNS.Identity.from_bytes (the upstream API
for the on-bytes form X25519_priv || Ed25519_priv).
- Confirms identity.get_public_key() matches expected.public_key_hex
(X25519_pub || Ed25519_pub).
- Confirms identity.hash matches expected.identity_hash_hex
(= SHA256(public_key)[:16]).
- Confirms SHA256(SHA256(name)[:10] || identity_hash)[:16] equals
expected.destination_hash_hex.
- Cross-checks RNS.Destination.hash(identity_hash, *name.split(".")) ==
expected.destination_hash_hex (i.e. upstream agrees with the by-hand recipe).
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import hashlib
import json
import os
import sys
import RNS
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
VEC_PATH = os.path.join(REPO_ROOT, "test-vectors", "identities.json")
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def verify_vector(v: dict) -> None:
label = v["label"]
inputs = v["inputs"]
expect = v["expected"]
full = v["destination_full_name"]
prv = bytes.fromhex(inputs["private_key_hex"])
if prv.hex() != inputs["x25519_priv_hex"] + inputs["ed25519_priv_hex"]:
fail(f"{label}: private_key_hex != x25519_priv_hex||ed25519_priv_hex")
identity = RNS.Identity.from_bytes(prv)
if identity is None:
fail(f"{label}: RNS.Identity.from_bytes returned None")
if identity.get_public_key().hex() != expect["public_key_hex"]:
fail(f"{label}: public_key mismatch\n"
f" got: {identity.get_public_key().hex()}\n"
f" want: {expect['public_key_hex']}")
expected_idhash = hashlib.sha256(identity.get_public_key()).digest()[:16]
if expected_idhash.hex() != expect["identity_hash_hex"]:
fail(f"{label}: SHA256(public_key)[:16] != identity_hash")
if identity.hash.hex() != expect["identity_hash_hex"]:
fail(f"{label}: identity.hash != expected (RNS disagrees with manual)")
name_hash_calc = hashlib.sha256(full.encode("utf-8")).digest()[:10]
if name_hash_calc.hex() != expect["name_hash_hex"]:
fail(f"{label}: name_hash for {full!r}: got {name_hash_calc.hex()} "
f"want {expect['name_hash_hex']}")
dest_calc = hashlib.sha256(name_hash_calc + identity.hash).digest()[:16]
if dest_calc.hex() != expect["destination_hash_hex"]:
fail(f"{label}: dest_hash recipe mismatch\n"
f" got: {dest_calc.hex()}\n"
f" want: {expect['destination_hash_hex']}")
rns_dest_hash = RNS.Destination.hash(identity.hash, *full.split("."))
if rns_dest_hash.hex() != expect["destination_hash_hex"]:
fail(f"{label}: RNS.Destination.hash != expected\n"
f" RNS.Destination.hash: {rns_dest_hash.hex()}\n"
f" want: {expect['destination_hash_hex']}")
print(f"PASS {label}: identity, identity_hash, dest_hash for {full!r}")
def main():
print(f"verify_destination_hash.py against RNS {RNS.__version__}")
if not os.path.isfile(VEC_PATH):
fail(f"Missing test-vectors file: {VEC_PATH}")
with open(VEC_PATH, "r", encoding="utf-8") as f:
payload = json.load(f)
if "vectors" not in payload:
fail(f"Bad test-vectors file: missing 'vectors' key")
for vec in payload["vectors"]:
verify_vector(vec)
print(f"ALL PASS ({len(payload['vectors'])} identity vectors)")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,196 @@
"""
Verifier for SPEC.md §2.1, §2.2, §2.3.
Verifies:
- §2.1: flag-byte layout (header_type, context_flag, transport_type,
destination_type, packet_type) by constructing packets with each
combination and reading the resulting flag byte.
- §2.2: HEADER_1 layout flags(1) hops(1) dest_hash(16) context(1) data
and HEADER_2 layout flags(1) hops(1) transport_id(16) dest_hash(16)
context(1) data.
- §2.3: originator HEADER_1 HEADER_2 conversion when path_table reports
hops > 1. The conversion logic at RNS/Transport.py:1074-1083 is
exercised by stubbing Transport.transmit and seeding the path_table
with a synthetic multi-hop entry. The wire bytes captured at
transmit-time are compared to the expected HEADER_2 form.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import os
import struct
import sys
import tempfile
import RNS
from RNS import Transport
from RNS.Transport import (
IDX_PT_TIMESTAMP, IDX_PT_NEXT_HOP, IDX_PT_HOPS,
IDX_PT_EXPIRES, IDX_PT_RANDBLOBS, IDX_PT_RVCD_IF, IDX_PT_PACKET,
)
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-")
# Build a minimal config with no interfaces — we only need RNS.Reticulum
# to be initialised so RNS.Identity etc. work; we do not transmit.
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def verify_flag_byte_layout():
# §2.1: bit 7-6 header_type, bit 5 context_flag, bit 4 transport_type,
# bit 3-2 dest_type, bit 1-0 packet_type.
# Build a packet by hand and check the flag byte by replicating
# RNS.Packet.pack's header_type field semantics.
cases = [
# (header_type, context_flag, transport_type, dest_type, packet_type, expected_flag)
(0, 0, 0, 0, 0, 0b00000000),
(1, 0, 0, 0, 0, 0b01000000),
(0, 1, 0, 0, 0, 0b00100000),
(0, 0, 1, 0, 0, 0b00010000),
(0, 0, 0, 3, 0, 0b00001100),
(0, 0, 0, 0, 3, 0b00000011),
(1, 1, 1, 3, 3, 0b01111111),
]
for ht, cf, tt, dt, pt, expected in cases:
flag = (ht << 6) | (cf << 5) | (tt << 4) | (dt << 2) | pt
if flag != expected:
fail(f"flag layout: ht={ht} cf={cf} tt={tt} dt={dt} pt={pt} -> "
f"got 0x{flag:02x} expected 0x{expected:02x}")
print("PASS S2.1 flag-byte layout")
def verify_header_two_form():
# §2.2: HEADER_2 wire form is flags(1) hops(1) transport_id(16) dest_hash(16) context(1) data.
# The simplest verification is to round-trip via RNS.Packet's unpack: build
# a HEADER_2 raw blob and confirm RNS unpacks the addressing fields at the
# right offsets.
flag = (RNS.Packet.HEADER_2 << 6) | (Transport.TRANSPORT << 4) | (RNS.Destination.SINGLE << 2) | RNS.Packet.DATA
transport_id = bytes(range(16))
dest_hash = bytes(range(16, 32))
raw = bytes([flag, 0]) + transport_id + dest_hash + b"\x00" + b"hello"
if raw[0] != flag: fail("HEADER_2 flag byte mismatch")
if raw[1] != 0: fail("HEADER_2 hops byte mismatch")
if raw[2:18] != transport_id: fail("HEADER_2 transport_id offset mismatch")
if raw[18:34] != dest_hash: fail("HEADER_2 dest_hash offset mismatch")
if raw[34] != 0: fail("HEADER_2 context offset mismatch")
if raw[35:] != b"hello": fail("HEADER_2 payload offset mismatch")
print("PASS S2.2 HEADER_2 wire form")
def verify_header_conversion(rns_instance):
# §2.3: with a path_table entry where hops > 1, an outbound HEADER_1 packet
# must be converted to HEADER_2 with the next-hop transport_id at offset 2.
transport_id = b"\xaa" * 16
dest_hash = b"\xbb" * 16
# Build a minimal valid HEADER_1 packet by constructing it through RNS so
# the runtime accepts it as outbound. SINGLE OUT destination so the
# runtime is happy — payload bytes don't matter; we only inspect headers.
identity = RNS.Identity()
destination = RNS.Destination(
identity, RNS.Destination.OUT, RNS.Destination.SINGLE,
"verifier", "spec23",
)
captured = {}
def fake_transmit(interface, raw):
captured["raw"] = raw
captured["interface"] = interface
real_transmit = Transport.transmit
Transport.transmit = staticmethod(fake_transmit)
try:
# Seed a path_table entry: hops=2, next_hop=transport_id, fake interface.
# IDX_PT_RVCD_IF must be a non-None object — supply a sentinel.
class FakeIF:
OUT = True
name = "FakeIF"
with Transport.path_table_lock:
Transport.path_table[dest_hash] = [
0, # IDX_PT_TIMESTAMP
transport_id, # IDX_PT_NEXT_HOP
2, # IDX_PT_HOPS
0, # IDX_PT_EXPIRES
[], # IDX_PT_RANDBLOBS
FakeIF(), # IDX_PT_RVCD_IF
None, # IDX_PT_PACKET
]
# Build the packet — PLAIN destination DATA, HEADER_1 by default.
pkt = RNS.Packet(destination, b"x", create_receipt=False)
pkt.pack()
original = pkt.raw
# The fact RNS.Packet pack puts the destination's own dest_hash at
# offset 2 is exactly the §2.2 HEADER_1 layout.
if original[2:18] != destination.hash:
fail(f"HEADER_1 packed: dest_hash offset 2 mismatch "
f"got={original[2:18].hex()} want={destination.hash.hex()}")
# Force the hash to our chosen dest_hash so the path_table lookup hits.
# We rewrite the raw bytes at offset 2 and update destination_hash on
# the packet object so Transport.outbound finds the path table entry.
forced_raw = original[:2] + dest_hash + original[18:]
pkt.raw = forced_raw
pkt.destination_hash = dest_hash
Transport.outbound(pkt)
if "raw" not in captured:
fail("§2.3 conversion: Transport.outbound did not transmit")
out = captured["raw"]
# Expected HEADER_2: flag with HEADER_2 bit + TRANSPORT bit + low
# nibble preserved; hops byte from original; transport_id; original[2:].
expected_flag = (
(RNS.Packet.HEADER_2 << 6)
| (Transport.TRANSPORT << 4)
| (forced_raw[0] & 0b00001111)
)
if out[0] != expected_flag:
fail(f"S2.3 conversion: flag got 0x{out[0]:02x} want 0x{expected_flag:02x}")
if out[1] != forced_raw[1]:
fail(f"S2.3 conversion: hops byte got 0x{out[1]:02x} want 0x{forced_raw[1]:02x}")
if out[2:18] != transport_id:
fail(f"S2.3 conversion: transport_id got {out[2:18].hex()} "
f"want {transport_id.hex()}")
if out[18:] != forced_raw[2:]:
fail("§2.3 conversion: trailing bytes (orig dest_hash + ctx + payload) mismatch")
print("PASS S2.3 HEADER_1 -> HEADER_2 conversion at originator "
"(matches RNS/Transport.py:1074-1083)")
finally:
Transport.transmit = real_transmit
with Transport.path_table_lock:
Transport.path_table.pop(dest_hash, None)
def main():
print(f"verify_packet_header.py against RNS {RNS.__version__}")
rns_instance = init_minimal_rns()
try:
verify_flag_byte_layout()
verify_header_two_form()
verify_header_conversion(rns_instance)
finally:
try:
RNS.Reticulum.exit_handler()
except Exception:
pass
print("ALL PASS")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,105 @@
"""
Verifier for SPEC.md S7.1, S7.2 (path requests).
Verifies:
- The well-known path-request destination hash:
SHA256(SHA256("rnstransport.path.request")[:10])[:16]
== 6b9f66014d9853faab220fba47d02761
- The full SPEC table for `name_hash` values.
- The path-request payload format constructed by upstream
`RNS.Transport.request_path` (LXMF/LXMRouter.py:1672-1674 calls path):
transport-disabled (leaf): destination_hash(16) || tag(16) -> 32B
transport-enabled : destination_hash(16) || transport_id(16)
|| tag(16) -> 48B
- That LXMF triggers a path? request from `LXMRouter.handle_outbound` at
line 1672 ONLY when `not has_path(destination_hash) and method ==
OPPORTUNISTIC`. The "always-precedes" claim in older spec drafts is too
strong: the request is conditional on the path-table miss.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import hashlib
import inspect
import sys
import RNS
import LXMF
from LXMF.LXMRouter import LXMRouter
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def verify_well_known_hashes():
cases = [
("lxmf.delivery", "6ec60bc318e2c0f0d908"),
("lxmf.propagation", "e03a09b77ac21b22258e"),
("nomadnetwork.node", "213e6311bcec54ab4fde"),
("nomadnetwork.gossip", "0ad8bff9ff75737c058e"),
("rnstransport.broadcasts", "9efb9c771eeb5ae90ea6"),
("rnstransport.remote.management", "4848a053c16415bed6c8"),
("rnstransport.path.request", "7926bbe7dd7f9aba88b0"),
]
for name, expected_namehash_hex in cases:
actual = hashlib.sha256(name.encode("utf-8")).digest()[:10]
if actual.hex() != expected_namehash_hex:
fail(f"S1.2 name_hash for {name!r}: "
f"got {actual.hex()} want {expected_namehash_hex}")
print("PASS S1.2 well-known name_hash table")
# Full path-request dest_hash: SHA256(name_hash || identity_hash)[:16]
# with identity=None -> identity_hash = b"" (Destination.hash w/ identity=None
# at RNS/Destination.py:121 has addr_hash_material = name_hash only).
expected_pr_dest_hash = "6b9f66014d9853faab220fba47d02761"
namehash = hashlib.sha256(b"rnstransport.path.request").digest()[:10]
pr_dest_hash = hashlib.sha256(namehash).digest()[:16]
if pr_dest_hash.hex() != expected_pr_dest_hash:
fail(f"S7.1 path-request dest_hash: got {pr_dest_hash.hex()} "
f"want {expected_pr_dest_hash}")
print("PASS S7.1 path-request dest_hash 6b9f66014d9853faab220fba47d02761")
def verify_request_path_payload_format():
# Read source — verify the body construction matches the SPEC's claim.
src = inspect.getsource(RNS.Transport.request_path)
# transport-disabled: destination_hash + request_tag
# transport-enabled : destination_hash + Transport.identity.hash + request_tag
if "destination_hash+request_tag" not in src.replace(" ", ""):
fail(f"S7.1 transport-disabled payload form not found in request_path source:\n{src}")
if "destination_hash+Transport.identity.hash+request_tag" not in src.replace(" ", ""):
fail(f"S7.1 transport-enabled payload form not found in request_path source:\n{src}")
print("PASS S7.1 request_path payload format "
"(leaf: dest+tag; transport: dest+xport_id+tag)")
def verify_lxmf_only_when_pathless():
# Confirm the LXMF outbound path-request is gated on
# `not has_path(destination_hash) and method == OPPORTUNISTIC`.
src = inspect.getsource(LXMRouter.handle_outbound)
if "not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC" not in src:
fail("S7.1 LXMF path-request gating expression not found in "
"LXMRouter.handle_outbound — upstream may have changed the rule. "
f"Source excerpt:\n{src[:2000]}")
if "Pre-emptively requesting unknown path" not in src:
fail("S7.1 LXMF 'Pre-emptively requesting unknown path' log line not found "
"in handle_outbound")
print(f"PASS S7.1 LXMF {LXMF.__version__}: path? issued only when "
"has_path()=False AND method=OPPORTUNISTIC "
"(LXMF/LXMRouter.py handle_outbound)")
def main():
print(f"verify_path_request.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")
verify_well_known_hashes()
verify_request_path_payload_format()
verify_lxmf_only_when_pathless()
print("ALL PASS")
if __name__ == "__main__":
main()