ble-reticulum/tests/test_ble_peer_interface.py
Torlando d475432cb0
fix(ble): Event-driven D-Bus monitoring to eliminate HCI errors on BCM43xx chips (#30)
* fix(ble): Increase D-Bus monitoring intervals to prevent HCI errors

The D-Bus monitoring threads were polling too frequently (0.5s and 30s),
causing HCI command collisions on BCM43xx single-radio chips. These chips
cannot handle concurrent BLE operations, and the frequent D-Bus activity
was interfering with scan/advertise cycles.

Changes:
- Increase D-Bus disconnect monitor interval from 0.5s to 5s
- Increase stale connection poll interval from 30s to 120s

This eliminates HCI errors (Opcode 0x2005/0x2006) while preserving
disconnect detection functionality with slightly higher latency.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(ble): Convert D-Bus monitoring to event-driven approach

Replace polling-based D-Bus monitoring with true event-driven pattern:

1. D-Bus monitor thread:
   - Use asyncio.Event instead of periodic sleep
   - Store loop reference for thread-safe shutdown
   - Use call_soon_threadsafe to wake loop on stop

2. Stale poll thread:
   - Replace busy-wait loop (240 x 0.5s) with single Event.wait()
   - Increase interval from 120s to 300s (safety net only)
   - Immediate response to stop signal

Benefits:
- Zero CPU usage while waiting (no periodic wakeups)
- Immediate shutdown response (ms instead of 5s)
- Cleaner, simpler code
- Maintains disconnect detection via D-Bus signals
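
The D-Bus monitor pattern described above can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: the class name `MonitorThread` and its attributes are hypothetical, and the real monitor would subscribe to D-Bus signals where the comment indicates.

```python
import asyncio
import threading


class MonitorThread:
    """Hypothetical stand-in for the D-Bus monitor; all names are illustrative."""

    def __init__(self):
        self._loop = None          # published by the worker once its loop exists
        self._stop_event = None    # asyncio.Event, created inside the loop
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        self._ready.wait()         # block until the loop and event are published

    def _run(self):
        asyncio.run(self._main())

    async def _main(self):
        self._loop = asyncio.get_running_loop()
        self._stop_event = asyncio.Event()
        self._ready.set()
        # A real monitor would register its D-Bus signal handlers here.
        await self._stop_event.wait()   # sleeps with no periodic wakeups

    def stop(self, timeout=2.0):
        try:
            # asyncio.Event is not thread-safe, so schedule set() on the loop itself.
            self._loop.call_soon_threadsafe(self._stop_event.set)
        except RuntimeError:
            pass                   # loop already closed; nothing left to wake
        self._thread.join(timeout)
        return not self._thread.is_alive()


monitor = MonitorThread()
monitor.start()
stopped = monitor.stop()
stopped_again = monitor.stop()     # second call hits the RuntimeError branch
```

Storing the loop reference is what makes the cross-thread wake-up possible: calling `set()` directly from another thread would not reliably wake the waiting coroutine.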

* test(ble): Add comprehensive unit tests for HCI error fixes

Add 26 new unit tests for the event-driven D-Bus monitoring fixes
that eliminated HCI errors on BCM43xx single-radio chips.

Test coverage:
- TestEventDrivenDBusMonitor: Tests asyncio.Event usage, immediate
  wake response, call_soon_threadsafe cross-thread signaling
- TestStalePollImprovements: Tests threading.Event.wait() usage,
  300s interval, immediate stop response
- TestStopShutdownBehavior: Tests stop() async signaling, RuntimeError
  handling, shutdown latency improvement
- TestIntegrationScenarios: Tests full lifecycle, multiple stop calls
- TestCodeVerification: Verifies actual source patterns match expected

All 26 tests pass without requiring pytest-asyncio plugin.
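
The stale-poll behavior these tests target can be approximated with a single `threading.Event.wait()` call per cycle. The sketch below is an assumption about the shape of that loop, not the project's implementation; `stale_poll_loop` and `check_stale` are hypothetical names, and only the 300s interval comes from the commit message.

```python
import threading
import time

STALE_POLL_INTERVAL = 300.0  # seconds; interval quoted in the commit message


def stale_poll_loop(stop_event, check_stale, interval=STALE_POLL_INTERVAL):
    """Call check_stale every `interval` seconds until stop_event is set."""
    # Event.wait() returns True as soon as the event is set and False on
    # timeout, so the loop exits promptly on stop and polls only on timeout.
    while not stop_event.wait(timeout=interval):
        check_stale()


stop_event = threading.Event()
calls = []
worker = threading.Thread(
    target=stale_poll_loop,
    args=(stop_event, lambda: calls.append(1)),
    daemon=True,
)
worker.start()

started = time.monotonic()
stop_event.set()              # request shutdown long before the interval elapses
worker.join(timeout=5.0)
elapsed = time.monotonic() - started
```

Because the thread blocks inside `wait()` rather than sleeping in short slices, shutdown latency no longer depends on the polling interval.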

* fix(tests): Use threading.RLock instead of asyncio.Lock in test fixtures

In Python 3.8/3.9, asyncio.Lock() requires a running event loop. When
test_hci_error_fixes.py runs first (alphabetically) and uses asyncio.run(),
it closes the event loop after each test. Subsequent test fixtures that
create asyncio.Lock() then fail with "no current event loop" errors.

Since these are mock fixtures that don't need async semantics, use
threading.RLock() instead.
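
A small sketch of why the swap is safe for mocks: `threading.RLock()` has no dependency on an event loop, so it can be constructed after `asyncio.run()` has closed its loop. The coroutine `noop` is a hypothetical placeholder. Note that this only suits fixtures where the lock is never used with `async with`.

```python
import asyncio
import threading


async def noop():
    """Placeholder coroutine standing in for an earlier async test body."""
    return "done"


result = asyncio.run(noop())   # closes its loop when it returns

lock = threading.RLock()       # construction does not touch any event loop
acquired = False
with lock:
    with lock:                 # re-entrant: the same thread may acquire again
        acquired = True
```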

* fix(tests): Replace all asyncio.Lock() with threading.RLock() in test mocks

asyncio.Lock() requires a running event loop in Python 3.8/3.9. When
test files using asyncio.run() execute first, the event loop is closed,
causing subsequent test fixtures to fail when creating asyncio.Lock().

Fixed in:
- test_peripheral_disconnect_cleanup.py (mock_gatt_server fixture)
- test_bluez_state_cleanup.py (mock_driver fixture)
- test_ble_peer_interface.py (create_mock_peer_interface helper)
- conftest.py (create_mock_interface helper)

---------

Co-authored-by: torlando-tech <torlando-tech@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 00:46:06 -05:00

"""
Unit tests for BLEPeerInterface class.

Tests the spawned peer interface that represents individual BLE connections,
including data flow, fragmentation, and both central/peripheral modes.
"""
import pytest
import asyncio
import threading
from unittest.mock import Mock, AsyncMock, patch, MagicMock

# Import fragmentation for testing
try:
    from RNS.Interfaces.BLEFragmentation import BLEFragmenter, BLEReassembler
except ImportError:
    BLEFragmenter = None
    BLEReassembler = None


# ============================================================================
# Helper: Create Mock BLEPeerInterface
# ============================================================================

def create_mock_peer_interface(peer_address="AA:BB:CC:DD:EE:FF", peer_name="TestPeer", is_peripheral=False):
    """Create a mock BLEPeerInterface for testing."""
    # Mock parent interface
    parent = Mock()
    parent.name = "TestBLEInterface"
    parent.owner = Mock()
    parent.owner.inbound = Mock()
    parent.online = True
    parent.HW_MTU = 500
    parent.bitrate = 700000
    parent.rxb = 0
    parent.txb = 0
    parent.peers = {peer_address: (Mock(is_connected=True), 0, 185)}
    parent.fragmenters = {peer_address: BLEFragmenter(mtu=185) if BLEFragmenter else Mock()}
    parent.reassemblers = {peer_address: BLEReassembler() if BLEReassembler else Mock()}
    parent.frag_lock = threading.RLock()  # Use threading lock for mock
    parent.peer_lock = threading.RLock()  # Use threading lock for mock
    parent.loop = asyncio.get_event_loop()
    parent.gatt_server = Mock()
    parent.gatt_server.send_notification = AsyncMock(return_value=True)

    # Mock peer interface
    peer_if = Mock()
    peer_if.parent_interface = parent
    peer_if.peer_address = peer_address
    peer_if.peer_name = peer_name
    peer_if.online = True
    peer_if.is_peripheral_connection = is_peripheral
    peer_if.HW_MTU = parent.HW_MTU
    peer_if.bitrate = parent.bitrate
    peer_if.rxb = 0
    peer_if.txb = 0

    return peer_if, parent


# ============================================================================
# Basic Operations Tests
# ============================================================================

@pytest.mark.skipif(BLEFragmenter is None, reason="BLEFragmentation not available")
class TestPeerInterfaceBasics:
    """Test basic BLEPeerInterface operations."""

    def test_peer_interface_initialization(self):
        """Test that peer interface initializes with correct attributes."""
        peer_if, parent = create_mock_peer_interface(
            peer_address="AA:BB:CC:DD:EE:FF",
            peer_name="TestDevice"
        )

        assert peer_if.parent_interface == parent
        assert peer_if.peer_address == "AA:BB:CC:DD:EE:FF"
        assert peer_if.peer_name == "TestDevice"
        assert peer_if.online is True
        assert peer_if.HW_MTU == 500
        assert peer_if.bitrate == 700000

    def test_process_incoming_updates_stats(self):
        """Test that processing incoming data updates statistics."""
        peer_if, parent = create_mock_peer_interface()

        # Simulate incoming data
        test_data = b"Hello, BLE!" * 10
        initial_rxb = peer_if.rxb

        # Mock the process_incoming behavior
        peer_if.rxb += len(test_data)
        parent.rxb += len(test_data)

        # Verify stats updated
        assert peer_if.rxb == initial_rxb + len(test_data)
        assert parent.rxb == len(test_data)

    def test_process_outgoing_updates_stats(self):
        """Test that sending data updates statistics."""
        peer_if, parent = create_mock_peer_interface()

        # Simulate outgoing data
        test_data = b"Hello, BLE!" * 10
        initial_txb = peer_if.txb

        # Mock the process_outgoing behavior (fragmenting)
        fragmenter = parent.fragmenters[peer_if.peer_address]
        if hasattr(fragmenter, 'fragment_packet'):
            fragments = fragmenter.fragment_packet(test_data)
            for frag in fragments:
                peer_if.txb += len(frag)
                parent.txb += len(frag)

        # Verify stats updated
        assert peer_if.txb > initial_txb
        assert parent.txb > 0

    def test_detach_cleanup(self):
        """Test that detach properly cleans up."""
        peer_if, parent = create_mock_peer_interface()

        # Simulate detach
        peer_if.online = False

        # Verify state
        assert peer_if.online is False

    def test_should_ingress_limit_inheritance(self):
        """Test that ingress limiting inherits from parent."""
        peer_if, parent = create_mock_peer_interface()

        # Mock parent's should_ingress_limit
        parent.should_ingress_limit = Mock(return_value=True)

        # Peer interface should return same value
        # (In real code, this would be: peer_if.should_ingress_limit())
        assert parent.should_ingress_limit() is True


# ============================================================================
# Central Mode Send Tests
# ============================================================================

@pytest.mark.skipif(BLEFragmenter is None, reason="BLEFragmentation not available")
class TestCentralModeSend:
    """Test sending data in central mode (via GATT write)."""

    def test_send_via_central_single_fragment(self):
        """Test sending data that fits in one fragment."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=False)

        # Small data that fits in one fragment
        test_data = b"Small packet"
        fragmenter = parent.fragmenters[peer_if.peer_address]

        # Fragment the data
        fragments = fragmenter.fragment_packet(test_data)

        # Should be only 1 fragment for small data
        assert len(fragments) == 1

    def test_send_via_central_multiple_fragments(self):
        """Test sending data that requires multiple fragments."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=False)

        # Large data that needs fragmentation
        test_data = b"X" * 500  # 500 bytes > MTU(185)
        fragmenter = parent.fragmenters[peer_if.peer_address]

        # Fragment the data
        fragments = fragmenter.fragment_packet(test_data)

        # Should be multiple fragments
        assert len(fragments) > 1

    @pytest.mark.asyncio
    async def test_send_via_central_timeout(self):
        """Test handling of write timeout in central mode."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=False)

        # Get mock client
        client, _, _ = parent.peers[peer_if.peer_address]

        # Configure client to timeout
        async def timeout_write(*args, **kwargs):
            await asyncio.sleep(0.1)
            raise asyncio.TimeoutError("Write timeout")

        client.write_gatt_char = AsyncMock(side_effect=timeout_write)

        # Attempt write should timeout
        with pytest.raises(asyncio.TimeoutError):
            await client.write_gatt_char("dummy-uuid", b"data")

    @pytest.mark.asyncio
    async def test_send_via_central_connection_error(self):
        """Test handling of connection loss during send."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=False)

        # Get mock client
        client, _, _ = parent.peers[peer_if.peer_address]

        # Simulate disconnection
        client.is_connected = False

        # Verify disconnection is detected
        assert client.is_connected is False

    def test_send_via_central_no_fragmenter(self):
        """Test handling when fragmenter is missing."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=False)

        # Remove fragmenter
        del parent.fragmenters[peer_if.peer_address]

        # Verify fragmenter is missing
        assert peer_if.peer_address not in parent.fragmenters


# ============================================================================
# Peripheral Mode Send Tests
# ============================================================================

@pytest.mark.skipif(BLEFragmenter is None, reason="BLEFragmentation not available")
class TestPeripheralModeSend:
    """Test sending data in peripheral mode (via GATT notifications)."""

    @pytest.mark.asyncio
    async def test_send_via_peripheral_single_fragment(self):
        """Test sending notification with single fragment."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=True)

        # Small data that fits in one fragment
        test_data = b"Small notification"
        fragmenter = parent.fragmenters[peer_if.peer_address]
        fragments = fragmenter.fragment_packet(test_data)

        # Should be 1 fragment
        assert len(fragments) == 1

        # Send notification
        result = await parent.gatt_server.send_notification(fragments[0], peer_if.peer_address)
        assert result is True

    @pytest.mark.asyncio
    async def test_send_via_peripheral_multiple_fragments(self):
        """Test sending multiple notifications."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=True)

        # Large data needing fragmentation
        test_data = b"Y" * 500
        fragmenter = parent.fragmenters[peer_if.peer_address]
        fragments = fragmenter.fragment_packet(test_data)

        # Should be multiple fragments
        assert len(fragments) > 1

        # Send all fragments
        for frag in fragments:
            result = await parent.gatt_server.send_notification(frag, peer_if.peer_address)
            assert result is True

    @pytest.mark.asyncio
    async def test_send_via_peripheral_no_server(self):
        """Test handling when GATT server is not available."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=True)

        # Remove server
        parent.gatt_server = None

        # Verify no server
        assert parent.gatt_server is None

    @pytest.mark.asyncio
    async def test_send_via_peripheral_timeout(self):
        """Test notification timeout handling."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=True)

        # Configure server to timeout
        async def timeout_notification(*args, **kwargs):
            await asyncio.sleep(0.1)
            raise asyncio.TimeoutError("Notification timeout")

        parent.gatt_server.send_notification = AsyncMock(side_effect=timeout_notification)

        # Should timeout
        with pytest.raises(asyncio.TimeoutError):
            await parent.gatt_server.send_notification(b"data", peer_if.peer_address)

    @pytest.mark.asyncio
    async def test_send_via_peripheral_central_disconnected(self):
        """Test handling when target central is not connected."""
        peer_if, parent = create_mock_peer_interface(is_peripheral=True)

        # Configure server to return False (not connected)
        parent.gatt_server.send_notification = AsyncMock(return_value=False)

        # Should return False
        result = await parent.gatt_server.send_notification(b"data", peer_if.peer_address)
        assert result is False


if __name__ == "__main__":
    pytest.main([__file__, "-v"])