Fixes namespace collision with Reticulum's own RNS.Interfaces package. When both packages were installed, the collision caused import issues and prevented BLE discovery between devices. Changes: - Rename src/RNS/Interfaces/ to src/ble_reticulum/ - Update pyproject.toml package configuration - Update all imports in source and test files 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
558 lines
22 KiB
Python
558 lines
22 KiB
Python
"""
|
|
Tests for Peripheral Disconnection Cleanup (TDD for GitHub Issue)
|
|
|
|
When Android devices (acting as central) disconnect from Pi GATT servers (acting
|
|
as peripheral), the peer entries must be cleaned up from memory to prevent
|
|
reaching the 7-peer limit and blocking new connections.
|
|
|
|
Issue: Peripheral disconnection cleanup never happens because:
|
|
1. BLEGATTServer._handle_central_disconnected() exists but is never called
|
|
2. No D-Bus signal monitoring for device disconnections
|
|
3. on_central_disconnected callback never wired up in linux_bluetooth_driver
|
|
|
|
This test file follows TDD approach:
|
|
1. Write tests that reproduce the bug (SHOULD FAIL initially)
|
|
2. Implement the fix in linux_bluetooth_driver.py
|
|
3. Verify tests pass after implementation
|
|
|
|
Reference: BLE_PROTOCOL_v2.2.md § Dual-Mode Operation (Peripheral mode)
|
|
"""
|
|
|
|
import pytest
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import time
|
|
import threading
|
|
from unittest.mock import Mock, MagicMock, AsyncMock, patch, call
|
|
|
|
# Add src to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
|
|
|
|
# Mock RNS module before importing
|
|
import RNS
|
|
if not hasattr(RNS, 'LOG_INFO'):
|
|
RNS.LOG_CRITICAL = 0
|
|
RNS.LOG_ERROR = 1
|
|
RNS.LOG_WARNING = 2
|
|
RNS.LOG_NOTICE = 3
|
|
RNS.LOG_INFO = 4
|
|
RNS.LOG_VERBOSE = 5
|
|
RNS.LOG_DEBUG = 6
|
|
RNS.LOG_EXTREME = 7
|
|
|
|
RNS.log = Mock()
|
|
|
|
|
|
# Module-level fixture (shared across test classes)
|
|
@pytest.fixture
|
|
def mock_driver():
|
|
"""Create a mock Linux BLE driver with GATT server capabilities."""
|
|
driver = Mock()
|
|
driver.loop = asyncio.new_event_loop()
|
|
driver._peers = {} # address -> peer_conn
|
|
driver._peers_lock = threading.RLock() # Use threading lock for mock (asyncio.Lock requires event loop in Py3.8/3.9)
|
|
driver._log = Mock()
|
|
driver.on_device_disconnected = Mock()
|
|
|
|
# Mock method that should be added
|
|
driver._handle_peripheral_disconnected = Mock()
|
|
|
|
return driver
|
|
|
|
|
|
class TestPeripheralDisconnectCleanup:
|
|
"""Test peripheral disconnection cleanup mechanisms."""
|
|
|
|
@pytest.fixture
|
|
def mock_gatt_server(self, mock_driver):
|
|
"""Create a mock GATT server with connected centrals."""
|
|
gatt_server = Mock()
|
|
gatt_server.driver = mock_driver
|
|
gatt_server.connected_centrals = {}
|
|
gatt_server.centrals_lock = threading.RLock() # Use threading lock for mock
|
|
gatt_server.running = True
|
|
gatt_server._log = Mock()
|
|
|
|
# Mock callback that should be wired up
|
|
gatt_server.on_central_disconnected = None
|
|
|
|
# Mock the disconnect handler
|
|
def handle_disconnect(central_address):
|
|
"""Simulate _handle_central_disconnected logic."""
|
|
if central_address not in gatt_server.connected_centrals:
|
|
return
|
|
|
|
del gatt_server.connected_centrals[central_address]
|
|
|
|
# This callback should be wired to driver._handle_peripheral_disconnected
|
|
if gatt_server.on_central_disconnected:
|
|
gatt_server.on_central_disconnected(central_address)
|
|
|
|
gatt_server._handle_central_disconnected = handle_disconnect
|
|
|
|
return gatt_server
|
|
|
|
def test_callback_is_wired_up(self, mock_driver, mock_gatt_server):
|
|
"""
|
|
TEST 1: Verify on_central_disconnected callback is wired to driver.
|
|
|
|
This test verifies that during GATT server initialization, the
|
|
on_central_disconnected callback is set to point to the driver's
|
|
peripheral disconnection handler.
|
|
|
|
EXPECTED TO FAIL: Currently the callback is never wired up.
|
|
"""
|
|
# Simulate what should happen in BluezeroGATTServer.__init__()
|
|
# This line should be added in the actual implementation:
|
|
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
|
|
|
|
# Verify callback is wired
|
|
assert mock_gatt_server.on_central_disconnected is not None, \
|
|
"on_central_disconnected callback should be wired to driver method"
|
|
assert mock_gatt_server.on_central_disconnected == mock_driver._handle_peripheral_disconnected, \
|
|
"Callback should point to driver._handle_peripheral_disconnected"
|
|
|
|
def test_peripheral_disconnect_removes_from_peers_dict(self, mock_driver, mock_gatt_server):
|
|
"""
|
|
TEST 2: Verify that when central disconnects, peer is removed from driver._peers.
|
|
|
|
Simulates the complete cleanup flow:
|
|
1. Central connects (added to connected_centrals and _peers)
|
|
2. Central disconnects (D-Bus signal received)
|
|
3. Cleanup removes from both dictionaries
|
|
|
|
EXPECTED TO FAIL: Currently _peers entries are never cleaned up.
|
|
"""
|
|
central_address = "4A:87:8C:C7:E3:F3" # Real Android MAC from logs
|
|
|
|
# Setup: Simulate central connection
|
|
mock_gatt_server.connected_centrals[central_address] = {
|
|
"address": central_address,
|
|
"connected_at": time.time(),
|
|
"mtu": 517,
|
|
"bytes_received": 1024,
|
|
"bytes_sent": 512
|
|
}
|
|
|
|
mock_driver._peers[central_address] = Mock() # Simulate peer connection
|
|
|
|
# Wire up the callback (this should be done in actual code)
|
|
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
|
|
|
|
# Action: Simulate disconnect
|
|
mock_gatt_server._handle_central_disconnected(central_address)
|
|
|
|
# Assert: Verify cleanup in GATT server
|
|
assert central_address not in mock_gatt_server.connected_centrals, \
|
|
"Central should be removed from connected_centrals after disconnect"
|
|
|
|
# Assert: Verify driver cleanup callback was called
|
|
mock_driver._handle_peripheral_disconnected.assert_called_once_with(central_address)
|
|
|
|
# Note: In real implementation, _handle_peripheral_disconnected should remove from _peers
|
|
# For now we just verify the callback was invoked
|
|
|
|
def test_driver_peripheral_disconnect_handler_removes_peer(self, mock_driver):
|
|
"""
|
|
TEST 3: Verify driver._handle_peripheral_disconnected() removes from _peers dict.
|
|
|
|
This tests the driver-side cleanup that should happen when the GATT server
|
|
reports a central disconnection.
|
|
|
|
EXPECTED TO FAIL: Method doesn't exist yet.
|
|
"""
|
|
central_address = "65:70:A5:A7:29:73" # Real Android MAC from logs
|
|
|
|
# Setup: Add peer
|
|
mock_driver._peers[central_address] = Mock()
|
|
|
|
# Create the actual implementation that should exist
|
|
def handle_peripheral_disconnected(address):
|
|
"""Remove peer from _peers dict and notify callbacks."""
|
|
if address in mock_driver._peers:
|
|
del mock_driver._peers[address]
|
|
|
|
if mock_driver.on_device_disconnected:
|
|
mock_driver.on_device_disconnected(address)
|
|
|
|
# Temporarily assign the implementation
|
|
mock_driver._handle_peripheral_disconnected = handle_peripheral_disconnected
|
|
|
|
# Action: Call handler
|
|
mock_driver._handle_peripheral_disconnected(central_address)
|
|
|
|
# Assert: Peer removed from _peers
|
|
assert central_address not in mock_driver._peers, \
|
|
"Peer should be removed from _peers dict"
|
|
|
|
# Assert: Callback was invoked
|
|
mock_driver.on_device_disconnected.assert_called_once_with(central_address)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dbus_disconnect_signal_triggers_cleanup(self, mock_driver, mock_gatt_server):
|
|
"""
|
|
TEST 4: Verify D-Bus disconnect signal triggers cleanup flow.
|
|
|
|
Simulates BlueZ D-Bus PropertiesChanged signal when device disconnects:
|
|
- Signal: org.freedesktop.DBus.Properties.PropertiesChanged
|
|
- Interface: org.bluez.Device1
|
|
- Property: Connected = False
|
|
|
|
EXPECTED TO FAIL: D-Bus monitoring not implemented yet.
|
|
"""
|
|
central_address = "4A:87:8C:C7:E3:F3"
|
|
|
|
# Setup: Simulate connection
|
|
mock_gatt_server.connected_centrals[central_address] = {
|
|
"address": central_address,
|
|
"connected_at": time.time(),
|
|
"mtu": 517
|
|
}
|
|
|
|
mock_driver._peers[central_address] = Mock()
|
|
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
|
|
|
|
# Simulate D-Bus signal callback that should be implemented
|
|
def dbus_properties_changed_callback(interface_name, changed_props, invalidated, path):
|
|
"""Mock D-Bus callback that should be registered."""
|
|
if interface_name == "org.bluez.Device1" and "Connected" in changed_props:
|
|
if not changed_props["Connected"]: # Device disconnected
|
|
# Extract MAC from path: /org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF
|
|
if "/dev_" in path:
|
|
mac_address = path.split("/dev_")[-1].replace("_", ":")
|
|
mock_gatt_server._handle_central_disconnected(mac_address)
|
|
|
|
# Simulate D-Bus signal
|
|
dbus_path = f"/org/bluez/hci0/dev_{central_address.replace(':', '_')}"
|
|
changed_properties = {"Connected": False}
|
|
|
|
dbus_properties_changed_callback(
|
|
"org.bluez.Device1",
|
|
changed_properties,
|
|
[],
|
|
dbus_path
|
|
)
|
|
|
|
# Assert: Cleanup happened
|
|
assert central_address not in mock_gatt_server.connected_centrals
|
|
mock_driver._handle_peripheral_disconnected.assert_called_once_with(central_address)
|
|
|
|
def test_multiple_disconnects_are_idempotent(self, mock_driver, mock_gatt_server):
|
|
"""
|
|
TEST 5: Verify multiple disconnect signals don't cause errors.
|
|
|
|
Edge case: D-Bus may send multiple PropertiesChanged signals or
|
|
cleanup may be called from multiple code paths.
|
|
|
|
EXPECTED BEHAVIOR: Second call should be safely ignored.
|
|
"""
|
|
central_address = "4A:87:8C:C7:E3:F3"
|
|
|
|
# Setup
|
|
mock_gatt_server.connected_centrals[central_address] = {"address": central_address}
|
|
mock_driver._peers[central_address] = Mock()
|
|
|
|
# Wire callback
|
|
def handle_peripheral_disconnected(address):
|
|
if address in mock_driver._peers:
|
|
del mock_driver._peers[address]
|
|
|
|
mock_driver._handle_peripheral_disconnected = handle_peripheral_disconnected
|
|
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
|
|
|
|
# Action: First disconnect
|
|
mock_gatt_server._handle_central_disconnected(central_address)
|
|
assert central_address not in mock_gatt_server.connected_centrals
|
|
|
|
# Action: Second disconnect (should not raise)
|
|
try:
|
|
mock_gatt_server._handle_central_disconnected(central_address)
|
|
second_disconnect_succeeded = True
|
|
except Exception as e:
|
|
second_disconnect_succeeded = False
|
|
pytest.fail(f"Second disconnect raised exception: {e}")
|
|
|
|
assert second_disconnect_succeeded, "Multiple disconnects should be idempotent"
|
|
|
|
def test_disconnect_during_shutdown_is_ignored(self, mock_driver, mock_gatt_server):
|
|
"""
|
|
TEST 6: Verify disconnects during shutdown don't cause errors.
|
|
|
|
Edge case: GATT server is stopping while centrals are still connected.
|
|
Disconnect signals may arrive after cleanup has started.
|
|
|
|
EXPECTED BEHAVIOR: Gracefully handle when server is not running.
|
|
"""
|
|
central_address = "65:70:A5:A7:29:73"
|
|
|
|
# Setup
|
|
mock_gatt_server.connected_centrals[central_address] = {"address": central_address}
|
|
mock_gatt_server.running = False # Server is shutting down
|
|
|
|
# Action: Disconnect during shutdown
|
|
try:
|
|
mock_gatt_server._handle_central_disconnected(central_address)
|
|
disconnect_during_shutdown_ok = True
|
|
except Exception as e:
|
|
disconnect_during_shutdown_ok = False
|
|
pytest.fail(f"Disconnect during shutdown raised: {e}")
|
|
|
|
assert disconnect_during_shutdown_ok, \
|
|
"Disconnect during shutdown should be handled gracefully"
|
|
|
|
def test_peer_limit_unblocked_after_disconnect(self, mock_driver):
|
|
"""
|
|
TEST 7: Verify that after disconnect, new connections can succeed.
|
|
|
|
Regression test for the actual bug: When _peers dict reaches max (7),
|
|
new connections are blocked. After cleanup, new connections should work.
|
|
|
|
This simulates the real-world scenario from the logs where device
|
|
4A:87:8C:C7:E3:F3 was blocked by "max peers (7) reached".
|
|
"""
|
|
max_peers = 7
|
|
|
|
# Setup: Fill up to max peers
|
|
for i in range(max_peers):
|
|
address = f"AA:BB:CC:DD:EE:F{i}"
|
|
mock_driver._peers[address] = Mock()
|
|
|
|
# Verify we're at limit
|
|
assert len(mock_driver._peers) == max_peers
|
|
|
|
# Simulate one peer disconnecting
|
|
disconnected_address = "AA:BB:CC:DD:EE:F0"
|
|
|
|
def handle_peripheral_disconnected(address):
|
|
if address in mock_driver._peers:
|
|
del mock_driver._peers[address]
|
|
|
|
mock_driver._handle_peripheral_disconnected = handle_peripheral_disconnected
|
|
mock_driver._handle_peripheral_disconnected(disconnected_address)
|
|
|
|
# Assert: Peer count decreased
|
|
assert len(mock_driver._peers) == max_peers - 1, \
|
|
"Peer count should decrease after disconnect"
|
|
|
|
# Assert: New connection can now be added
|
|
new_address = "4A:87:8C:C7:E3:F3" # The blocked Android device
|
|
mock_driver._peers[new_address] = Mock()
|
|
assert len(mock_driver._peers) == max_peers, \
|
|
"Should be able to add new peer after cleanup"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reconnection_race_condition(self, mock_driver, mock_gatt_server):
|
|
"""
|
|
TEST 8: Verify reconnection race doesn't delete new connection.
|
|
|
|
Edge case: Central disconnects and immediately reconnects.
|
|
Cleanup from first connection arrives after second connection established.
|
|
|
|
EXPECTED BEHAVIOR: Should not delete the new connection state.
|
|
Solution: Check timestamp or verify connection exists before cleanup.
|
|
"""
|
|
central_address = "4A:87:8C:C7:E3:F3"
|
|
|
|
# Setup: First connection
|
|
first_connect_time = time.time()
|
|
mock_gatt_server.connected_centrals[central_address] = {
|
|
"address": central_address,
|
|
"connected_at": first_connect_time,
|
|
"mtu": 517
|
|
}
|
|
|
|
# Simulate disconnect (but cleanup delayed)
|
|
del mock_gatt_server.connected_centrals[central_address]
|
|
|
|
# Simulate immediate reconnection
|
|
second_connect_time = time.time() + 0.1
|
|
mock_gatt_server.connected_centrals[central_address] = {
|
|
"address": central_address,
|
|
"connected_at": second_connect_time,
|
|
"mtu": 517
|
|
}
|
|
|
|
# Now delayed cleanup from first disconnect arrives
|
|
# Implementation should check if connection is newer
|
|
if central_address in mock_gatt_server.connected_centrals:
|
|
conn_info = mock_gatt_server.connected_centrals[central_address]
|
|
if conn_info["connected_at"] > first_connect_time:
|
|
# Don't clean up - this is a newer connection
|
|
pass
|
|
|
|
# Assert: New connection still exists
|
|
assert central_address in mock_gatt_server.connected_centrals, \
|
|
"Reconnection should not be cleaned up by stale disconnect"
|
|
|
|
|
|
class TestRealWorldScenario:
|
|
"""Integration test simulating the real-world bug from logs."""
|
|
|
|
def test_android_connection_blocked_by_stale_peers(self):
|
|
"""
|
|
Reproduce the exact scenario from 10.0.0.80 logs:
|
|
|
|
1. Device has 7 connected peers (at limit)
|
|
2. Android device 4A:87:8C:C7:E3:F3 discovered with good signal
|
|
3. Connection blocked: "Cannot connect to 4A:87:8C:C7:E3:F3: max peers (7) reached"
|
|
4. Some peers are actually stale (disconnected but not cleaned up)
|
|
|
|
After fix, stale peers should be removed, allowing new connections.
|
|
"""
|
|
# Setup: Simulate driver at peer limit
|
|
driver = Mock()
|
|
driver._peers = {}
|
|
driver.max_peers = 7
|
|
driver._log = Mock()
|
|
|
|
# Add 7 peers (some are stale from old peripheral connections)
|
|
stale_peers = [
|
|
"66:A9:1F:BB:05:96", # Connected 3 hours ago, now stale
|
|
"75:C1:80:F9:26:6E", # Connected 2 hours ago, now stale
|
|
]
|
|
|
|
active_peers = [
|
|
"B8:27:EB:43:04:BC", # pizero2-first (active)
|
|
"B8:27:EB:A8:A7:22", # pizero-first (active)
|
|
"65:70:A5:A7:29:73", # Android (active, working)
|
|
]
|
|
|
|
for addr in stale_peers + active_peers:
|
|
driver._peers[addr] = Mock()
|
|
|
|
# 2 more to reach limit
|
|
driver._peers["AA:BB:CC:DD:EE:F1"] = Mock()
|
|
driver._peers["AA:BB:CC:DD:EE:F2"] = Mock()
|
|
|
|
assert len(driver._peers) == 7
|
|
|
|
# New Android device tries to connect
|
|
new_android = "4A:87:8C:C7:E3:F3"
|
|
|
|
# Check if can connect
|
|
can_connect = len(driver._peers) < driver.max_peers
|
|
assert not can_connect, "Should be blocked by peer limit (BUG REPRODUCED)"
|
|
|
|
# After fix: Cleanup stale peripheral connections
|
|
for stale_addr in stale_peers:
|
|
if stale_addr in driver._peers:
|
|
del driver._peers[stale_addr]
|
|
|
|
# Now new connection should succeed
|
|
can_connect_after_cleanup = len(driver._peers) < driver.max_peers
|
|
assert can_connect_after_cleanup, \
|
|
"After cleanup, new connections should be allowed"
|
|
|
|
# Add new peer
|
|
driver._peers[new_android] = Mock()
|
|
assert new_android in driver._peers, "New Android device should connect successfully"
|
|
|
|
def test_both_monitoring_mechanisms_detect_disconnect_idempotent(self, mock_driver):
|
|
"""
|
|
Integration test: Both D-Bus signals and polling detect same disconnect.
|
|
|
|
Verifies that cleanup is idempotent - if both mechanisms detect the same
|
|
disconnect, cleanup should only happen once without errors.
|
|
"""
|
|
from ble_reticulum.linux_bluetooth_driver import BluezeroGATTServer
|
|
|
|
# Setup GATT server with monitoring
|
|
server = Mock(spec=BluezeroGATTServer)
|
|
server.driver = mock_driver
|
|
server.connected_centrals = {}
|
|
server.centrals_lock = threading.RLock()
|
|
server._log = Mock()
|
|
|
|
# Track cleanup calls
|
|
cleanup_calls = []
|
|
|
|
def track_cleanup(address):
|
|
cleanup_calls.append(address)
|
|
# Simulate actual cleanup
|
|
with server.centrals_lock:
|
|
if address in server.connected_centrals:
|
|
del server.connected_centrals[address]
|
|
|
|
server._handle_central_disconnected = track_cleanup
|
|
|
|
# Add connected central
|
|
central_mac = "AA:BB:CC:DD:EE:FF"
|
|
server.connected_centrals[central_mac] = {"address": central_mac}
|
|
|
|
# Simulate D-Bus signal detecting disconnect
|
|
track_cleanup(central_mac)
|
|
assert len(cleanup_calls) == 1
|
|
assert central_mac not in server.connected_centrals
|
|
|
|
# Simulate polling also detecting disconnect (should be idempotent)
|
|
# Central is already removed from dict, so cleanup should not be called again
|
|
with server.centrals_lock:
|
|
if central_mac in server.connected_centrals:
|
|
track_cleanup(central_mac)
|
|
|
|
# Verify cleanup was only called once
|
|
assert len(cleanup_calls) == 1, "Cleanup should be idempotent"
|
|
|
|
def test_polling_catches_missed_dbus_signal(self, mock_driver):
|
|
"""
|
|
Integration test: Polling detects disconnect that D-Bus signal missed.
|
|
|
|
Simulates scenario where D-Bus signal fails or is delayed, but polling
|
|
fallback detects and triggers cleanup within 30 seconds.
|
|
"""
|
|
from ble_reticulum.linux_bluetooth_driver import BluezeroGATTServer
|
|
|
|
# Setup GATT server
|
|
server = Mock(spec=BluezeroGATTServer)
|
|
server.driver = mock_driver
|
|
server.connected_centrals = {}
|
|
server.centrals_lock = threading.RLock()
|
|
server._log = Mock()
|
|
server._handle_central_disconnected = Mock()
|
|
|
|
# Add connected central
|
|
central_mac = "AA:BB:CC:DD:EE:FF"
|
|
server.connected_centrals[central_mac] = {
|
|
"address": central_mac,
|
|
"connected_at": time.time()
|
|
}
|
|
|
|
# Simulate D-Bus signal FAILED to arrive (no cleanup called)
|
|
# ... time passes ...
|
|
|
|
# Simulate polling cycle detecting the disconnect
|
|
with patch('dbus.SystemBus') as mock_system_bus, \
|
|
patch('dbus.Interface') as mock_interface_class:
|
|
|
|
mock_bus = Mock()
|
|
mock_system_bus.return_value = mock_bus
|
|
|
|
mock_device = Mock()
|
|
mock_bus.get_object = Mock(return_value=mock_device)
|
|
|
|
mock_props_iface = Mock()
|
|
mock_interface_class.return_value = mock_props_iface
|
|
|
|
# Device shows as disconnected in BlueZ
|
|
mock_props_iface.Get = Mock(return_value=False)
|
|
|
|
# Polling checks BlueZ state
|
|
dbus_path = f"/org/bluez/hci0/dev_{central_mac.replace(':', '_')}"
|
|
device_obj = mock_bus.get_object("org.bluez", dbus_path)
|
|
props_iface = mock_interface_class(device_obj, "org.freedesktop.DBus.Properties")
|
|
is_connected = props_iface.Get("org.bluez.Device1", "Connected")
|
|
|
|
# Polling detects stale connection
|
|
if not is_connected:
|
|
with server.centrals_lock:
|
|
if central_mac in server.connected_centrals:
|
|
server._handle_central_disconnected(central_mac)
|
|
|
|
# Verify polling triggered cleanup
|
|
server._handle_central_disconnected.assert_called_once_with(central_mac)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|