Add Gate 2D BLE peer session manager Python equivalence tests

This commit is contained in:
John Poole 2026-05-18 16:19:32 -07:00
commit a95baf9122
3 changed files with 668 additions and 0 deletions

View file

@ -0,0 +1,132 @@
# Gate 2D: BLEPeerSessionManager Python Equivalence
Date: 2026-05-18 16:11 America/Los_Angeles
Scope: Python equivalence harness only. No live BLE integration.
## Summary
Gate 2D added a Python test harness that compares current Python
`BLEInterface._handle_identity_handshake` behavior with the C++
`BLEPeerSessionManager` pybind decision/result model.
Added:
- `migration/tests/test_ble_peer_session_manager_python_equivalence.py`
- `migration/phase2/Gate2D_BLEPeerSessionManager_python_equivalence_20260518_1611.md`
- `migration/sql/mark_gate2d_protocol_session_python_equivalence_20260518_1611.sql`
Unchanged:
- `src/ble_reticulum/BLEInterface.py`
- live Python BLE behavior
- driver/platform code
- BlueZ/Bleak/DBus integration
- `RNS.Transport` integration
- ESP32 BLE integration
## Harness Design
The harness imports the real `BLEInterface` class, then binds these real Python methods onto a minimal fake object:
- `_handle_identity_handshake`
- `_check_duplicate_identity`
- `_compute_identity_hash`
- `_get_fragmenter_key`
The fake object supplies only the state those methods require:
- `address_to_identity`
- `identity_to_address`
- `address_to_interface`
- `spawned_interfaces`
- `peers`
- `fragmenters`
- `reassemblers`
- `_pending_identity_connections`
- `_pending_detach`
- `_last_real_data`
- `_zombie_timeout`
- fake driver
- fake fragmenter/reassembler constructors
- fake stale-address cleanup and spawn hooks
This avoids real RNS runtime, BlueZ, Bleak, DBus, and live BLE dependencies while still exercising the actual Python reference methods.
## Equivalence Cases Covered
| Case | Python reference behavior checked | C++ result checked |
|---|---|---|
| non-16-byte payload | returns `False` | `PassToReassembler`, `consumed=false` |
| new 16-byte identity | maps updated, fragmenter/reassembler created, pending removed, last-real-data updated, spawn requested | `AcceptedNewIdentity`, `CreateFragmentationState`, `MarkPeerReady`, `RemovePendingIdentity`, `MarkRealData` |
| known identity duplicate same | returns `True`, no normal data processing | `ConsumedDuplicateSameIdentity`, `consumed=true` |
| known identity duplicate mismatch | returns `True`, warning recorded | `ConsumedDuplicateMismatchedIdentity`, `Warn`, `consumed=true` |
| duplicate identity active elsewhere | disconnects current address | `RejectedDuplicateIdentity`, `DisconnectCurrentPeer` |
| duplicate identity with pending detach / stale old address | accepts new identity and invokes stale cleanup | `AcceptedNewIdentity`, `CleanupOldAddress`, `UpdatePeerAddress` |
| duplicate identity with zombie old connection | accepts new identity and disconnects old address | `AcceptedNewIdentity`, `DisconnectOldPeer`, `CleanupOldAddress` |
| MTU provided | fragmenter uses driver MTU | result MTU equals provided MTU |
| MTU missing | fragmenter falls back to 23 | result MTU equals 23 |
| pending identity removal | successful handshake removes pending address | `RemovePendingIdentity` |
| existing spawned interface path | existing peer address and `address_to_interface` update | `UpdatePeerAddress`, `MarkPeerReady` |
| exception compatibility | Python consumes packet after synthetic MTU exception | skipped for C++; manager has no platform exception surface |
The skipped exception case is intentional documentation: current Python consumes after adapter-side exceptions, but the C++ session manager does not call platform dependencies such as `driver.get_peer_mtu`, so `ErrorConsumed` is not exercised cleanly at Gate 2D.
## Verification
New Gate 2D test:
```sh
PYTHONPATH=migration/protocol_core pytest -q migration/tests/test_ble_peer_session_manager_python_equivalence.py
```
Result:
```text
11 passed, 1 skipped, 2 warnings in 0.24s
```
Gate 2C / Gate 2D regression:
```sh
PYTHONPATH=migration/protocol_core pytest -q migration/tests/test_ble_peer_session_manager_pybind.py migration/tests/test_ble_peer_session_manager_python_equivalence.py
```
Result:
```text
23 passed, 1 skipped, 2 warnings in 0.18s
```
Backend shim regression:
```sh
pytest -q migration/tests/test_fragmentation_backend_shim.py
```
Result:
```text
9 passed, 2 warnings in 0.64s
```
Migration regression set:
```sh
PYTHONPATH=migration/protocol_core pytest -q migration/tests/test_fragmentation_cpp_equivalence.py migration/tests/test_identity_helpers_cpp_equivalence.py migration/tests/test_ble_peer_session_manager_pybind.py migration/tests/test_ble_peer_session_manager_python_equivalence.py
```
Result:
```text
71 passed, 2 skipped, 2 warnings in 0.47s
```
## SQL
Companion SQL:
`migration/sql/mark_gate2d_protocol_session_python_equivalence_20260518_1611.sql`
The SQL marks `_handle_identity_handshake` as `PYTHON_EQUIVALENT` for phase
`2_ble_protocol_session_manager`, keeps tag `GLUE`, keeps `cpp_candidate=1`, and does not mark any field-accepted status.

