Some checks failed
Tests / Detect Changes (push) Has been cancelled
Tests / Installer Test (Raspberry Pi OS - ARM) (push) Has been cancelled
Tests / Installer Test (Raspberry Pi OS - ARM)-1 (push) Has been cancelled
Tests / Unit Tests (push) Has been cancelled
Tests / Unit Tests-1 (push) Has been cancelled
Tests / Unit Tests-2 (push) Has been cancelled
Tests / Unit Tests-3 (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Integration Tests-1 (push) Has been cancelled
Tests / Integration Tests-2 (push) Has been cancelled
Tests / Integration Tests-3 (push) Has been cancelled
Tests / Installer Test (Fresh System) (push) Has been cancelled
Tests / Installer Test (Fresh System)-1 (push) Has been cancelled
Tests / Installer Test (Fresh System)-2 (push) Has been cancelled
Tests / Installer Test (Fresh System)-3 (push) Has been cancelled
Tests / Installer Test (Fresh System)-4 (push) Has been cancelled
360 lines
12 KiB
Python
360 lines
12 KiB
Python
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import types
|
|
|
|
import pytest
|
|
|
|
|
|
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
SRC_DIR = os.path.join(REPO_ROOT, "src")
|
|
CPP_BUILD_DIR = os.path.join(REPO_ROOT, "migration", "protocol_core")
|
|
|
|
sys.path.insert(0, SRC_DIR)
|
|
sys.path.insert(0, CPP_BUILD_DIR)
|
|
|
|
cpp = pytest.importorskip("ble_protocol_core_cpp")
|
|
|
|
|
|
def install_fake_rns():
|
|
rns = types.ModuleType("RNS")
|
|
rns.log = lambda *args, **kwargs: None
|
|
rns.LOG_EXTREME = 0
|
|
rns.LOG_DEBUG = 1
|
|
rns.LOG_INFO = 2
|
|
rns.LOG_NOTICE = 2
|
|
rns.LOG_WARNING = 3
|
|
rns.LOG_ERROR = 4
|
|
rns.LOG_CRITICAL = 5
|
|
rns.Reticulum = types.SimpleNamespace(configdir=None)
|
|
rns.Transport = types.SimpleNamespace(interfaces=[])
|
|
sys.modules["RNS"] = rns
|
|
|
|
transport = types.ModuleType("RNS.Transport")
|
|
transport.interfaces = []
|
|
sys.modules["RNS.Transport"] = transport
|
|
sys.modules["RNS.Interfaces"] = types.ModuleType("RNS.Interfaces")
|
|
interface_mod = types.ModuleType("RNS.Interfaces.Interface")
|
|
|
|
class Interface:
|
|
MODE_FULL = 0
|
|
|
|
interface_mod.Interface = Interface
|
|
sys.modules["RNS.Interfaces.Interface"] = interface_mod
|
|
|
|
|
|
install_fake_rns()
|
|
import ble_reticulum.BLEInterface as ble_interface_module
|
|
from ble_reticulum.BLEInterface import BLEInterface
|
|
|
|
|
|
class FakeRNS:
|
|
LOG_DEBUG = 1
|
|
LOG_INFO = 2
|
|
LOG_NOTICE = 2
|
|
LOG_WARNING = 3
|
|
LOG_ERROR = 4
|
|
logs = []
|
|
|
|
@staticmethod
|
|
def log(message, level):
|
|
FakeRNS.logs.append((level, message))
|
|
|
|
|
|
class FakeDriver:
|
|
def __init__(self, mtu=None, connected_peers=None):
|
|
self.mtu = mtu
|
|
self.connected_peers = set(connected_peers or [])
|
|
self.disconnect_calls = []
|
|
|
|
def disconnect(self, address):
|
|
self.disconnect_calls.append(address)
|
|
|
|
def get_peer_mtu(self, address):
|
|
return self.mtu
|
|
|
|
|
|
class FakeFragmenter:
|
|
instances = []
|
|
|
|
def __init__(self, mtu=185):
|
|
self.mtu = mtu
|
|
FakeFragmenter.instances.append(self)
|
|
|
|
|
|
class FakeReassembler:
|
|
instances = []
|
|
|
|
def __init__(self, timeout=None):
|
|
self.timeout = timeout
|
|
FakeReassembler.instances.append(self)
|
|
|
|
|
|
class FakeLock:
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
|
|
class FakePeerInterface:
|
|
def __init__(self, address):
|
|
self.peer_address = address
|
|
|
|
|
|
class Harness:
|
|
_handle_identity_handshake = BLEInterface._handle_identity_handshake
|
|
_handle_identity_handshake_cpp = BLEInterface._handle_identity_handshake_cpp
|
|
_handle_identity_handshake_python = BLEInterface._handle_identity_handshake_python
|
|
_get_session_backend = BLEInterface._get_session_backend
|
|
_get_session_manager = BLEInterface._get_session_manager
|
|
_build_session_snapshot = BLEInterface._build_session_snapshot
|
|
_check_duplicate_identity = BLEInterface._check_duplicate_identity
|
|
_compute_identity_hash = BLEInterface._compute_identity_hash
|
|
_get_fragmenter_key = BLEInterface._get_fragmenter_key
|
|
|
|
def __init__(self, backend="cpp", mtu=None, connected_peers=None):
|
|
self._session_backend = backend
|
|
self._session_manager = cpp.BLEPeerSessionManager() if backend == "cpp" else None
|
|
self.driver = FakeDriver(mtu=mtu, connected_peers=connected_peers)
|
|
self.address_to_identity = {}
|
|
self.identity_to_address = {}
|
|
self.address_to_interface = {}
|
|
self.spawned_interfaces = {}
|
|
self.peers = {}
|
|
self.fragmenters = {}
|
|
self.reassemblers = {}
|
|
self.frag_lock = FakeLock()
|
|
self._pending_identity_connections = {}
|
|
self._pending_identity_timeout = 30
|
|
self._pending_detach = {}
|
|
self._last_real_data = {}
|
|
self._zombie_timeout = 45.0
|
|
self.cleanup_calls = []
|
|
self.spawn_calls = []
|
|
|
|
def _cleanup_stale_address(self, identity_hash, old_address):
|
|
self.cleanup_calls.append((identity_hash, old_address))
|
|
self.address_to_identity.pop(old_address, None)
|
|
self.address_to_interface.pop(old_address, None)
|
|
self.peers.pop(old_address, None)
|
|
|
|
def _spawn_peer_interface(
|
|
self,
|
|
address,
|
|
name,
|
|
peer_identity,
|
|
client=None,
|
|
mtu=None,
|
|
connection_type="central",
|
|
):
|
|
identity_hash = self._compute_identity_hash(peer_identity)
|
|
peer = FakePeerInterface(address)
|
|
self.spawned_interfaces[identity_hash] = peer
|
|
self.address_to_interface[address] = peer
|
|
self.spawn_calls.append(
|
|
{
|
|
"address": address,
|
|
"peer_identity": peer_identity,
|
|
"mtu": mtu,
|
|
"connection_type": connection_type,
|
|
}
|
|
)
|
|
return peer
|
|
|
|
def __str__(self):
|
|
return "HarnessBLEInterface"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def patch_bleinterface_globals(monkeypatch):
|
|
FakeRNS.logs = []
|
|
FakeFragmenter.instances = []
|
|
FakeReassembler.instances = []
|
|
monkeypatch.setattr(ble_interface_module, "RNS", FakeRNS)
|
|
monkeypatch.setattr(ble_interface_module, "BLEFragmenter", FakeFragmenter)
|
|
monkeypatch.setattr(ble_interface_module, "BLEReassembler", FakeReassembler)
|
|
|
|
|
|
def identity(base):
|
|
return bytes((base + i) & 0xFF for i in range(16))
|
|
|
|
|
|
def run_backend_probe(requested=None, include_cpp=True):
|
|
env = os.environ.copy()
|
|
if requested is None:
|
|
env.pop("BLE_RETICULUM_SESSION_BACKEND", None)
|
|
else:
|
|
env["BLE_RETICULUM_SESSION_BACKEND"] = requested
|
|
pythonpath = [SRC_DIR]
|
|
if include_cpp:
|
|
pythonpath.append(CPP_BUILD_DIR)
|
|
env["PYTHONPATH"] = os.pathsep.join(pythonpath)
|
|
completed = subprocess.run(
|
|
[
|
|
sys.executable,
|
|
"-c",
|
|
"import json, ble_reticulum.BLESessionBackend as b; "
|
|
"print(json.dumps({'backend': b.BACKEND, 'requested': b.REQUESTED_BACKEND}))",
|
|
],
|
|
cwd=REPO_ROOT,
|
|
env=env,
|
|
text=True,
|
|
capture_output=True,
|
|
)
|
|
return completed
|
|
|
|
|
|
def test_default_auto_backend_selects_cpp_when_available():
|
|
completed = run_backend_probe(requested=None, include_cpp=True)
|
|
assert completed.returncode == 0, completed.stderr
|
|
assert json.loads(completed.stdout)["backend"] == "cpp"
|
|
|
|
|
|
def test_auto_backend_falls_back_to_python_when_cpp_unavailable():
|
|
completed = run_backend_probe(requested="auto", include_cpp=False)
|
|
assert completed.returncode == 0, completed.stderr
|
|
assert json.loads(completed.stdout)["backend"] == "python"
|
|
|
|
|
|
def test_python_backend_forces_python_even_when_cpp_available():
|
|
completed = run_backend_probe(requested="python", include_cpp=True)
|
|
assert completed.returncode == 0, completed.stderr
|
|
assert json.loads(completed.stdout)["backend"] == "python"
|
|
|
|
|
|
def test_cpp_backend_fails_clearly_when_unavailable():
|
|
completed = run_backend_probe(requested="cpp", include_cpp=False)
|
|
assert completed.returncode != 0
|
|
assert "C++ BLE session backend is not available" in completed.stderr
|
|
|
|
|
|
def test_python_backend_still_uses_existing_python_behavior():
|
|
harness = Harness(backend="python", mtu=185)
|
|
peer_identity = identity(0x10)
|
|
|
|
result = harness._handle_identity_handshake("AA:01", peer_identity)
|
|
|
|
assert result is True
|
|
assert harness.address_to_identity["AA:01"] == peer_identity
|
|
frag_key = harness._get_fragmenter_key(peer_identity, "AA:01")
|
|
assert harness.fragmenters[frag_key].mtu == 185
|
|
|
|
|
|
def test_cpp_backend_non_16_byte_payload_returns_false():
|
|
harness = Harness(backend="cpp")
|
|
assert harness._handle_identity_handshake("AA:02", b"abc") is False
|
|
|
|
|
|
def test_cpp_backend_new_identity_accepted():
|
|
harness = Harness(backend="cpp", mtu=185)
|
|
harness._pending_identity_connections["AA:03"] = 1.0
|
|
peer_identity = identity(0x20)
|
|
|
|
result = harness._handle_identity_handshake("AA:03", peer_identity)
|
|
|
|
assert result is True
|
|
assert harness.address_to_identity["AA:03"] == peer_identity
|
|
assert "AA:03" not in harness._pending_identity_connections
|
|
assert harness.spawn_calls
|
|
frag_key = harness._get_fragmenter_key(peer_identity, "AA:03")
|
|
assert harness.fragmenters[frag_key].mtu == 185
|
|
|
|
|
|
def test_cpp_backend_duplicate_same_identity_consumed():
|
|
harness = Harness(backend="cpp")
|
|
peer_identity = identity(0x30)
|
|
harness.address_to_identity["AA:04"] = peer_identity
|
|
assert harness._handle_identity_handshake("AA:04", peer_identity) is True
|
|
|
|
|
|
def test_cpp_backend_duplicate_mismatch_consumed_with_warning():
|
|
harness = Harness(backend="cpp")
|
|
harness.address_to_identity["AA:05"] = identity(0x40)
|
|
assert harness._handle_identity_handshake("AA:05", identity(0x50)) is True
|
|
assert FakeRNS.logs
|
|
|
|
|
|
def test_cpp_backend_duplicate_active_elsewhere_disconnects_current():
|
|
old_address = "AA:06"
|
|
current_address = "AA:07"
|
|
peer_identity = identity(0x60)
|
|
harness = Harness(backend="cpp", connected_peers={old_address})
|
|
identity_hash = harness._compute_identity_hash(peer_identity)
|
|
harness.identity_to_address[identity_hash] = old_address
|
|
harness.peers[old_address] = object()
|
|
|
|
assert harness._handle_identity_handshake(current_address, peer_identity) is True
|
|
assert harness.driver.disconnect_calls == [current_address]
|
|
|
|
|
|
def test_cpp_backend_stale_pending_detach_replacement_accepted():
|
|
old_address = "AA:08"
|
|
current_address = "AA:09"
|
|
peer_identity = identity(0x70)
|
|
harness = Harness(backend="cpp")
|
|
identity_hash = harness._compute_identity_hash(peer_identity)
|
|
harness.identity_to_address[identity_hash] = old_address
|
|
harness.address_to_identity[old_address] = peer_identity
|
|
harness._pending_detach[identity_hash] = 1.0
|
|
|
|
assert harness._handle_identity_handshake(current_address, peer_identity) is True
|
|
assert harness.cleanup_calls == [(identity_hash, old_address)]
|
|
assert harness.address_to_identity[current_address] == peer_identity
|
|
|
|
|
|
def test_cpp_backend_zombie_old_connection_disconnects_old():
|
|
old_address = "AA:0A"
|
|
current_address = "AA:0B"
|
|
peer_identity = identity(0x80)
|
|
harness = Harness(backend="cpp", connected_peers={old_address})
|
|
identity_hash = harness._compute_identity_hash(peer_identity)
|
|
harness.identity_to_address[identity_hash] = old_address
|
|
harness.peers[old_address] = object()
|
|
harness.address_to_identity[old_address] = peer_identity
|
|
harness._last_real_data[identity_hash] = 1.0
|
|
|
|
assert harness._handle_identity_handshake(current_address, peer_identity) is True
|
|
assert old_address in harness.driver.disconnect_calls
|
|
assert harness.cleanup_calls == [(identity_hash, old_address)]
|
|
|
|
|
|
def test_cpp_backend_mtu_fallback_to_23():
|
|
harness = Harness(backend="cpp", mtu=None)
|
|
peer_identity = identity(0x90)
|
|
assert harness._handle_identity_handshake("AA:0C", peer_identity) is True
|
|
frag_key = harness._get_fragmenter_key(peer_identity, "AA:0C")
|
|
assert harness.fragmenters[frag_key].mtu == 23
|
|
|
|
|
|
def test_cpp_backend_mtu_provided_is_honored():
|
|
harness = Harness(backend="cpp", mtu=247)
|
|
peer_identity = identity(0xA0)
|
|
assert harness._handle_identity_handshake("AA:0D", peer_identity) is True
|
|
frag_key = harness._get_fragmenter_key(peer_identity, "AA:0D")
|
|
assert harness.fragmenters[frag_key].mtu == 247
|
|
|
|
|
|
def test_cpp_backend_pending_identity_removed():
|
|
harness = Harness(backend="cpp")
|
|
harness._pending_identity_connections["AA:0E"] = 1.0
|
|
assert harness._handle_identity_handshake("AA:0E", identity(0xB0)) is True
|
|
assert "AA:0E" not in harness._pending_identity_connections
|
|
|
|
|
|
def test_cpp_backend_existing_spawned_interface_updates_address():
|
|
old_address = "AA:0F"
|
|
current_address = "AA:10"
|
|
peer_identity = identity(0xC0)
|
|
harness = Harness(backend="cpp")
|
|
identity_hash = harness._compute_identity_hash(peer_identity)
|
|
existing = FakePeerInterface(old_address)
|
|
harness.spawned_interfaces[identity_hash] = existing
|
|
harness.identity_to_address[identity_hash] = old_address
|
|
harness._pending_detach[identity_hash] = 1.0
|
|
|
|
assert harness._handle_identity_handshake(current_address, peer_identity) is True
|
|
assert existing.peer_address == current_address
|
|
assert harness.address_to_interface[current_address] is existing
|