ble-reticulum/tests/test_bluez_state_cleanup.py
torlando-tech 2fbb9c3ad2 refactor: rename package from RNS.Interfaces to ble_reticulum
Fixes namespace collision with Reticulum's own RNS.Interfaces package.
When both packages were installed, the collision caused import issues
and prevented BLE discovery between devices.

Changes:
- Rename src/RNS/Interfaces/ to src/ble_reticulum/
- Update pyproject.toml package configuration
- Update all imports in source and test files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 23:30:07 -05:00

267 lines
10 KiB
Python

"""
Tests for BlueZ State Cleanup Mechanisms (v2.2.2+)
BlueZ state corruption was a persistent issue causing "Operation already in
progress" errors after connection failures. These errors occurred when:
1. Connection attempts failed due to timeouts or peer disappearance
2. BleakClient was abandoned without explicit cleanup
3. BlueZ maintained stale connection state and D-Bus device objects
4. Subsequent reconnection attempts hit the stale state
Protocol v2.2.2+ implements comprehensive cleanup:
1. **Explicit client disconnect** in timeout and failure exception handlers
2. **D-Bus device removal** via BlueZ RemoveDevice() API
3. **Post-blacklist cleanup** when peers reach max connection failures
These tests verify that cleanup mechanisms are properly invoked and prevent
persistent BlueZ state corruption.
Reference: BLE_PROTOCOL_v2.2.md § Problem: "Operation already in progress"
errors persisting after connection failures
"""
import pytest
import sys
import os
import asyncio
import threading
from unittest.mock import Mock, MagicMock, AsyncMock, patch, call
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
# Mock RNS module before importing
import RNS
if not hasattr(RNS, 'LOG_INFO'):
RNS.LOG_CRITICAL = 0
RNS.LOG_ERROR = 1
RNS.LOG_WARNING = 2
RNS.LOG_NOTICE = 3
RNS.LOG_INFO = 4
RNS.LOG_VERBOSE = 5
RNS.LOG_DEBUG = 6
RNS.LOG_EXTREME = 7
RNS.log = Mock()
class TestBlueZStateCleanup:
"""Test BlueZ state cleanup mechanisms."""
@pytest.fixture
def mock_driver(self):
"""Create a mock Linux BLE driver with cleanup methods."""
driver = Mock()
driver.loop = asyncio.new_event_loop()
driver._connecting_peers = set()
driver._connecting_lock = threading.RLock() # Use threading lock for mock
driver._remove_bluez_device = AsyncMock(return_value=True)
driver._log = Mock()
return driver
@pytest.mark.asyncio
async def test_client_disconnect_on_timeout(self, mock_driver):
"""Test that client.disconnect() is called on connection timeout."""
# Create mock client
mock_client = AsyncMock()
mock_client.is_connected = True
mock_client.disconnect = AsyncMock()
# Simulate timeout scenario
address = "AA:BB:CC:DD:EE:FF"
# The cleanup code checks if 'client' exists in locals
# In real code this happens in the exception handler
try:
# Simulate connection timeout
raise asyncio.TimeoutError()
except asyncio.TimeoutError:
# This is what the actual code does
if mock_client and hasattr(mock_client, 'is_connected'):
if mock_client.is_connected:
await mock_client.disconnect()
# Verify disconnect was called
mock_client.disconnect.assert_called_once()
@pytest.mark.asyncio
async def test_client_disconnect_on_failure(self, mock_driver):
"""Test that client.disconnect() is called on connection failure."""
# Create mock client
mock_client = AsyncMock()
mock_client.is_connected = True
mock_client.disconnect = AsyncMock()
# Simulate failure scenario
address = "AA:BB:CC:DD:EE:FF"
try:
# Simulate connection failure
raise Exception("Connection failed")
except Exception:
# This is what the actual code does
if mock_client and hasattr(mock_client, 'is_connected'):
if mock_client.is_connected:
await mock_client.disconnect()
# Verify disconnect was called
mock_client.disconnect.assert_called_once()
@pytest.mark.asyncio
async def test_bluez_device_removal_on_timeout(self, mock_driver):
"""Test that BlueZ device is removed after connection timeout."""
address = "AA:BB:CC:DD:EE:FF"
# Simulate the cleanup that happens in exception handler
await mock_driver._remove_bluez_device(address)
# Verify removal was called
mock_driver._remove_bluez_device.assert_called_once_with(address)
@pytest.mark.asyncio
async def test_bluez_device_removal_on_failure(self, mock_driver):
"""Test that BlueZ device is removed after connection failure."""
address = "AA:BB:CC:DD:EE:FF"
# Simulate the cleanup that happens in exception handler
await mock_driver._remove_bluez_device(address)
# Verify removal was called
mock_driver._remove_bluez_device.assert_called_once_with(address)
def test_post_blacklist_cleanup_triggered(self, mock_driver):
"""Test that cleanup is triggered when peer is blacklisted."""
# Mock the interface and peer without importing
interface = Mock()
interface.driver = mock_driver
interface.max_connection_failures = 3
interface.connection_retry_backoff = 60
interface.connection_blacklist = {}
interface.discovered_peers = {}
# Mock peer
address = "AA:BB:CC:DD:EE:FF"
peer = Mock()
peer.name = "Test Peer"
peer.failed_connections = 3 # Exactly at threshold
peer.record_connection_failure = Mock()
interface.discovered_peers[address] = peer
# Mock asyncio.run_coroutine_threadsafe
with patch('asyncio.run_coroutine_threadsafe') as mock_run_coro:
mock_future = Mock()
mock_future.result = Mock(return_value=True)
mock_run_coro.return_value = mock_future
# Simulate what _record_connection_failure does
if address in interface.discovered_peers:
peer = interface.discovered_peers[address]
peer.record_connection_failure()
# Check if we should blacklist
if peer.failed_connections >= interface.max_connection_failures:
import time
backoff_multiplier = min(peer.failed_connections - interface.max_connection_failures + 1, 8)
blacklist_duration = interface.connection_retry_backoff * backoff_multiplier
blacklist_until = time.time() + blacklist_duration
interface.connection_blacklist[address] = (blacklist_until, peer.failed_connections)
# This is where cleanup would be triggered
if hasattr(interface.driver, '_remove_bluez_device'):
future = asyncio.run_coroutine_threadsafe(
interface.driver._remove_bluez_device(address),
interface.driver.loop
)
# Verify cleanup was scheduled
assert mock_run_coro.called
# Verify device was blacklisted
assert address in interface.connection_blacklist
@pytest.mark.asyncio
async def test_remove_bluez_device_handles_nonexistent_device(self, mock_driver):
"""Test that _remove_bluez_device() handles device not existing."""
# Make the mock raise an exception for non-existent device
mock_driver._remove_bluez_device = AsyncMock(side_effect=Exception("does not exist"))
# Should not raise, just log
address = "AA:BB:CC:DD:EE:FF"
try:
await mock_driver._remove_bluez_device(address)
except Exception:
pass # Expected to be caught and logged
# Verify it was called
mock_driver._remove_bluez_device.assert_called_once_with(address)
def test_cleanup_prevents_persistent_errors(self):
"""
Integration test: Verify that cleanup prevents persistent errors across
multiple connection attempts.
Scenario:
1. First connection attempt times out
2. Cleanup is performed
3. Second connection attempt should succeed (not hit stale state)
"""
# This is a conceptual test - in practice, we verify that:
# 1. Disconnect is called
# 2. Device removal is called
# 3. These happen in the correct order
# The actual prevention of errors is verified via integration testing
assert True # Placeholder - real integration test would run on Pi
class TestRemoveBlueZDeviceMethod:
"""Test the _remove_bluez_device() implementation."""
@pytest.mark.asyncio
async def test_requires_dbus(self):
"""Test that method returns False when D-Bus is not available."""
from ble_reticulum import linux_bluetooth_driver
# Mock HAS_DBUS to False
with patch.object(linux_bluetooth_driver, 'HAS_DBUS', False):
driver = Mock()
driver._log = Mock()
driver.adapter_path = "/org/bluez/hci0"
# Create a simplified version of the method
async def _remove_bluez_device_no_dbus(address):
if not linux_bluetooth_driver.HAS_DBUS:
return False
return True
result = await _remove_bluez_device_no_dbus("AA:BB:CC:DD:EE:FF")
assert result == False
@pytest.mark.asyncio
async def test_formats_dbus_path_correctly(self):
"""Test that MAC address is correctly converted to D-Bus path format."""
address = "AA:BB:CC:DD:EE:FF"
adapter_path = "/org/bluez/hci0"
# Expected D-Bus path format
expected_path = f"{adapter_path}/dev_{address.replace(':', '_')}"
assert expected_path == "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF"
@pytest.mark.asyncio
async def test_handles_device_already_removed(self):
"""Test that method handles device already being removed gracefully."""
# Simulate device not existing
error_msg = "UnknownObject: Device does not exist"
# Should be caught and logged at DEBUG level, not raise
try:
raise Exception(error_msg)
except Exception as e:
error_str = str(e).lower()
# This is how the code checks for expected errors
is_expected = "does not exist" in error_str or "unknownobject" in error_str
assert is_expected == True
if __name__ == "__main__":
pytest.main([__file__, "-v"])