ble-reticulum/tests/test_peripheral_disconnect_cleanup.py
torlando-tech c32d23c1d4 fix(tests): Move mock_driver fixture to module level for shared access
Fixes integration test failures where TestRealWorldScenario tests
couldn't access the mock_driver fixture.

The mock_driver fixture was defined inside TestPeripheralDisconnectCleanup
class, making it unavailable to TestRealWorldScenario class. This caused
pytest fixture lookup errors:

- test_both_monitoring_mechanisms_detect_disconnect_idempotent
- test_polling_catches_missed_dbus_signal

Solution: Move mock_driver to module level (outside class) so all test
classes can access it as a shared fixture.

All integration tests now pass locally.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-15 20:26:04 -05:00

558 lines
22 KiB
Python

"""
Tests for Peripheral Disconnection Cleanup (TDD for GitHub Issue)
When Android devices (acting as central) disconnect from Pi GATT servers (acting
as peripheral), the peer entries must be cleaned up from memory to prevent
reaching the 7-peer limit and blocking new connections.
Issue: Peripheral disconnection cleanup never happens because:
1. BLEGATTServer._handle_central_disconnected() exists but is never called
2. No D-Bus signal monitoring for device disconnections
3. on_central_disconnected callback never wired up in linux_bluetooth_driver
This test file follows TDD approach:
1. Write tests that reproduce the bug (SHOULD FAIL initially)
2. Implement the fix in linux_bluetooth_driver.py
3. Verify tests pass after implementation
Reference: BLE_PROTOCOL_v2.2.md § Dual-Mode Operation (Peripheral mode)
"""
import pytest
import sys
import os
import asyncio
import time
import threading
from unittest.mock import Mock, MagicMock, AsyncMock, patch, call
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
# Mock RNS module before importing
import RNS
if not hasattr(RNS, 'LOG_INFO'):
RNS.LOG_CRITICAL = 0
RNS.LOG_ERROR = 1
RNS.LOG_WARNING = 2
RNS.LOG_NOTICE = 3
RNS.LOG_INFO = 4
RNS.LOG_VERBOSE = 5
RNS.LOG_DEBUG = 6
RNS.LOG_EXTREME = 7
RNS.log = Mock()
# Module-level fixture (shared across test classes)
@pytest.fixture
def mock_driver():
"""Create a mock Linux BLE driver with GATT server capabilities."""
driver = Mock()
driver.loop = asyncio.new_event_loop()
driver._peers = {} # address -> peer_conn
driver._peers_lock = asyncio.Lock()
driver._log = Mock()
driver.on_device_disconnected = Mock()
# Mock method that should be added
driver._handle_peripheral_disconnected = Mock()
return driver
class TestPeripheralDisconnectCleanup:
"""Test peripheral disconnection cleanup mechanisms."""
@pytest.fixture
def mock_gatt_server(self, mock_driver):
"""Create a mock GATT server with connected centrals."""
gatt_server = Mock()
gatt_server.driver = mock_driver
gatt_server.connected_centrals = {}
gatt_server.centrals_lock = asyncio.Lock()
gatt_server.running = True
gatt_server._log = Mock()
# Mock callback that should be wired up
gatt_server.on_central_disconnected = None
# Mock the disconnect handler
def handle_disconnect(central_address):
"""Simulate _handle_central_disconnected logic."""
if central_address not in gatt_server.connected_centrals:
return
del gatt_server.connected_centrals[central_address]
# This callback should be wired to driver._handle_peripheral_disconnected
if gatt_server.on_central_disconnected:
gatt_server.on_central_disconnected(central_address)
gatt_server._handle_central_disconnected = handle_disconnect
return gatt_server
def test_callback_is_wired_up(self, mock_driver, mock_gatt_server):
"""
TEST 1: Verify on_central_disconnected callback is wired to driver.
This test verifies that during GATT server initialization, the
on_central_disconnected callback is set to point to the driver's
peripheral disconnection handler.
EXPECTED TO FAIL: Currently the callback is never wired up.
"""
# Simulate what should happen in BluezeroGATTServer.__init__()
# This line should be added in the actual implementation:
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
# Verify callback is wired
assert mock_gatt_server.on_central_disconnected is not None, \
"on_central_disconnected callback should be wired to driver method"
assert mock_gatt_server.on_central_disconnected == mock_driver._handle_peripheral_disconnected, \
"Callback should point to driver._handle_peripheral_disconnected"
def test_peripheral_disconnect_removes_from_peers_dict(self, mock_driver, mock_gatt_server):
"""
TEST 2: Verify that when central disconnects, peer is removed from driver._peers.
Simulates the complete cleanup flow:
1. Central connects (added to connected_centrals and _peers)
2. Central disconnects (D-Bus signal received)
3. Cleanup removes from both dictionaries
EXPECTED TO FAIL: Currently _peers entries are never cleaned up.
"""
central_address = "4A:87:8C:C7:E3:F3" # Real Android MAC from logs
# Setup: Simulate central connection
mock_gatt_server.connected_centrals[central_address] = {
"address": central_address,
"connected_at": time.time(),
"mtu": 517,
"bytes_received": 1024,
"bytes_sent": 512
}
mock_driver._peers[central_address] = Mock() # Simulate peer connection
# Wire up the callback (this should be done in actual code)
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
# Action: Simulate disconnect
mock_gatt_server._handle_central_disconnected(central_address)
# Assert: Verify cleanup in GATT server
assert central_address not in mock_gatt_server.connected_centrals, \
"Central should be removed from connected_centrals after disconnect"
# Assert: Verify driver cleanup callback was called
mock_driver._handle_peripheral_disconnected.assert_called_once_with(central_address)
# Note: In real implementation, _handle_peripheral_disconnected should remove from _peers
# For now we just verify the callback was invoked
def test_driver_peripheral_disconnect_handler_removes_peer(self, mock_driver):
"""
TEST 3: Verify driver._handle_peripheral_disconnected() removes from _peers dict.
This tests the driver-side cleanup that should happen when the GATT server
reports a central disconnection.
EXPECTED TO FAIL: Method doesn't exist yet.
"""
central_address = "65:70:A5:A7:29:73" # Real Android MAC from logs
# Setup: Add peer
mock_driver._peers[central_address] = Mock()
# Create the actual implementation that should exist
def handle_peripheral_disconnected(address):
"""Remove peer from _peers dict and notify callbacks."""
if address in mock_driver._peers:
del mock_driver._peers[address]
if mock_driver.on_device_disconnected:
mock_driver.on_device_disconnected(address)
# Temporarily assign the implementation
mock_driver._handle_peripheral_disconnected = handle_peripheral_disconnected
# Action: Call handler
mock_driver._handle_peripheral_disconnected(central_address)
# Assert: Peer removed from _peers
assert central_address not in mock_driver._peers, \
"Peer should be removed from _peers dict"
# Assert: Callback was invoked
mock_driver.on_device_disconnected.assert_called_once_with(central_address)
@pytest.mark.asyncio
async def test_dbus_disconnect_signal_triggers_cleanup(self, mock_driver, mock_gatt_server):
"""
TEST 4: Verify D-Bus disconnect signal triggers cleanup flow.
Simulates BlueZ D-Bus PropertiesChanged signal when device disconnects:
- Signal: org.freedesktop.DBus.Properties.PropertiesChanged
- Interface: org.bluez.Device1
- Property: Connected = False
EXPECTED TO FAIL: D-Bus monitoring not implemented yet.
"""
central_address = "4A:87:8C:C7:E3:F3"
# Setup: Simulate connection
mock_gatt_server.connected_centrals[central_address] = {
"address": central_address,
"connected_at": time.time(),
"mtu": 517
}
mock_driver._peers[central_address] = Mock()
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
# Simulate D-Bus signal callback that should be implemented
def dbus_properties_changed_callback(interface_name, changed_props, invalidated, path):
"""Mock D-Bus callback that should be registered."""
if interface_name == "org.bluez.Device1" and "Connected" in changed_props:
if not changed_props["Connected"]: # Device disconnected
# Extract MAC from path: /org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF
if "/dev_" in path:
mac_address = path.split("/dev_")[-1].replace("_", ":")
mock_gatt_server._handle_central_disconnected(mac_address)
# Simulate D-Bus signal
dbus_path = f"/org/bluez/hci0/dev_{central_address.replace(':', '_')}"
changed_properties = {"Connected": False}
dbus_properties_changed_callback(
"org.bluez.Device1",
changed_properties,
[],
dbus_path
)
# Assert: Cleanup happened
assert central_address not in mock_gatt_server.connected_centrals
mock_driver._handle_peripheral_disconnected.assert_called_once_with(central_address)
def test_multiple_disconnects_are_idempotent(self, mock_driver, mock_gatt_server):
"""
TEST 5: Verify multiple disconnect signals don't cause errors.
Edge case: D-Bus may send multiple PropertiesChanged signals or
cleanup may be called from multiple code paths.
EXPECTED BEHAVIOR: Second call should be safely ignored.
"""
central_address = "4A:87:8C:C7:E3:F3"
# Setup
mock_gatt_server.connected_centrals[central_address] = {"address": central_address}
mock_driver._peers[central_address] = Mock()
# Wire callback
def handle_peripheral_disconnected(address):
if address in mock_driver._peers:
del mock_driver._peers[address]
mock_driver._handle_peripheral_disconnected = handle_peripheral_disconnected
mock_gatt_server.on_central_disconnected = mock_driver._handle_peripheral_disconnected
# Action: First disconnect
mock_gatt_server._handle_central_disconnected(central_address)
assert central_address not in mock_gatt_server.connected_centrals
# Action: Second disconnect (should not raise)
try:
mock_gatt_server._handle_central_disconnected(central_address)
second_disconnect_succeeded = True
except Exception as e:
second_disconnect_succeeded = False
pytest.fail(f"Second disconnect raised exception: {e}")
assert second_disconnect_succeeded, "Multiple disconnects should be idempotent"
def test_disconnect_during_shutdown_is_ignored(self, mock_driver, mock_gatt_server):
"""
TEST 6: Verify disconnects during shutdown don't cause errors.
Edge case: GATT server is stopping while centrals are still connected.
Disconnect signals may arrive after cleanup has started.
EXPECTED BEHAVIOR: Gracefully handle when server is not running.
"""
central_address = "65:70:A5:A7:29:73"
# Setup
mock_gatt_server.connected_centrals[central_address] = {"address": central_address}
mock_gatt_server.running = False # Server is shutting down
# Action: Disconnect during shutdown
try:
mock_gatt_server._handle_central_disconnected(central_address)
disconnect_during_shutdown_ok = True
except Exception as e:
disconnect_during_shutdown_ok = False
pytest.fail(f"Disconnect during shutdown raised: {e}")
assert disconnect_during_shutdown_ok, \
"Disconnect during shutdown should be handled gracefully"
def test_peer_limit_unblocked_after_disconnect(self, mock_driver):
"""
TEST 7: Verify that after disconnect, new connections can succeed.
Regression test for the actual bug: When _peers dict reaches max (7),
new connections are blocked. After cleanup, new connections should work.
This simulates the real-world scenario from the logs where device
4A:87:8C:C7:E3:F3 was blocked by "max peers (7) reached".
"""
max_peers = 7
# Setup: Fill up to max peers
for i in range(max_peers):
address = f"AA:BB:CC:DD:EE:F{i}"
mock_driver._peers[address] = Mock()
# Verify we're at limit
assert len(mock_driver._peers) == max_peers
# Simulate one peer disconnecting
disconnected_address = "AA:BB:CC:DD:EE:F0"
def handle_peripheral_disconnected(address):
if address in mock_driver._peers:
del mock_driver._peers[address]
mock_driver._handle_peripheral_disconnected = handle_peripheral_disconnected
mock_driver._handle_peripheral_disconnected(disconnected_address)
# Assert: Peer count decreased
assert len(mock_driver._peers) == max_peers - 1, \
"Peer count should decrease after disconnect"
# Assert: New connection can now be added
new_address = "4A:87:8C:C7:E3:F3" # The blocked Android device
mock_driver._peers[new_address] = Mock()
assert len(mock_driver._peers) == max_peers, \
"Should be able to add new peer after cleanup"
@pytest.mark.asyncio
async def test_reconnection_race_condition(self, mock_driver, mock_gatt_server):
"""
TEST 8: Verify reconnection race doesn't delete new connection.
Edge case: Central disconnects and immediately reconnects.
Cleanup from first connection arrives after second connection established.
EXPECTED BEHAVIOR: Should not delete the new connection state.
Solution: Check timestamp or verify connection exists before cleanup.
"""
central_address = "4A:87:8C:C7:E3:F3"
# Setup: First connection
first_connect_time = time.time()
mock_gatt_server.connected_centrals[central_address] = {
"address": central_address,
"connected_at": first_connect_time,
"mtu": 517
}
# Simulate disconnect (but cleanup delayed)
del mock_gatt_server.connected_centrals[central_address]
# Simulate immediate reconnection
second_connect_time = time.time() + 0.1
mock_gatt_server.connected_centrals[central_address] = {
"address": central_address,
"connected_at": second_connect_time,
"mtu": 517
}
# Now delayed cleanup from first disconnect arrives
# Implementation should check if connection is newer
if central_address in mock_gatt_server.connected_centrals:
conn_info = mock_gatt_server.connected_centrals[central_address]
if conn_info["connected_at"] > first_connect_time:
# Don't clean up - this is a newer connection
pass
# Assert: New connection still exists
assert central_address in mock_gatt_server.connected_centrals, \
"Reconnection should not be cleaned up by stale disconnect"
class TestRealWorldScenario:
"""Integration test simulating the real-world bug from logs."""
def test_android_connection_blocked_by_stale_peers(self):
"""
Reproduce the exact scenario from 10.0.0.80 logs:
1. Device has 7 connected peers (at limit)
2. Android device 4A:87:8C:C7:E3:F3 discovered with good signal
3. Connection blocked: "Cannot connect to 4A:87:8C:C7:E3:F3: max peers (7) reached"
4. Some peers are actually stale (disconnected but not cleaned up)
After fix, stale peers should be removed, allowing new connections.
"""
# Setup: Simulate driver at peer limit
driver = Mock()
driver._peers = {}
driver.max_peers = 7
driver._log = Mock()
# Add 7 peers (some are stale from old peripheral connections)
stale_peers = [
"66:A9:1F:BB:05:96", # Connected 3 hours ago, now stale
"75:C1:80:F9:26:6E", # Connected 2 hours ago, now stale
]
active_peers = [
"B8:27:EB:43:04:BC", # pizero2-first (active)
"B8:27:EB:A8:A7:22", # pizero-first (active)
"65:70:A5:A7:29:73", # Android (active, working)
]
for addr in stale_peers + active_peers:
driver._peers[addr] = Mock()
# 2 more to reach limit
driver._peers["AA:BB:CC:DD:EE:F1"] = Mock()
driver._peers["AA:BB:CC:DD:EE:F2"] = Mock()
assert len(driver._peers) == 7
# New Android device tries to connect
new_android = "4A:87:8C:C7:E3:F3"
# Check if can connect
can_connect = len(driver._peers) < driver.max_peers
assert not can_connect, "Should be blocked by peer limit (BUG REPRODUCED)"
# After fix: Cleanup stale peripheral connections
for stale_addr in stale_peers:
if stale_addr in driver._peers:
del driver._peers[stale_addr]
# Now new connection should succeed
can_connect_after_cleanup = len(driver._peers) < driver.max_peers
assert can_connect_after_cleanup, \
"After cleanup, new connections should be allowed"
# Add new peer
driver._peers[new_android] = Mock()
assert new_android in driver._peers, "New Android device should connect successfully"
def test_both_monitoring_mechanisms_detect_disconnect_idempotent(self, mock_driver):
"""
Integration test: Both D-Bus signals and polling detect same disconnect.
Verifies that cleanup is idempotent - if both mechanisms detect the same
disconnect, cleanup should only happen once without errors.
"""
from RNS.Interfaces.linux_bluetooth_driver import BluezeroGATTServer
# Setup GATT server with monitoring
server = Mock(spec=BluezeroGATTServer)
server.driver = mock_driver
server.connected_centrals = {}
server.centrals_lock = threading.RLock()
server._log = Mock()
# Track cleanup calls
cleanup_calls = []
def track_cleanup(address):
cleanup_calls.append(address)
# Simulate actual cleanup
with server.centrals_lock:
if address in server.connected_centrals:
del server.connected_centrals[address]
server._handle_central_disconnected = track_cleanup
# Add connected central
central_mac = "AA:BB:CC:DD:EE:FF"
server.connected_centrals[central_mac] = {"address": central_mac}
# Simulate D-Bus signal detecting disconnect
track_cleanup(central_mac)
assert len(cleanup_calls) == 1
assert central_mac not in server.connected_centrals
# Simulate polling also detecting disconnect (should be idempotent)
# Central is already removed from dict, so cleanup should not be called again
with server.centrals_lock:
if central_mac in server.connected_centrals:
track_cleanup(central_mac)
# Verify cleanup was only called once
assert len(cleanup_calls) == 1, "Cleanup should be idempotent"
def test_polling_catches_missed_dbus_signal(self, mock_driver):
"""
Integration test: Polling detects disconnect that D-Bus signal missed.
Simulates scenario where D-Bus signal fails or is delayed, but polling
fallback detects and triggers cleanup within 30 seconds.
"""
from RNS.Interfaces.linux_bluetooth_driver import BluezeroGATTServer
# Setup GATT server
server = Mock(spec=BluezeroGATTServer)
server.driver = mock_driver
server.connected_centrals = {}
server.centrals_lock = threading.RLock()
server._log = Mock()
server._handle_central_disconnected = Mock()
# Add connected central
central_mac = "AA:BB:CC:DD:EE:FF"
server.connected_centrals[central_mac] = {
"address": central_mac,
"connected_at": time.time()
}
# Simulate D-Bus signal FAILED to arrive (no cleanup called)
# ... time passes ...
# Simulate polling cycle detecting the disconnect
with patch('dbus.SystemBus') as mock_system_bus, \
patch('dbus.Interface') as mock_interface_class:
mock_bus = Mock()
mock_system_bus.return_value = mock_bus
mock_device = Mock()
mock_bus.get_object = Mock(return_value=mock_device)
mock_props_iface = Mock()
mock_interface_class.return_value = mock_props_iface
# Device shows as disconnected in BlueZ
mock_props_iface.Get = Mock(return_value=False)
# Polling checks BlueZ state
dbus_path = f"/org/bluez/hci0/dev_{central_mac.replace(':', '_')}"
device_obj = mock_bus.get_object("org.bluez", dbus_path)
props_iface = mock_interface_class(device_obj, "org.freedesktop.DBus.Properties")
is_connected = props_iface.Get("org.bluez.Device1", "Connected")
# Polling detects stale connection
if not is_connected:
with server.centrals_lock:
if central_mac in server.connected_centrals:
server._handle_central_disconnected(central_mac)
# Verify polling triggered cleanup
server._handle_central_disconnected.assert_called_once_with(central_mac)
if __name__ == "__main__":
pytest.main([__file__, "-v"])