View file

@ -0,0 +1,92 @@
BEGIN TRANSACTION;
INSERT INTO symbols (
source_file,
symbol_name,
symbol_type,
class_name,
line_number,
tag,
phase,
status,
cpp_candidate,
confidence,
rationale,
callers,
callees,
notes,
first_seen_at,
updated_at
)
VALUES (
'src/ble_reticulum/BLEInterface.py',
'_handle_identity_handshake',
'method',
'BLEInterface',
1202,
'GLUE',
'2_ble_protocol_session_manager',
'PYTHON_EQUIVALENT',
1,
'high',
'Gate 2D added a Python equivalence harness comparing current BLEInterface._handle_identity_handshake behavior with C++ BLEPeerSessionManager pybind decisions. This remains reference behavior for C++ session ownership, not a literal method port.',
'_data_received_callback',
'_compute_identity_hash; _check_duplicate_identity; driver.disconnect; driver.get_peer_mtu; _get_fragmenter_key; BLEFragmenter; BLEReassembler; _spawn_peer_interface',
'Gate 2D equivalence tests passed: 11 passed, 1 skipped for the documented platform-exception compatibility boundary. No BLEInterface.py production behavior, live BLE behavior, driver/platform code, or Phase 1 FIELD_ACCEPTED rows were changed.',
CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP
)
ON CONFLICT(source_file, class_name, symbol_name, line_number) DO UPDATE SET
tag = 'GLUE',
phase = '2_ble_protocol_session_manager',
status = 'PYTHON_EQUIVALENT',
cpp_candidate = 1,
confidence = 'high',
rationale = excluded.rationale,
callers = excluded.callers,
callees = excluded.callees,
notes = excluded.notes,
updated_at = CURRENT_TIMESTAMP;
INSERT INTO reviews (
symbol_id,
reviewed_at,
reviewer,
old_tag,
new_tag,
old_status,
new_status,
note
)
SELECT
symbol_id,
CURRENT_TIMESTAMP,
'Codex',
'GLUE',
'GLUE',
'PYTHON_BOUND',
'PYTHON_EQUIVALENT',
'Gate 2D completed the Python equivalence harness for BLEPeerSessionManager. New equivalence test passed: 11 passed, 1 skipped. Gate 2C/2D and migration regression sets passed. No FIELD_ACCEPTED status assigned.'
FROM symbols
WHERE source_file = 'src/ble_reticulum/BLEInterface.py'
AND class_name = 'BLEInterface'
AND symbol_name = '_handle_identity_handshake'
AND line_number = 1202;
SELECT
symbol_id,
source_file,
class_name,
symbol_name,
tag,
phase,
status,
cpp_candidate,
updated_at
FROM symbols
WHERE source_file = 'src/ble_reticulum/BLEInterface.py'
AND class_name = 'BLEInterface'
AND symbol_name = '_handle_identity_handshake'
AND line_number = 1202;
COMMIT;

