Add four more verifiers + receive-propagated flow + frontmatter version

Verifiers:
  tools/verify_proof_packet.py — locks in §6.5. Toggles
    Reticulum.__use_implicit_proof to test both modes; confirms
    Identity.prove emits 64B (implicit) or 96B (explicit) proof
    body; PacketReceipt.validate_proof accepts both lengths and
    rejects an 80B body.
  tools/verify_link_handshake.py — locks in §6.1, §6.2, §6.3, §6.6.
    Most importantly verifies the previously-corrected §6.2 LRPROOF
    body order (signature(64) || responder_X25519_pub(32) ||
    [signalling]) and §6.3 link_id offsets (N=2 for HEADER_1) by
    actually building a Link initiator-side, capturing the
    LINKREQUEST raw bytes, computing link_id by the spec recipe,
    running validate_request inline (since the upstream wrapper
    swallows exceptions), and confirming the responder's LRPROOF
    bytes match the spec layout. This was the single most
    interop-critical correction we made.
  tools/verify_rnode_split.py — locks in §8.3. Pure-function
    re-implementation of the canonical TX and RX state machines
    from RNode_Firmware.ino:359-446 + 716-742; tests header-byte
    layout, single-frame TX, split-frame TX (300B → 254+46 with
    shared header byte), all four RX state-machine cases (a/b/c/d
    from the spec table), and end-to-end TX/RX round-trip at
    sizes 50, 254, 255, 300, 508.
  tools/verify_msgpack_quirk.py — locks in §9.3. Confirms umsgpack
    distinguishes str (fixstr/0xa5) from bytes (bin8/0xc4); confirms
    LXMF.display_name_from_app_data parses bytes-encoded display
    names correctly and silently returns None (not crash) on
    str-encoded ones, matching the bug-tolerance documented in §9.3.

All 11 verifiers pass against RNS 1.2.0 / LXMF 0.9.6.

Plus:
  - SPEC.md frontmatter: 'Last verified against' line per agent.md §7.
  - flows/receive-propagated-lxmf.md: closing half of the propagated
    LXMF lifecycle. /get listing query, fetch query, ack-and-purge
    via the have_ids slot, message-bundle unpack and dispatch
    through lxmf_delivery.
  - tools/README.md status table refreshed; flows/README.md flips
    receive-propagated-lxmf.md to .

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Rob 2026-05-03 12:54:34 -04:00
commit abf66b9cef
8 changed files with 990 additions and 6 deletions

View file

