import json import os import subprocess import sys import types import pytest REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) SRC_DIR = os.path.join(REPO_ROOT, "src") CPP_BUILD_DIR = os.path.join(REPO_ROOT, "migration", "protocol_core") sys.path.insert(0, SRC_DIR) sys.path.insert(0, CPP_BUILD_DIR) cpp = pytest.importorskip("ble_protocol_core_cpp") def install_fake_rns(): rns = types.ModuleType("RNS") rns.log = lambda *args, **kwargs: None rns.LOG_EXTREME = 0 rns.LOG_DEBUG = 1 rns.LOG_INFO = 2 rns.LOG_NOTICE = 2 rns.LOG_WARNING = 3 rns.LOG_ERROR = 4 rns.LOG_CRITICAL = 5 rns.Reticulum = types.SimpleNamespace(configdir=None) rns.Transport = types.SimpleNamespace(interfaces=[]) sys.modules["RNS"] = rns transport = types.ModuleType("RNS.Transport") transport.interfaces = [] sys.modules["RNS.Transport"] = transport sys.modules["RNS.Interfaces"] = types.ModuleType("RNS.Interfaces") interface_mod = types.ModuleType("RNS.Interfaces.Interface") class Interface: MODE_FULL = 0 interface_mod.Interface = Interface sys.modules["RNS.Interfaces.Interface"] = interface_mod install_fake_rns() import ble_reticulum.BLEInterface as ble_interface_module from ble_reticulum.BLEInterface import BLEInterface class FakeRNS: LOG_DEBUG = 1 LOG_INFO = 2 LOG_NOTICE = 2 LOG_WARNING = 3 LOG_ERROR = 4 logs = [] @staticmethod def log(message, level): FakeRNS.logs.append((level, message)) class FakeDriver: def __init__(self, mtu=None, connected_peers=None): self.mtu = mtu self.connected_peers = set(connected_peers or []) self.disconnect_calls = [] def disconnect(self, address): self.disconnect_calls.append(address) def get_peer_mtu(self, address): return self.mtu class FakeFragmenter: instances = [] def __init__(self, mtu=185): self.mtu = mtu FakeFragmenter.instances.append(self) class FakeReassembler: instances = [] def __init__(self, timeout=None): self.timeout = timeout FakeReassembler.instances.append(self) class FakeLock: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False class FakePeerInterface: def __init__(self, address): self.peer_address = address class Harness: _handle_identity_handshake = BLEInterface._handle_identity_handshake _handle_identity_handshake_cpp = BLEInterface._handle_identity_handshake_cpp _handle_identity_handshake_python = BLEInterface._handle_identity_handshake_python _get_session_backend = BLEInterface._get_session_backend _get_session_manager = BLEInterface._get_session_manager _build_session_snapshot = BLEInterface._build_session_snapshot _check_duplicate_identity = BLEInterface._check_duplicate_identity _compute_identity_hash = BLEInterface._compute_identity_hash _get_fragmenter_key = BLEInterface._get_fragmenter_key def __init__(self, backend="cpp", mtu=None, connected_peers=None): self._session_backend = backend self._session_manager = cpp.BLEPeerSessionManager() if backend == "cpp" else None self.driver = FakeDriver(mtu=mtu, connected_peers=connected_peers) self.address_to_identity = {} self.identity_to_address = {} self.address_to_interface = {} self.spawned_interfaces = {} self.peers = {} self.fragmenters = {} self.reassemblers = {} self.frag_lock = FakeLock() self._pending_identity_connections = {} self._pending_identity_timeout = 30 self._pending_detach = {} self._last_real_data = {} self._zombie_timeout = 45.0 self.cleanup_calls = [] self.spawn_calls = [] def _cleanup_stale_address(self, identity_hash, old_address): self.cleanup_calls.append((identity_hash, old_address)) self.address_to_identity.pop(old_address, None) self.address_to_interface.pop(old_address, None) self.peers.pop(old_address, None) def _spawn_peer_interface( self, address, name, peer_identity, client=None, mtu=None, connection_type="central", ): identity_hash = self._compute_identity_hash(peer_identity) peer = FakePeerInterface(address) self.spawned_interfaces[identity_hash] = peer self.address_to_interface[address] = peer self.spawn_calls.append( { "address": address, "peer_identity": peer_identity, "mtu": mtu, "connection_type": connection_type, } ) return peer def __str__(self): return "HarnessBLEInterface" @pytest.fixture(autouse=True) def patch_bleinterface_globals(monkeypatch): FakeRNS.logs = [] FakeFragmenter.instances = [] FakeReassembler.instances = [] monkeypatch.setattr(ble_interface_module, "RNS", FakeRNS) monkeypatch.setattr(ble_interface_module, "BLEFragmenter", FakeFragmenter) monkeypatch.setattr(ble_interface_module, "BLEReassembler", FakeReassembler) def identity(base): return bytes((base + i) & 0xFF for i in range(16)) def run_backend_probe(requested=None, include_cpp=True): env = os.environ.copy() if requested is None: env.pop("BLE_RETICULUM_SESSION_BACKEND", None) else: env["BLE_RETICULUM_SESSION_BACKEND"] = requested pythonpath = [SRC_DIR] if include_cpp: pythonpath.append(CPP_BUILD_DIR) env["PYTHONPATH"] = os.pathsep.join(pythonpath) completed = subprocess.run( [ sys.executable, "-c", "import json, ble_reticulum.BLESessionBackend as b; " "print(json.dumps({'backend': b.BACKEND, 'requested': b.REQUESTED_BACKEND}))", ], cwd=REPO_ROOT, env=env, text=True, capture_output=True, ) return completed def test_default_auto_backend_selects_cpp_when_available(): completed = run_backend_probe(requested=None, include_cpp=True) assert completed.returncode == 0, completed.stderr assert json.loads(completed.stdout)["backend"] == "cpp" def test_auto_backend_falls_back_to_python_when_cpp_unavailable(): completed = run_backend_probe(requested="auto", include_cpp=False) assert completed.returncode == 0, completed.stderr assert json.loads(completed.stdout)["backend"] == "python" def test_python_backend_forces_python_even_when_cpp_available(): completed = run_backend_probe(requested="python", include_cpp=True) assert completed.returncode == 0, completed.stderr assert json.loads(completed.stdout)["backend"] == "python" def test_cpp_backend_fails_clearly_when_unavailable(): completed = run_backend_probe(requested="cpp", include_cpp=False) assert completed.returncode != 0 assert "C++ BLE session backend is not available" in completed.stderr def test_python_backend_still_uses_existing_python_behavior(): harness = Harness(backend="python", mtu=185) peer_identity = identity(0x10) result = harness._handle_identity_handshake("AA:01", peer_identity) assert result is True assert harness.address_to_identity["AA:01"] == peer_identity frag_key = harness._get_fragmenter_key(peer_identity, "AA:01") assert harness.fragmenters[frag_key].mtu == 185 def test_cpp_backend_non_16_byte_payload_returns_false(): harness = Harness(backend="cpp") assert harness._handle_identity_handshake("AA:02", b"abc") is False def test_cpp_backend_new_identity_accepted(): harness = Harness(backend="cpp", mtu=185) harness._pending_identity_connections["AA:03"] = 1.0 peer_identity = identity(0x20) result = harness._handle_identity_handshake("AA:03", peer_identity) assert result is True assert harness.address_to_identity["AA:03"] == peer_identity assert "AA:03" not in harness._pending_identity_connections assert harness.spawn_calls frag_key = harness._get_fragmenter_key(peer_identity, "AA:03") assert harness.fragmenters[frag_key].mtu == 185 def test_cpp_backend_duplicate_same_identity_consumed(): harness = Harness(backend="cpp") peer_identity = identity(0x30) harness.address_to_identity["AA:04"] = peer_identity assert harness._handle_identity_handshake("AA:04", peer_identity) is True def test_cpp_backend_duplicate_mismatch_consumed_with_warning(): harness = Harness(backend="cpp") harness.address_to_identity["AA:05"] = identity(0x40) assert harness._handle_identity_handshake("AA:05", identity(0x50)) is True assert FakeRNS.logs def test_cpp_backend_duplicate_active_elsewhere_disconnects_current(): old_address = "AA:06" current_address = "AA:07" peer_identity = identity(0x60) harness = Harness(backend="cpp", connected_peers={old_address}) identity_hash = harness._compute_identity_hash(peer_identity) harness.identity_to_address[identity_hash] = old_address harness.peers[old_address] = object() assert harness._handle_identity_handshake(current_address, peer_identity) is True assert harness.driver.disconnect_calls == [current_address] def test_cpp_backend_stale_pending_detach_replacement_accepted(): old_address = "AA:08" current_address = "AA:09" peer_identity = identity(0x70) harness = Harness(backend="cpp") identity_hash = harness._compute_identity_hash(peer_identity) harness.identity_to_address[identity_hash] = old_address harness.address_to_identity[old_address] = peer_identity harness._pending_detach[identity_hash] = 1.0 assert harness._handle_identity_handshake(current_address, peer_identity) is True assert harness.cleanup_calls == [(identity_hash, old_address)] assert harness.address_to_identity[current_address] == peer_identity def test_cpp_backend_zombie_old_connection_disconnects_old(): old_address = "AA:0A" current_address = "AA:0B" peer_identity = identity(0x80) harness = Harness(backend="cpp", connected_peers={old_address}) identity_hash = harness._compute_identity_hash(peer_identity) harness.identity_to_address[identity_hash] = old_address harness.peers[old_address] = object() harness.address_to_identity[old_address] = peer_identity harness._last_real_data[identity_hash] = 1.0 assert harness._handle_identity_handshake(current_address, peer_identity) is True assert old_address in harness.driver.disconnect_calls assert harness.cleanup_calls == [(identity_hash, old_address)] def test_cpp_backend_mtu_fallback_to_23(): harness = Harness(backend="cpp", mtu=None) peer_identity = identity(0x90) assert harness._handle_identity_handshake("AA:0C", peer_identity) is True frag_key = harness._get_fragmenter_key(peer_identity, "AA:0C") assert harness.fragmenters[frag_key].mtu == 23 def test_cpp_backend_mtu_provided_is_honored(): harness = Harness(backend="cpp", mtu=247) peer_identity = identity(0xA0) assert harness._handle_identity_handshake("AA:0D", peer_identity) is True frag_key = harness._get_fragmenter_key(peer_identity, "AA:0D") assert harness.fragmenters[frag_key].mtu == 247 def test_cpp_backend_pending_identity_removed(): harness = Harness(backend="cpp") harness._pending_identity_connections["AA:0E"] = 1.0 assert harness._handle_identity_handshake("AA:0E", identity(0xB0)) is True assert "AA:0E" not in harness._pending_identity_connections def test_cpp_backend_existing_spawned_interface_updates_address(): old_address = "AA:0F" current_address = "AA:10" peer_identity = identity(0xC0) harness = Harness(backend="cpp") identity_hash = harness._compute_identity_hash(peer_identity) existing = FakePeerInterface(old_address) harness.spawned_interfaces[identity_hash] = existing harness.identity_to_address[identity_hash] = old_address harness._pending_detach[identity_hash] = 1.0 assert harness._handle_identity_handshake(current_address, peer_identity) is True assert existing.peer_address == current_address assert harness.address_to_interface[current_address] is existing