View file

@ -0,0 +1,444 @@
import os
import sys
import types
import pytest
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
SRC_DIR = os.path.join(REPO_ROOT, "src")
CPP_BUILD_DIR = os.path.join(REPO_ROOT, "migration", "protocol_core")
sys.path.insert(0, SRC_DIR)
sys.path.insert(0, CPP_BUILD_DIR)
cpp = pytest.importorskip(
"ble_protocol_core_cpp",
reason=(
"compiled pybind11 module missing; build with "
"`python3 migration/protocol_core/setup.py build_ext --inplace`"
),
)
if "RNS" not in sys.modules:
rns = types.ModuleType("RNS")
rns.log = lambda *args, **kwargs: None
rns.LOG_EXTREME = 0
rns.LOG_DEBUG = 1
rns.LOG_INFO = 2
rns.LOG_NOTICE = 2
rns.LOG_WARNING = 3
rns.LOG_ERROR = 4
rns.LOG_CRITICAL = 5
rns.Reticulum = types.SimpleNamespace(configdir=None)
rns.Transport = types.SimpleNamespace(interfaces=[])
sys.modules["RNS"] = rns
if "RNS.Transport" not in sys.modules:
transport = types.ModuleType("RNS.Transport")
transport.interfaces = []
sys.modules["RNS.Transport"] = transport
if "RNS.Interfaces" not in sys.modules:
sys.modules["RNS.Interfaces"] = types.ModuleType("RNS.Interfaces")
if "RNS.Interfaces.Interface" not in sys.modules:
interface_module = types.ModuleType("RNS.Interfaces.Interface")
class Interface:
MODE_FULL = 0
interface_module.Interface = Interface
sys.modules["RNS.Interfaces.Interface"] = interface_module
try:
import ble_reticulum.BLEInterface as ble_interface_module
from ble_reticulum.BLEInterface import BLEInterface
except Exception as exc: # pragma: no cover - explicit environment guard
pytest.skip(f"BLEInterface reference import unavailable: {exc}", allow_module_level=True)
class FakeRNS:
LOG_DEBUG = 1
LOG_INFO = 2
LOG_WARNING = 3
LOG_ERROR = 4
logs = []
@staticmethod
def log(message, level):
FakeRNS.logs.append((level, message))
class FakeDriver:
def __init__(self, mtu=None, connected_peers=None):
self.mtu = mtu
self.connected_peers = set(connected_peers or [])
self.disconnect_calls = []
self.raise_on_mtu = False
def disconnect(self, address):
self.disconnect_calls.append(address)
def get_peer_mtu(self, address):
if self.raise_on_mtu:
raise RuntimeError("synthetic MTU failure")
return self.mtu
class FakeFragmenter:
instances = []
def __init__(self, mtu=185):
self.mtu = mtu
FakeFragmenter.instances.append(self)
class FakeReassembler:
instances = []
def __init__(self, timeout=None):
self.timeout = timeout
FakeReassembler.instances.append(self)
class FakeLock:
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class FakePeerInterface:
def __init__(self, address):
self.peer_address = address
class Harness:
_handle_identity_handshake = BLEInterface._handle_identity_handshake
_check_duplicate_identity = BLEInterface._check_duplicate_identity
_compute_identity_hash = BLEInterface._compute_identity_hash
_get_fragmenter_key = BLEInterface._get_fragmenter_key
def __init__(self, mtu=None, connected_peers=None):
self.driver = FakeDriver(mtu=mtu, connected_peers=connected_peers)
self.address_to_identity = {}
self.identity_to_address = {}
self.address_to_interface = {}
self.spawned_interfaces = {}
self.peers = {}
self.fragmenters = {}
self.reassemblers = {}
self.frag_lock = FakeLock()
self._pending_identity_connections = {}
self._pending_detach = {}
self._last_real_data = {}
self._zombie_timeout = 45.0
self.cleanup_calls = []
self.spawn_calls = []
def _cleanup_stale_address(self, identity_hash, old_address):
self.cleanup_calls.append((identity_hash, old_address))
self.address_to_identity.pop(old_address, None)
self.address_to_interface.pop(old_address, None)
self.peers.pop(old_address, None)
def _spawn_peer_interface(
self,
address,
name,
peer_identity,
client=None,
mtu=None,
connection_type="central",
):
identity_hash = self._compute_identity_hash(peer_identity)
peer = FakePeerInterface(address)
self.spawned_interfaces[identity_hash] = peer
self.address_to_interface[address] = peer
self.spawn_calls.append(
{
"address": address,
"name": name,
"peer_identity": peer_identity,
"mtu": mtu,
"connection_type": connection_type,
}
)
return peer
def __str__(self):
return "HarnessBLEInterface"
@pytest.fixture(autouse=True)
def patch_bleinterface_globals(monkeypatch):
FakeRNS.logs = []
FakeFragmenter.instances = []
FakeReassembler.instances = []
monkeypatch.setattr(ble_interface_module, "RNS", FakeRNS)
monkeypatch.setattr(ble_interface_module, "BLEFragmenter", FakeFragmenter)
monkeypatch.setattr(ble_interface_module, "BLEReassembler", FakeReassembler)
def identity(base):
return bytes((base + i) & 0xFF for i in range(16))
def action_types(result):
return {action.type for action in result.actions}
def cpp_snapshot_from_harness(harness, address, peer_identity=None, now=100.0):
snap = cpp.ConnectionSnapshot()
snap.current = cpp.ConnectionId(address)
snap.local_role = cpp.LocalRole.Peripheral
snap.negotiated_mtu = harness.driver.mtu
known_identity = harness.address_to_identity.get(address)
if known_identity is not None:
snap.set_known_identity(known_identity)
if peer_identity is not None:
identity_hash = harness._compute_identity_hash(peer_identity)
existing_address = harness.identity_to_address.get(identity_hash)
if existing_address is not None:
snap.existing_address_for_identity = existing_address
snap.identity_has_pending_detach = identity_hash in harness._pending_detach
snap.existing_address_connected = (
existing_address in harness.driver.connected_peers
)
snap.existing_address_in_peer_table = existing_address in harness.peers
last_real_data = harness._last_real_data.get(identity_hash, 0.0)
snap.existing_last_real_data = last_real_data
snap.existing_connection_is_zombie = (
last_real_data > 0.0
and now - last_real_data > harness._zombie_timeout
)
return snap
def run_cpp(manager, harness, address, data, now=100.0):
peer_identity = data if len(data) == 16 else None
snap = cpp_snapshot_from_harness(harness, address, peer_identity, now=now)
return manager.handle_identity_handshake(snap, data, now)
def test_non_16_byte_payload_equivalence():
harness = Harness()
manager = cpp.BLEPeerSessionManager()
py_result = harness._handle_identity_handshake("AA:01", b"abc")
cpp_result = run_cpp(manager, harness, "AA:01", b"abc")
assert py_result is False
assert cpp_result.decision == cpp.InputDecision.PassToReassembler
assert cpp_result.consumed is False
def test_new_16_byte_identity_no_duplicate_equivalence():
harness = Harness(mtu=185)
harness._pending_identity_connections["AA:02"] = 1.0
manager = cpp.BLEPeerSessionManager()
peer_identity = identity(0x10)
cpp_result = run_cpp(manager, harness, "AA:02", peer_identity)
py_result = harness._handle_identity_handshake("AA:02", peer_identity)
identity_hash = harness._compute_identity_hash(peer_identity)
frag_key = harness._get_fragmenter_key(peer_identity, "AA:02")
assert py_result is True
assert harness.address_to_identity["AA:02"] == peer_identity
assert harness.identity_to_address[identity_hash] == "AA:02"
assert harness.fragmenters[frag_key].mtu == 185
assert frag_key in harness.reassemblers
assert "AA:02" not in harness._pending_identity_connections
assert identity_hash in harness._last_real_data
assert harness.spawn_calls and harness.spawn_calls[0]["address"] == "AA:02"
assert cpp_result.decision == cpp.InputDecision.AcceptedNewIdentity
assert cpp_result.consumed is True
assert cpp_result.accepted is True
assert cpp.SessionActionType.CreateFragmentationState in action_types(cpp_result)
assert cpp.SessionActionType.MarkPeerReady in action_types(cpp_result)
assert cpp.SessionActionType.RemovePendingIdentity in action_types(cpp_result)
assert cpp.SessionActionType.MarkRealData in action_types(cpp_result)
def test_known_identity_duplicate_same_equivalence():
harness = Harness()
manager = cpp.BLEPeerSessionManager()
peer_identity = identity(0x20)
harness.address_to_identity["AA:03"] = peer_identity
cpp_result = run_cpp(manager, harness, "AA:03", peer_identity)
py_result = harness._handle_identity_handshake("AA:03", peer_identity)
assert py_result is True
assert cpp_result.decision == cpp.InputDecision.ConsumedDuplicateSameIdentity
assert cpp_result.consumed is True
def test_known_identity_duplicate_mismatch_equivalence():
harness = Harness()
manager = cpp.BLEPeerSessionManager()
harness.address_to_identity["AA:04"] = identity(0x30)
cpp_result = run_cpp(manager, harness, "AA:04", identity(0x40))
py_result = harness._handle_identity_handshake("AA:04", identity(0x40))
assert py_result is True
assert cpp_result.decision == cpp.InputDecision.ConsumedDuplicateMismatchedIdentity
assert cpp_result.consumed is True
assert cpp.SessionActionType.Warn in action_types(cpp_result)
assert FakeRNS.logs
def test_duplicate_identity_active_elsewhere_equivalence():
current_address = "AA:06"
old_address = "AA:05"
peer_identity = identity(0x50)
harness = Harness(connected_peers={old_address})
identity_hash = harness._compute_identity_hash(peer_identity)
harness.identity_to_address[identity_hash] = old_address
harness.peers[old_address] = object()
manager = cpp.BLEPeerSessionManager()
cpp_result = run_cpp(manager, harness, current_address, peer_identity)
py_result = harness._handle_identity_handshake(current_address, peer_identity)
assert py_result is True
assert harness.driver.disconnect_calls == [current_address]
assert cpp_result.decision == cpp.InputDecision.RejectedDuplicateIdentity
assert cpp_result.consumed is True
assert cpp.SessionActionType.DisconnectCurrentPeer in action_types(cpp_result)
def test_duplicate_identity_with_pending_detach_stale_old_address_equivalence():
current_address = "AA:08"
old_address = "AA:07"
peer_identity = identity(0x60)
harness = Harness()
identity_hash = harness._compute_identity_hash(peer_identity)
harness.identity_to_address[identity_hash] = old_address
harness.address_to_identity[old_address] = peer_identity
harness.address_to_interface[old_address] = FakePeerInterface(old_address)
harness._pending_detach[identity_hash] = 1.0
manager = cpp.BLEPeerSessionManager()
cpp_result = run_cpp(manager, harness, current_address, peer_identity)
py_result = harness._handle_identity_handshake(current_address, peer_identity)
assert py_result is True
assert harness.cleanup_calls == [(identity_hash, old_address)]
assert harness.address_to_identity[current_address] == peer_identity
assert cpp_result.decision == cpp.InputDecision.AcceptedNewIdentity
assert cpp.SessionActionType.CleanupOldAddress in action_types(cpp_result)
assert cpp.SessionActionType.UpdatePeerAddress in action_types(cpp_result)
def test_duplicate_identity_with_zombie_old_connection_equivalence():
current_address = "AA:0A"
old_address = "AA:09"
peer_identity = identity(0x70)
harness = Harness(connected_peers={old_address})
identity_hash = harness._compute_identity_hash(peer_identity)
harness.identity_to_address[identity_hash] = old_address
harness.peers[old_address] = object()
harness.address_to_identity[old_address] = peer_identity
harness._last_real_data[identity_hash] = 1.0
manager = cpp.BLEPeerSessionManager()
cpp_result = run_cpp(manager, harness, current_address, peer_identity, now=100.0)
py_result = harness._handle_identity_handshake(current_address, peer_identity)
assert py_result is True
assert old_address in harness.driver.disconnect_calls
assert harness.cleanup_calls == [(identity_hash, old_address)]
assert cpp_result.decision == cpp.InputDecision.AcceptedNewIdentity
assert cpp.SessionActionType.DisconnectOldPeer in action_types(cpp_result)
assert cpp.SessionActionType.CleanupOldAddress in action_types(cpp_result)
def test_mtu_provided_equivalence():
harness = Harness(mtu=247)
manager = cpp.BLEPeerSessionManager()
peer_identity = identity(0x80)
cpp_result = run_cpp(manager, harness, "AA:0B", peer_identity)
py_result = harness._handle_identity_handshake("AA:0B", peer_identity)
frag_key = harness._get_fragmenter_key(peer_identity, "AA:0B")
assert py_result is True
assert harness.fragmenters[frag_key].mtu == 247
assert cpp_result.mtu == 247
def test_mtu_missing_fallback_equivalence():
harness = Harness(mtu=None)
manager = cpp.BLEPeerSessionManager()
peer_identity = identity(0x90)
cpp_result = run_cpp(manager, harness, "AA:0C", peer_identity)
py_result = harness._handle_identity_handshake("AA:0C", peer_identity)
frag_key = harness._get_fragmenter_key(peer_identity, "AA:0C")
assert py_result is True
assert harness.fragmenters[frag_key].mtu == 23
assert cpp_result.mtu == 23
def test_pending_identity_removal_equivalence():
harness = Harness()
harness._pending_identity_connections["AA:0D"] = 1.0
manager = cpp.BLEPeerSessionManager()
peer_identity = identity(0xA0)
cpp_result = run_cpp(manager, harness, "AA:0D", peer_identity)
py_result = harness._handle_identity_handshake("AA:0D", peer_identity)
assert py_result is True
assert "AA:0D" not in harness._pending_identity_connections
assert cpp.SessionActionType.RemovePendingIdentity in action_types(cpp_result)
def test_existing_spawned_interface_path_equivalence():
old_address = "AA:0E"
new_address = "AA:0F"
peer_identity = identity(0xB0)
harness = Harness()
identity_hash = harness._compute_identity_hash(peer_identity)
existing = FakePeerInterface(old_address)
harness.spawned_interfaces[identity_hash] = existing
harness.identity_to_address[identity_hash] = old_address
harness._pending_detach[identity_hash] = 1.0
manager = cpp.BLEPeerSessionManager()
cpp_result = run_cpp(manager, harness, new_address, peer_identity)
py_result = harness._handle_identity_handshake(new_address, peer_identity)
assert py_result is True
assert existing.peer_address == new_address
assert harness.address_to_interface[new_address] is existing
assert not harness.spawn_calls
assert cpp_result.decision == cpp.InputDecision.AcceptedNewIdentity
assert cpp.SessionActionType.UpdatePeerAddress in action_types(cpp_result)
assert cpp.SessionActionType.MarkPeerReady in action_types(cpp_result)
def test_exception_compatibility_python_consumes_packet():
harness = Harness(mtu=185)
harness.driver.raise_on_mtu = True
peer_identity = identity(0xC0)
py_result = harness._handle_identity_handshake("AA:10", peer_identity)
assert py_result is True
pytest.skip(
"C++ manager has no platform call surface for driver.get_peer_mtu exceptions; "
"Gate 2D documents Python consume-on-exception compatibility only."
)