@ -2,6 +2,8 @@
A byte-level reference for implementing Reticulum-compatible clients. This document focuses on what implementations need to interop with the canonical Python implementation ([`markqvist/Reticulum`](https://github.com/markqvist/Reticulum) and [`markqvist/LXMF`](https://github.com/markqvist/LXMF)) plus the existing client ecosystem (Sideband, Nomadnet, MeshChat, the various firmware projects).
**Last verified against:** `RNS 1.2.0` / `LXMF 0.9.6` / `RNode_Firmware` (master at the spec's last revision date). Each section's source citations were re-checked against these versions; runtime verifiers in [`tools/`](tools/) lock the wire-format claims in against actually-running upstream code. When you upgrade past these, re-run every `tools/verify_*.py` and look for `FAIL`s.
Source citations refer to the standard `pip install rns lxmf` install layout (`RNS/`, `LXMF/`).
---

View file

@ -19,7 +19,7 @@ The two views are complementary: SPEC.md tells you what each piece looks like; t
| [`send-announce.md`](send-announce.md) (build, sign, transmit, ratchet rotation, periodic re-announce) | ✅ |
| [`forward-announce.md`](forward-announce.md) (transport-node rebroadcast logic, announce_cap, queue) | ✅ |
| [`send-propagated-lxmf.md`](send-propagated-lxmf.md) (PROPAGATED method, via a propagation node) | ✅ |
| `receive-propagated-lxmf.md` (recipient pulling messages via `/get`) | ⏳ |
| [`receive-propagated-lxmf.md`](receive-propagated-lxmf.md) (recipient pulling messages via `/get`) | ✅ |
## Conventions

View file

@ -0,0 +1,124 @@
# Flow: receive a propagated LXMF message (recipient pulls via `/get`)
The closing half of [`send-propagated-lxmf.md`](send-propagated-lxmf.md): how a recipient client retrieves messages that were store-and-forwarded for it by a propagation node. Pinned against **RNS 1.2.0 / LXMF 0.9.6**; cross-references [`../SPEC.md`](../SPEC.md) §5.8 (propagation protocol), §11 (REQUEST/RESPONSE).
This is the inverse-side flow that turns "the message was queued at a propagation node" (`send-propagated-lxmf.md` step 9) into "the message arrives in the recipient's inbox".
---
## Preconditions
- Recipient has discovered at least one propagation node via its `lxmf.propagation` announce. The recipient's `LXMRouter.outbound_propagation_node` records which one to use.
- Recipient has a path to the propagation node in `Transport.path_table`.
---
## Sequence
### 1. Recipient initiates retrieval
`LXMRouter.request_messages_from_propagation_node(identity, max_messages)` (`LXMF/LXMRouter.py:485+`). Triggered by:
- Manual user action (Sideband "Refresh inbox" button).
- Periodic background poll (every few minutes by default in long-running clients).
- An incoming `lxmf.propagation` announce from the configured PN, signalling availability.
### 2. Open a Link to the propagation node
If `Transport.has_path(propagation_node_dest)` is False, request_path first and defer (same pattern as opportunistic LXMF send). Otherwise:
```python
self.outbound_propagation_link = RNS.Link(
propagation_node_destination,
established_callback=msg_request_established_callback,
)
```
(`LXMF/LXMRouter.py:514`). Standard Link establishment per `flows/send-link-lxmf.md` steps 3-4.
### 3. Identify on the link
Once the link is `ACTIVE`, the recipient calls `link.identify(my_lxmf_delivery_identity)` so the propagation node knows whose mail to deliver. Without this, the `/get` request handler returns `LXMPeer.ERROR_NO_IDENTITY` (per §5.8.3).
### 4. Listing query — `/get` with `[None, None]`
```python
data = [None, None] # [wanted, have]
link.request("/get", data, response_callback=on_message_list)
```
The propagation node's `message_get_request` handler at `LXMF/LXMRouter.py:1427-1450` walks `propagation_entries` for messages keyed to the requester's destination_hash and returns:
```python
[ [transient_id_1(16), size_1(int)],
[transient_id_2(16), size_2(int)],
... ] # sorted by size ascending
```
For `[None, None]`, the response after the propagation node strips its internal `(transient_id, size)` tuples to just transient_ids:
```python
return [transient_id_1, transient_id_2, ...]
```
### 5. Recipient picks which messages to fetch
Application logic decides. Common heuristic: fetch all transient_ids the recipient doesn't already have stored locally, prioritising smaller messages first. Build:
```python
data = [wanted_ids, have_ids, transfer_limit_kb]
link.request("/get", data, response_callback=on_message_batch)
```
- `wanted_ids` — list of 16-byte transient_ids to deliver.
- `have_ids` — list of 16-byte transient_ids the recipient already has stored locally; the propagation node deletes these from its store as a side effect (§5.8.3 "ack and purge").
- `transfer_limit_kb` — optional cap on total bytes the recipient is willing to receive in one batch.
### 6. Propagation node returns a message bundle
`message_get_request` builds `response_messages = []` of the matching LXMF bodies, packs them as:
```python
data = msgpack.packb([time.time(), [lxmf_data_1, lxmf_data_2, ...]])
```
Returns this as a §11 RESPONSE. If the bundle fits in `link.mdu` it's a single Link DATA packet; otherwise it's a Resource (per `flows/send-resource.md`).
### 7. Recipient unpacks the bundle and processes each message
The recipient's `propagation_resource_concluded` handler (or its single-packet equivalent) at `LXMF/LXMRouter.py:2194+` walks the bundle:
```python
data = msgpack.unpackb(resource.data.read())
remote_timebase = data[0]
messages = data[1]
for lxmf_data in messages:
self.lxmf_delivery(lxmf_data, destination_type=SINGLE)
```
`lxmf_delivery` is the same path used for opportunistic and direct receive (`flows/receive-opportunistic-lxmf.md` step 11+) — it calls `LXMessage.unpack_from_bytes`, validates the signature against the sender's known identity, runs ticket / stamp / dedup checks, and fires the application's delivery callback. **The LXMF body bytes are identical regardless of how they arrived** — opportunistic, direct over a Link, or propagated. The propagation node never touched the encrypted body.
### 8. (Optional) Acknowledge and purge
In the next `/get` request, the recipient passes the just-fetched transient_ids in the `have_ids` slot per step 5. The propagation node deletes those entries on receipt. This caps the propagation node's storage growth — without it, every message would accumulate forever until the operator manually purged.
A clean-room recipient that doesn't implement the purge handshake works correctly (gets messages delivered) but contributes to long-term storage growth on shared propagation nodes. Implement the purge as a courtesy.
### 9. Link teardown or reuse
After the bundle is processed, the recipient either tears down the link (`link.teardown()` per §6.7) or keeps it for another `/get` round if more messages are expected. Most clients tear down after each successful batch — the propagation node's `lxmf.propagation` destination is `ALLOW_ALL` for `/offer` and `/get` so reopening a fresh link has no auth cost.
---
## Source map
| Step | File | Function / line |
|---|---|---|
| 1 | `LXMF/LXMRouter.py` | `request_messages_from_propagation_node`, line 485 |
| 2 | `LXMF/LXMRouter.py` | outbound link establishment, line 505-520 |
| 3 | `RNS/Link.py` | `Link.identify`, line ~1010 |
| 4-6 | `LXMF/LXMRouter.py` | `message_get_request` handler, line 1427-1500 |
| 7 | `LXMF/LXMRouter.py` | `propagation_resource_concluded`, line 2194+ |
| 7 | `LXMF/LXMRouter.py` | `lxmf_delivery`, line 1732 |
| 8 | `LXMF/LXMRouter.py` | purge via `/get` `have_ids` slot, line 1453-1465 |
| 9 | `RNS/Link.py` | `teardown`, line 699 |

View file

@ -25,13 +25,15 @@ Populated against RNS 1.2.0 / LXMF 0.9.6:
|---|---|---|
| `verify_destination_hash.py` | §1.1, §1.2, §1.3 — identity composition, `dest_hash = SHA256(name_hash \|\| identity_hash)[:16]`, on-disk private-key round-trip via `to_file`/`from_file` | ✅ |
| `verify_packet_header.py` | §2.1, §2.2, §2.3 — flag byte layout, HEADER_1/HEADER_2 form, originator HEADER_1→HEADER_2 conversion via upstream `Transport.outbound` | ✅ |
| `verify_token_crypto.py` | §3 — Token encrypt/decrypt, HKDF salt = identity_hash, HMAC-then-AES order, PKCS#7 padding | ✅ |
| `verify_announce_app_data.py` | §4.3 — LXMF announce app_data 2-element form, parser tolerance | ✅ |
| `verify_announce_roundtrip.py` | §4.1, §4.2, §4.5 — announce body layout, signature, dest_hash recompute, tamper rejection | ✅ |
| `verify_lxmf_opportunistic.py` | §5.1, §5.2, §5.5, §5.6 — full identity → encrypt → decrypt → parse round-trip | ✅ |
| `verify_proof_packet.py` | §6.5 — implicit (64B) and explicit (96B) proof body forms, validator length-dispatch | ✅ |
| `verify_link_handshake.py` | §6.1, §6.2, §6.3, §6.6 — LINKREQUEST/LRPROOF body order, link_id derivation, signalling | ✅ |
| `verify_path_request.py` | §1.2 well-known hashes, §7.1 LXMF path-preamble gating | ✅ |
| `verify_rnode_split.py` | §8.3 — RNode air-frame split-packet TX/RX state machines | ✅ |
| `verify_msgpack_quirk.py` | §9.3 — encoding name as bytes vs str affects upstream parsing | ✅ |
| `regen_identities.py` | regenerates `test-vectors/identities.json` | ✅ |
| `verify_announce_roundtrip.py` | §4 — announce build matches upstream `Identity().announce()` bytes | ⏳ |
| `verify_token_crypto.py` | §3 — Token encrypt/decrypt against upstream `RNS.Cryptography.Token` | ⏳ |
| `verify_lxmf_opportunistic.py` | §5.1, §5.5 — opportunistic LXMF body bytes match upstream | ⏳ |
| `verify_link_handshake.py` | §6 — LINKREQUEST + LRPROOF + session key match upstream | ⏳ |
| `verify_msgpack_quirk.py` | §9.3 — encoding name as bytes vs str affects upstream parsing | ⏳ |
See [`../agent.md`](../agent.md) §5 and [`../todo.md`](../todo.md) for the remaining priority order.

View file

@ -0,0 +1,278 @@
"""
Verifier for SPEC.md S6.1, S6.2, S6.3, S6.6.
Locks in the corrections previously made to S6.2 (LRPROOF body order)
and S6.3 (link_id derivation offsets) by exercising the actual upstream
Link.validate_request -> handshake -> prove pipeline and asserting the
wire bytes match the spec at every step.
Scenarios:
1. LINKREQUEST body layout per S6.1:
initiator_X25519_pub(32) || initiator_Ed25519_pub(32) || [signalling(3)]
Build a Link initiator-side, capture request_data, slice and
verify each region.
2. link_id derivation per S6.3:
link_id = SHA256(get_hashable_part(LINKREQUEST))[:16]
where hashable_part = byte(flags & 0x0F) || raw[N:] with N=2 for
HEADER_1, N=18 for HEADER_2 (the corrected offsets earlier
spec revisions had 18/34 which was wrong).
Verified by computing the link_id by hand from the packed
LINKREQUEST and confirming it matches Link.set_link_id.
3. LRPROOF body layout per S6.2:
signature(64) || responder_X25519_pub(32) || [signalling(3)]
(Earlier spec revisions had link_id || pub || sig || signalling,
which was wrong link_id is in the packet header, not the body.)
Build a responder Link via Link.validate_request, capture the
proof_data emitted by Link.prove, slice and verify.
4. signed_data per S6.2 (used in the LRPROOF):
link_id || responder_X25519_pub || responder_long_term_Ed25519_pub
|| [signalling]
Reconstruct by hand and verify the signature in proof_data
validates against this signed_data.
5. S6.6 signalling 3-byte trailer encoding/decoding for both
LINKREQUEST and LRPROOF:
byte 0: top 3 bits = mode, low 5 = mtu[20:16]
byte 1: mtu[15:8]
byte 2: mtu[7:0]
Encode a known (mtu, mode) pair via Link.signalling_bytes,
decode by hand, confirm round-trip.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import hashlib
import os
import sys
import struct
import tempfile
import RNS
from RNS.Link import Link
from RNS.Packet import Packet
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-link-")
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def verify_signalling_bytes_layout():
"""S6.6.1: 24-bit packed value, big-endian, top 3 bits = mode,
low 21 bits = mtu."""
mtu = 0x0123AB # arbitrary 21-bit value
mode = Link.MODE_AES256_CBC
sb = Link.signalling_bytes(mtu, mode)
if len(sb) != 3:
fail(f"S6.6 signalling_bytes returned {len(sb)} bytes, want 3")
# Decode by hand per the spec
decoded_mode = (sb[0] & 0xE0) >> 5
decoded_mtu = ((sb[0] << 16) | (sb[1] << 8) | sb[2]) & 0x1FFFFF
if decoded_mode != mode:
fail(f"S6.6 mode round-trip mismatch: encoded {mode}, decoded {decoded_mode}")
if decoded_mtu != mtu:
fail(f"S6.6 mtu round-trip mismatch: encoded {mtu:#x}, decoded {decoded_mtu:#x}")
# Confirm bit positions: mode is in top 3 bits of byte 0
assert (sb[0] & 0xE0) == ((mode << 5) & 0xE0)
# And the mtu fits in the low 21 bits of the 24-bit packed value
full = (sb[0] << 16) | (sb[1] << 8) | sb[2]
assert (full & 0x1FFFFF) == mtu
print(f"PASS S6.6.1 signalling layout: mtu={mtu:#x} mode={mode} -> {sb.hex()} -> "
f"({decoded_mtu:#x}, {decoded_mode})")
def build_linkrequest():
"""Construct a LINKREQUEST packet via the upstream Link initiator path,
return the resulting Packet plus a 'fake destination' SimpleLink so
we can exercise validate_request without actually transmitting."""
# The remote destination Bob's identity must already be discoverable.
bob_id = RNS.Identity()
bob_dest = RNS.Destination(bob_id, RNS.Destination.IN, RNS.Destination.SINGLE,
"verify_link", "responder")
# Initiator-side outbound destination to Bob
bob_dest_out = RNS.Destination(bob_id, RNS.Destination.OUT, RNS.Destination.SINGLE,
"verify_link", "responder")
RNS.Identity.remember(b"\x00"*32, bob_dest.hash, bob_id.get_public_key(), None)
# Build a Link to bob_dest_out. The constructor packs and sends; we
# capture the LINKREQUEST via Transport.outbound monkey-patch (not
# Packet.send, because send() calls pack() before outbound and we
# need packet.raw populated).
captured = {}
real_outbound = RNS.Transport.outbound
def fake_outbound(packet):
captured["raw"] = packet.raw
captured["request_data"] = packet.data
return True
RNS.Transport.outbound = staticmethod(fake_outbound)
try:
link = Link(destination=bob_dest_out)
finally:
RNS.Transport.outbound = real_outbound
return link, bob_dest, bob_id, captured
def verify_linkrequest_body_layout(link, captured):
"""S6.1: initiator_X25519_pub(32) || initiator_Ed25519_pub(32) || [signalling(3)]"""
body = captured["request_data"]
if len(body) not in (Link.ECPUBSIZE, Link.ECPUBSIZE + Link.LINK_MTU_SIZE):
fail(f"S6.1 LINKREQUEST body is {len(body)} bytes; "
f"want {Link.ECPUBSIZE} or {Link.ECPUBSIZE + Link.LINK_MTU_SIZE}")
initiator_x25519_pub = body[:32]
initiator_ed25519_pub = body[32:64]
if initiator_x25519_pub != link.pub_bytes[:32]:
fail(f"S6.1 LINKREQUEST X25519 pub mismatch")
# Note: link.pub_bytes covers just the X25519 in some impls; sig_pub_bytes is separate.
# The spec uses ECPUBSIZE = 64 = X25519(32) + Ed25519(32).
print(f"PASS S6.1 LINKREQUEST body layout: "
f"initiator_X25519({len(initiator_x25519_pub)}) || "
f"initiator_Ed25519({len(initiator_ed25519_pub)})"
+ (f" || signalling({len(body)-64})" if len(body) > 64 else ""))
def verify_link_id_derivation(link, captured):
"""S6.3: link_id = SHA256(byte(flags & 0x0F) || raw[N:])[:16]
with N = 2 for HEADER_1 (the corrected value)."""
raw = captured["raw"]
# Manually compute hashable_part for HEADER_1 (the initiator-side form)
hashable = bytes([raw[0] & 0x0F]) + raw[2:]
# Strip trailing signalling if body length > ECPUBSIZE
if len(captured["request_data"]) > Link.ECPUBSIZE:
diff = len(captured["request_data"]) - Link.ECPUBSIZE
hashable = hashable[:-diff]
expected_link_id = hashlib.sha256(hashable).digest()[:16]
if expected_link_id != link.link_id:
fail(f"S6.3 link_id by-hand recompute mismatch:\n"
f" spec recipe: {expected_link_id.hex()}\n"
f" upstream: {link.link_id.hex()}")
print(f"PASS S6.3 link_id derivation (N=2 for HEADER_1, signalling stripped): "
f"{link.link_id.hex()}")
def verify_lrproof_layout_and_signed_data(link, captured, bob_id, bob_dest):
"""S6.2: LRPROOF body = signature(64) || responder_X25519_pub(32) || [signalling]
signed_data = link_id || responder_X25519_pub || responder_long_term_Ed25519_pub || [signalling]"""
# Build the responder-side Link by hand (inlined from validate_request)
# so any exception surfaces cleanly rather than being swallowed.
request_data = captured["request_data"]
inbound = RNS.Packet(None, captured["raw"])
if not inbound.unpack():
fail("Failed to unpack LINKREQUEST on responder side")
inbound.destination = bob_dest
responder_link = Link(
owner=bob_dest,
peer_pub_bytes=request_data[:Link.ECPUBSIZE//2],
peer_sig_pub_bytes=request_data[Link.ECPUBSIZE//2:Link.ECPUBSIZE],
)
responder_link.set_link_id(inbound)
if len(request_data) == Link.ECPUBSIZE + Link.LINK_MTU_SIZE:
responder_link.mtu = Link.mtu_from_lr_packet(inbound) or RNS.Reticulum.MTU
responder_link.mode = Link.mode_from_lr_packet(inbound)
responder_link.destination = inbound.destination
responder_link.handshake()
# Capture LRPROOF emission. Patch Transport.outbound (not Packet.send)
# so Packet.pack() runs normally and packet.raw is populated.
captured_proof = {}
real_outbound = RNS.Transport.outbound
def fake_outbound(packet):
if packet.context == RNS.Packet.LRPROOF:
captured_proof["raw"] = packet.raw
captured_proof["proof_data"] = packet.data
captured_proof["dest_hash"] = packet.destination.link_id
return True # signal "sent" so callers don't retry
RNS.Transport.outbound = staticmethod(fake_outbound)
try:
responder_link.prove()
finally:
RNS.Transport.outbound = real_outbound
if "proof_data" not in captured_proof:
fail("LRPROOF was not emitted via Packet.send")
proof_data = captured_proof["proof_data"]
# S6.2: signature(64) || responder_X25519_pub(32) || [signalling(3)]
if len(proof_data) not in (96, 96 + Link.LINK_MTU_SIZE):
fail(f"S6.2 LRPROOF body is {len(proof_data)} bytes, want 96 or 99")
signature = proof_data[:64]
responder_x25519 = proof_data[64:96]
signalling = proof_data[96:] if len(proof_data) > 96 else b""
if responder_x25519 != responder_link.pub_bytes:
fail(f"S6.2 LRPROOF responder X25519 pub mismatch")
# Reconstruct signed_data per S6.2 corrected:
signed_data = (responder_link.link_id
+ responder_x25519
+ bob_id.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE] # long-term Ed25519 pub
+ signalling)
if not bob_id.validate(signature, signed_data):
fail("S6.2 hand-rebuilt signed_data did not validate signature — "
"spec body order doesn't match upstream emission")
# Outer packet: dest_hash position is the link_id (S6.2 wire summary)
if captured_proof["dest_hash"] != responder_link.link_id:
fail(f"S6.2 LRPROOF outer dest_hash position != link_id")
print(f"PASS S6.2 LRPROOF body order: "
f"signature(64) || responder_X25519_pub(32)"
+ (f" || signalling({len(signalling)})" if signalling else "")
+ f"; signed_data = link_id || pub || long_term_Ed25519_pub"
+ (" || signalling" if signalling else ""))
def main():
print(f"verify_link_handshake.py against RNS {RNS.__version__}")
init_minimal_rns()
try:
# 1. Signalling layout (independent of an actual link)
verify_signalling_bytes_layout()
# 2-4. Build a real LINKREQUEST and walk through validate_request + prove
link, bob_dest, bob_id, captured = build_linkrequest()
verify_linkrequest_body_layout(link, captured)
verify_link_id_derivation(link, captured)
verify_lrproof_layout_and_signed_data(link, captured, bob_id, bob_dest)
finally:
try: RNS.Reticulum.exit_handler()
except Exception: pass
print("ALL PASS")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,119 @@
"""
Verifier for SPEC.md S9.3 (RNS bundles `umsgpack` encode display
names as bytes, not str).
The bundled `RNS.vendor.umsgpack` distinguishes between Python `str`
(encoded as msgpack `fixstr/str8/str16/str32` types `0xa0..0xbf`,
`0xd9`, `0xda`, `0xdb`) and Python `bytes` (encoded as `bin8/bin16/
bin32` types `0xc4`, `0xc5`, `0xc6`). The downstream LXMF parser at
`LXMF/LXMF.py:131` does `dn.decode("utf-8")` on the unpacked first
element this only works when the producer used `bytes` (so umsgpack
unpacks as Python `bytes` which has `.decode`).
If a producer uses `str` instead, umsgpack unpacks back to a Python
`str` which has no `.decode("utf-8")` method, the LXMF parser raises
AttributeError, and the message's display name is silently lost.
This verifier confirms:
1. umsgpack.packb on a `str` produces fixstr/str8/str16/str32 prefix
bytes; on `bytes` produces bin8/bin16/bin32.
2. umsgpack.unpackb round-trips a `bytes` value back to Python `bytes`,
and a `str` value back to Python `str`.
3. LXMF.display_name_from_app_data returns the decoded display name
for a bytes-encoded producer and returns None (with no crash) for
a str-encoded producer i.e. the gotcha is real but contained.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import sys
import RNS
import RNS.vendor.umsgpack as umsgpack
import LXMF
from LXMF import LXMF as LXMF_helpers
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def verify_pack_str_uses_str_prefix():
# 5-character str fits in fixstr (0xa0..0xbf with the length in the low 5 bits)
blob = umsgpack.packb("hello")
if blob[0] not in (0xa5, 0xd9, 0xda, 0xdb):
# 0xa5 = fixstr length 5 (0xa0 | 5)
fail(f"umsgpack.packb('hello') produced unexpected first byte 0x{blob[0]:02x}; "
f"want fixstr 0xa5 or str8/16/32")
if blob[0] != 0xa5:
fail(f"umsgpack.packb('hello') did not use fixstr; got 0x{blob[0]:02x}")
print(f"PASS S9.3 packb('hello') -> 0x{blob[0]:02x}: fixstr (str family)")
def verify_pack_bytes_uses_bin_prefix():
# 5-byte bytes uses bin8 (0xc4 NN ...) — there's no fixbin
blob = umsgpack.packb(b"hello")
if blob[0] not in (0xc4, 0xc5, 0xc6):
fail(f"umsgpack.packb(b'hello') produced unexpected first byte 0x{blob[0]:02x}; "
f"want bin8/16/32")
if blob[0] != 0xc4:
fail(f"umsgpack.packb(b'hello') did not use bin8; got 0x{blob[0]:02x}")
if blob[1] != 5:
fail(f"bin8 length byte != 5: 0x{blob[1]:02x}")
print(f"PASS S9.3 packb(b'hello') -> 0xc4 05: bin8 (bin family)")
def verify_unpack_round_trip_str_vs_bytes():
# str round-trips to str
s = umsgpack.unpackb(umsgpack.packb("hello"))
if not isinstance(s, str):
fail(f"umsgpack.unpackb(packb('hello')) returned {type(s).__name__}, want str")
# bytes round-trips to bytes
b = umsgpack.unpackb(umsgpack.packb(b"hello"))
if not isinstance(b, bytes):
fail(f"umsgpack.unpackb(packb(b'hello')) returned {type(b).__name__}, want bytes")
print("PASS S9.3 round-trip preserves str/bytes distinction")
def verify_lxmf_display_name_parser_quirk():
"""LXMF.display_name_from_app_data only works when the producer used bytes."""
# 1. Correct producer: bytes-encoded display name in a 2-element array
correct_blob = umsgpack.packb([b"AliceTest", None])
name = LXMF_helpers.display_name_from_app_data(correct_blob)
if name != "AliceTest":
fail(f"S9.3 correct (bytes-encoded) display name parsed wrong: {name!r}")
print("PASS S9.3 correct producer: msgpack([bytes, None]) -> 'AliceTest'")
# 2. Wrong producer: str-encoded display name. LXMF should NOT crash —
# it should return None. (The except in display_name_from_app_data
# at LXMF.py:133-135 logs the error and returns None.)
wrong_blob = umsgpack.packb(["AliceTest", None])
try:
result = LXMF_helpers.display_name_from_app_data(wrong_blob)
except Exception as e:
fail(f"S9.3 LXMF parser crashed on str-encoded producer: {e}")
if result is not None:
fail(f"S9.3 LXMF parser silently accepted str-encoded producer "
f"(returned {result!r}); spec says this should fail to None")
print("PASS S9.3 wrong producer: msgpack([str, None]) -> None "
"(LXMF.py:131 .decode raises AttributeError, parser returns None)")
def main():
print(f"verify_msgpack_quirk.py against RNS {RNS.__version__} / LXMF {LXMF.__version__}")
verify_pack_str_uses_str_prefix()
verify_pack_bytes_uses_bin_prefix()
verify_unpack_round_trip_str_vs_bytes()
verify_lxmf_display_name_parser_quirk()
print("ALL PASS")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,182 @@
"""
Verifier for SPEC.md S6.5 (regular PROOF packet wire form).
Three scenarios against upstream RNS 1.2.0:
1. Implicit-mode opportunistic DATA proof: when Reticulum is
configured with use_implicit_proof = True (the upstream default
per Reticulum.py:259), Identity.prove emits a 64-byte body
containing only the Ed25519 signature over packet.packet_hash.
2. Explicit-mode opportunistic DATA proof: when use_implicit_proof
= False, Identity.prove emits a 96-byte body of
packet_hash(32) || signature(64).
3. Receiver-side length dispatch in PacketReceipt.validate_proof:
accepts both 64- and 96-byte forms; rejects bodies of any other
length.
Reticulum doesn't support reinitialisation in one process; we toggle
use_implicit_proof via the name-mangled class variable
RNS.Reticulum._Reticulum__use_implicit_proof rather than running
two separate Reticulum instances. The toggle is read by
should_use_implicit_proof(), which is what Identity.prove dispatches on.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import os
import sys
import tempfile
import RNS
from RNS.Packet import PacketReceipt
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
def init_minimal_rns():
cfg_dir = tempfile.mkdtemp(prefix="rns-verify-proof-")
cfg_path = os.path.join(cfg_dir, "config")
with open(cfg_path, "w", encoding="utf-8") as f:
f.write("[reticulum]\nenable_transport = No\nshare_instance = No\n")
return RNS.Reticulum(configdir=cfg_dir, loglevel=0)
def set_implicit_proof(value: bool) -> None:
RNS.Reticulum._Reticulum__use_implicit_proof = value
if RNS.Reticulum.should_use_implicit_proof() != value:
fail(f"Failed to toggle use_implicit_proof to {value}")
def build_packet_to_prove(identity, name_aspect):
"""Build a regular DATA packet that we can then prove. name_aspect lets
each call create a unique destination so we don't trip the
'already registered' check."""
dest = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE,
"verify_proof", name_aspect)
pkt = RNS.Packet(dest, b"some payload bytes", create_receipt=False)
pkt.pack()
pkt.update_hash()
pkt.fromPacked = True
pkt.destination = dest
return dest, pkt
def capture_proof_body(identity, target_packet):
"""Run identity.prove() against target_packet and capture the proof
packet's body via Packet.send monkey-patch."""
captured = {}
real_packet_class = RNS.Packet
class CapturePacket(real_packet_class):
def send(self, *a, **kw):
captured["data"] = self.data
captured["context"] = self.context
captured["packet_type"] = self.packet_type
captured["dest_hash"] = self.destination.hash
return None
RNS.Packet = CapturePacket
try:
identity.prove(target_packet)
finally:
RNS.Packet = real_packet_class
return captured
def verify_implicit_form(identity):
set_implicit_proof(True)
dest, pkt = build_packet_to_prove(identity, "implicit_test")
captured = capture_proof_body(identity, pkt)
body = captured["data"]
if len(body) != PacketReceipt.IMPL_LENGTH:
fail(f"S6.5 implicit proof body is {len(body)} bytes, want IMPL_LENGTH = "
f"{PacketReceipt.IMPL_LENGTH} (= 64)")
if not identity.validate(body, pkt.packet_hash):
fail("S6.5 implicit proof body did not validate as signature(packet_hash)")
if captured["dest_hash"] != pkt.packet_hash[:16]:
fail(f"S6.5 implicit proof dest_hash != packet_hash[:16]")
if captured["packet_type"] != RNS.Packet.PROOF:
fail(f"S6.5 implicit proof packet_type = {captured['packet_type']}, want PROOF (3)")
print("PASS S6.5.1 implicit proof form: 64B = Ed25519_sign(packet_hash) only")
def verify_explicit_form(identity):
set_implicit_proof(False)
dest, pkt = build_packet_to_prove(identity, "explicit_test")
captured = capture_proof_body(identity, pkt)
body = captured["data"]
if len(body) != PacketReceipt.EXPL_LENGTH:
fail(f"S6.5 explicit proof body is {len(body)} bytes, want EXPL_LENGTH = "
f"{PacketReceipt.EXPL_LENGTH} (= 96)")
if body[:32] != pkt.packet_hash:
fail(f"S6.5 explicit proof body[:32] != packet_hash")
if not identity.validate(body[32:], pkt.packet_hash):
fail("S6.5 explicit proof body[32:] did not validate as signature(packet_hash)")
print("PASS S6.5.1 explicit proof form: 96B = packet_hash(32) || Ed25519_sign(packet_hash)")
def verify_validator_length_dispatch(identity):
"""S6.5.5: validate_proof must accept both 64- and 96-byte bodies and
reject anything else."""
dest, pkt = build_packet_to_prove(identity, "validator_test")
pkt.create_receipt = True
pkt.receipt = PacketReceipt(pkt)
receipt = pkt.receipt
sig = identity.sign(pkt.packet_hash)
implicit_proof = sig # 64 bytes
explicit_proof = pkt.packet_hash + sig # 96 bytes
# Implicit
if not receipt.validate_proof(implicit_proof):
fail("S6.5.5 receiver rejected a valid implicit-form proof")
# Reset and try explicit
receipt.proved = False
receipt.status = PacketReceipt.SENT
if not receipt.validate_proof(explicit_proof):
fail("S6.5.5 receiver rejected a valid explicit-form proof")
# Bogus length
receipt.proved = False
receipt.status = PacketReceipt.SENT
if receipt.validate_proof(b"\x00" * 80):
fail("S6.5.5 receiver accepted an 80-byte body (must reject any non-{64,96})")
print("PASS S6.5.5 receiver length-dispatch: accepts 64B and 96B, rejects 80B")
def main():
print(f"verify_proof_packet.py against RNS {RNS.__version__}")
init_minimal_rns()
try:
identity = RNS.Identity()
verify_implicit_form(identity)
verify_explicit_form(identity)
verify_validator_length_dispatch(identity)
finally:
try: RNS.Reticulum.exit_handler()
except Exception: pass
print("ALL PASS")
if __name__ == "__main__":
main()

277
tools/verify_rnode_split.py Normal file
View file

@ -0,0 +1,277 @@
"""
Verifier for SPEC.md S8.3 (RNode air-frame split-packet protocol).
Pure-function verifier no Reticulum runtime needed because S8.3 is
a LoRa air-frame protocol that lives between RNodes, not on the host
KISS channel. We re-implement the canonical TX and RX state machines
in Python from the upstream RNode_Firmware source and exercise them.
Scenarios:
1. Header-byte layout: bit 7..4 random seq nibble, bit 0 FLAG_SPLIT,
bits 3..1 reserved zero. Verified via mask checks against the
constants from RNode_Firmware/Framing.h.
2. TX side, payload <= 254 bytes: emits one frame with FLAG_SPLIT=0,
header || payload, and the seq nibble is randomized per fresh TX.
3. TX side, payload > 254 bytes: emits two frames sharing the same
header byte (same seq nibble + FLAG_SPLIT=1), split at exactly
254 bytes of payload in the first frame and the remainder in the
second.
4. RX state machine, four cases of inbound frames per the table at
S8.3:
a. SPLIT first half buffer
b. SPLIT second half matching seq reassemble
c. SPLIT seq mismatch replace buffer
d. non-SPLIT after first half buffered discard buffer, deliver
5. Wire-byte equivalence: TX a 300-byte payload, run the resulting
frames through the RX state machine, confirm reassembled payload
bytes match the original.
Exit code 0 on PASS, non-zero on FAIL.
"""
from __future__ import annotations
import os
import sys
# ---- Constants from RNode_Firmware/Framing.h:105-108 + Config.h:59-61 ----
NIBBLE_SEQ = 0xF0
NIBBLE_FLAGS = 0x0F
FLAG_SPLIT = 0x01
SEQ_UNSET = 0xFF
MTU = 508 # max reassembled Reticulum packet payload
SINGLE_MTU = 255 # max LoRa frame size (header + payload)
HEADER_L = 1
def fail(msg: str) -> None:
print(f"FAIL: {msg}")
sys.exit(1)
# ---- TX side (mirrors RNode_Firmware.ino:716-742) ----
def tx_frames(payload, seq_nibble=None):
"""Return a list of LoRa frames (header + payload bytes) for a
given Reticulum packet payload, mirroring the RNode firmware's
transmit() function.
seq_nibble: optional override of the random sequence nibble for
deterministic testing. Must be in 0..15.
"""
if seq_nibble is None:
seq_nibble = os.urandom(1)[0] >> 4 # any 0..15 will do
if not 0 <= seq_nibble <= 15:
raise ValueError("seq_nibble must be 0..15")
if len(payload) > MTU:
raise ValueError(f"payload too large: {len(payload)} > {MTU}")
header = (seq_nibble << 4) & NIBBLE_SEQ # high nibble = seq, low = 0
if len(payload) > SINGLE_MTU - HEADER_L:
header |= FLAG_SPLIT
first_payload = payload[:SINGLE_MTU - HEADER_L]
second_payload = payload[SINGLE_MTU - HEADER_L:]
return [bytes([header]) + first_payload,
bytes([header]) + second_payload]
else:
return [bytes([header]) + payload]
# ---- RX side (mirrors RNode_Firmware.ino:359-446 receive_callback) ----
class RxStateMachine:
"""Re-implementation of the upstream RX reassembly logic. Calling
.deliver(frame) returns the reassembled Reticulum packet bytes if
a complete packet is now available, or None if the call merely
buffered a first-half / replaced state / processed a duplicate.
"""
def __init__(self):
self.seq = SEQ_UNSET
self.buf = b""
def deliver(self, frame):
if len(frame) < 1:
return None
header = frame[0]
sequence = (header & NIBBLE_SEQ) >> 4
is_split = (header & FLAG_SPLIT) != 0
payload = frame[1:]
if is_split and self.seq == SEQ_UNSET:
# Case a: first half — buffer
self.buf = payload
self.seq = sequence
return None
elif is_split and sequence == self.seq:
# Case b: second half matching seq — reassemble
assembled = self.buf + payload
self.buf = b""
self.seq = SEQ_UNSET
return assembled
elif is_split and sequence != self.seq:
# Case c: seq mismatch — replace with this as new first half
self.buf = payload
self.seq = sequence
return None
elif not is_split:
# Case d: non-split — clear any buffered first half, deliver
if self.seq != SEQ_UNSET:
self.buf = b""
self.seq = SEQ_UNSET
return payload
return None
# ---- Tests ----
def verify_header_layout():
# NIBBLE_SEQ has only the high nibble set
if NIBBLE_SEQ != 0xF0:
fail(f"NIBBLE_SEQ != 0xF0 (got {NIBBLE_SEQ:#x})")
# NIBBLE_FLAGS has only the low nibble set
if NIBBLE_FLAGS != 0x0F:
fail(f"NIBBLE_FLAGS != 0x0F (got {NIBBLE_FLAGS:#x})")
# FLAG_SPLIT is the low bit of the low nibble
if FLAG_SPLIT != 0x01:
fail(f"FLAG_SPLIT != 0x01 (got {FLAG_SPLIT:#x})")
# SEQ_UNSET is the all-ones sentinel
if SEQ_UNSET != 0xFF:
fail(f"SEQ_UNSET != 0xFF (got {SEQ_UNSET:#x})")
print("PASS S8.3 header constants: NIBBLE_SEQ=0xF0, FLAG_SPLIT=0x01, SEQ_UNSET=0xFF")
def verify_tx_single_frame():
# 100-byte payload — fits in one frame
payload = bytes(range(100))
frames = tx_frames(payload, seq_nibble=0x7)
if len(frames) != 1:
fail(f"S8.3 single-frame TX produced {len(frames)} frames, want 1")
f = frames[0]
if len(f) != 1 + 100:
fail(f"S8.3 single-frame size = {len(f)}, want 101")
header = f[0]
if (header & NIBBLE_SEQ) >> 4 != 0x7:
fail(f"S8.3 seq nibble lost: header = {header:#x}")
if header & FLAG_SPLIT:
fail(f"S8.3 single-frame TX set FLAG_SPLIT (header = {header:#x})")
if f[1:] != payload:
fail("S8.3 single-frame payload mangled")
print(f"PASS S8.3 TX single-frame (100B payload, seq=0x7, header={header:#04x})")
def verify_tx_split_frames():
# 300-byte payload — splits into 254 + 46
payload = bytes(i & 0xFF for i in range(300))
frames = tx_frames(payload, seq_nibble=0xA)
if len(frames) != 2:
fail(f"S8.3 split TX produced {len(frames)} frames, want 2")
h1, h2 = frames[0][0], frames[1][0]
if h1 != h2:
fail(f"S8.3 split frames have different headers: {h1:#04x} vs {h2:#04x}")
if not (h1 & FLAG_SPLIT):
fail(f"S8.3 split frames did not set FLAG_SPLIT (header={h1:#04x})")
if (h1 & NIBBLE_SEQ) >> 4 != 0xA:
fail(f"S8.3 split frame seq nibble lost: header={h1:#04x}")
if len(frames[0]) != 1 + 254:
fail(f"S8.3 split frame 1 size = {len(frames[0])}, want 255")
if len(frames[1]) != 1 + (300 - 254):
fail(f"S8.3 split frame 2 size = {len(frames[1])}, want {1 + 300 - 254}")
if frames[0][1:] != payload[:254]:
fail("S8.3 split frame 1 payload mismatch")
if frames[1][1:] != payload[254:]:
fail("S8.3 split frame 2 payload mismatch")
print(f"PASS S8.3 TX split frames (300B payload, 254+46 split, "
f"shared header={h1:#04x})")
def verify_rx_state_machine():
rx = RxStateMachine()
# Case d: non-split arrives first → deliver immediately
out = rx.deliver(bytes([0x30]) + b"non-split-1")
if out != b"non-split-1":
fail(f"S8.3 RX case d (non-split fresh) failed: got {out!r}")
# Case a: split first half → buffer
out = rx.deliver(bytes([0x51]) + b"first")
if out is not None:
fail(f"S8.3 RX case a (first half) returned non-None: {out!r}")
if rx.seq != 0x5:
fail(f"S8.3 RX case a did not buffer seq=5, got {rx.seq}")
# Case b: split second half with matching seq → reassemble
out = rx.deliver(bytes([0x51]) + b"second")
if out != b"first" + b"second":
fail(f"S8.3 RX case b (second half match) failed: got {out!r}")
if rx.seq != SEQ_UNSET:
fail(f"S8.3 RX state didn't reset after reassembly: seq={rx.seq}")
# Case a again, then case c: seq mismatch replaces buffer
rx.deliver(bytes([0x51]) + b"AAAA")
rx.deliver(bytes([0x71]) + b"BBBB")
if rx.seq != 0x7 or rx.buf != b"BBBB":
fail(f"S8.3 RX case c (seq mismatch) state wrong: seq={rx.seq}, buf={rx.buf!r}")
# Case d while a first-half is buffered → discard buffer, deliver non-split
rx.deliver(bytes([0x91]) + b"discardme") # buffers seq=9
out = rx.deliver(bytes([0x30]) + b"non-split-2")
if out != b"non-split-2":
fail(f"S8.3 RX case d (non-split with stale buffer) failed: got {out!r}")
if rx.seq != SEQ_UNSET:
fail(f"S8.3 RX case d did not discard stale buffer: seq={rx.seq}")
print("PASS S8.3 RX state machine: 4 cases (a/b/c/d) all correct")
def verify_tx_rx_roundtrip():
"""End-to-end: TX a payload, feed the resulting frames through
the RX state machine, confirm reassembled payload matches."""
for size in [50, 254, 255, 300, 508]:
original = bytes(i & 0xFF for i in range(size))
frames = tx_frames(original, seq_nibble=0x3)
rx = RxStateMachine()
out = None
for f in frames:
res = rx.deliver(f)
if res is not None:
out = res
if out != original:
fail(f"S8.3 TX/RX round-trip mismatch at size {size}:\n"
f" in: {original[:30]!r}... ({len(original)}B)\n"
f" out: {out[:30] if out else None!r}... ({len(out) if out else 0}B)")
print("PASS S8.3 TX/RX round-trip at sizes [50, 254, 255, 300, 508]")
def main():
print("verify_rnode_split.py — pure-function verifier (no RNS runtime needed)")
verify_header_layout()
verify_tx_single_frame()
verify_tx_split_frames()
verify_rx_state_machine()
verify_tx_rx_roundtrip()
print("ALL PASS")
if __name__ == "__main__":
main()