- Fix BLEInterface.handle_peripheral_data to use _compute_identity_hash instead of RNS.Identity.full_hash for consistent identity hash computation - Update MockBLEDriver.on_device_connected callback to match the (address, peer_identity) signature in bluetooth_driver.py - Fix test_v2_2_identity_handshake.py and test_v2_2_race_conditions.py to properly mock ble_reticulum.Interface without breaking the namespace - Use BLEFragmenter/BLEReassembler directly in tests instead of non-existent _create_fragmenter/_create_reassembler methods - Fix asyncio.get_event_loop() deprecation in test_ble_peer_interface.py for Python 3.10+ compatibility - Update MAC address test fixtures to account for v2.2 MAC sorting logic - Fix test_peer_address_mac_rotation to properly simulate MAC rotation where old connection is dropped before new one arrives Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
416 lines
16 KiB
Python
416 lines
16 KiB
Python
"""
|
|
Tests for BLE Protocol v2.2 Connection Race Condition Prevention
|
|
|
|
Connection race conditions were a major issue in earlier protocol versions,
|
|
causing "Operation already in progress" errors when discovery callbacks fired
|
|
rapidly. Protocol v2.2.1+ implements multi-layer protection:
|
|
|
|
1. **5-Second Rate Limiting** (Interface Layer)
|
|
- Tracks `last_connection_attempt` per peer
|
|
- Skips connection if attempted within last 5 seconds
|
|
- Prevents rapid-fire retries from discovery callbacks
|
|
|
|
2. **Driver Connection State Tracking** (Driver Layer)
|
|
- `_connecting_peers` set tracks in-progress connections
|
|
- Prevents concurrent connection attempts to same address
|
|
- Cleanup via Future callbacks ensures state consistency
|
|
|
|
3. **Early Attempt Recording** (Interface Layer)
|
|
- Records connection attempt BEFORE calling driver.connect()
|
|
- Prevents retry if discovery fires again mid-connection
|
|
|
|
These mechanisms work together to eliminate connection storms while maintaining
|
|
responsive peer discovery.
|
|
|
|
Reference: BLE_PROTOCOL_v2.2.md § Platform-Specific Workarounds → Connection
|
|
Race Condition Prevention
|
|
"""
|
|
|
|
import pytest
|
|
import sys
|
|
import os
|
|
import time
|
|
|
|
# Add src to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
|
|
|
|
# Mock RNS module before importing BLEInterface
|
|
from unittest.mock import Mock, MagicMock
|
|
import sys as _sys
|
|
|
|
# Create RNS mock structure
|
|
import RNS
|
|
if not hasattr(RNS, 'LOG_INFO'):
|
|
RNS.LOG_CRITICAL = 0
|
|
RNS.LOG_ERROR = 1
|
|
RNS.LOG_WARNING = 2
|
|
RNS.LOG_NOTICE = 3
|
|
RNS.LOG_INFO = 4
|
|
RNS.LOG_VERBOSE = 5
|
|
RNS.LOG_DEBUG = 6
|
|
RNS.LOG_EXTREME = 7
|
|
RNS.log = lambda msg, level=4: None
|
|
RNS.prettyhexrep = lambda data: data.hex() if isinstance(data, bytes) else str(data)
|
|
RNS.hexrep = lambda data, delimit=True: data.hex() if isinstance(data, bytes) else str(data)
|
|
|
|
# Mock RNS.Transport
|
|
if not hasattr(RNS, 'Transport'):
|
|
RNS.Transport = MagicMock()
|
|
RNS.Transport.interfaces = []
|
|
|
|
# Mock RNS.Identity
|
|
if not hasattr(RNS, 'Identity'):
|
|
RNS.Identity = MagicMock()
|
|
RNS.Identity.full_hash = lambda x: (x * 2)[:16]
|
|
|
|
# Mock ble_reticulum.Interface module (the base class module, not the whole namespace)
|
|
# We only mock the Interface.py module, allowing BLEInterface.py to be imported from src/
|
|
if 'ble_reticulum.Interface' not in _sys.modules:
|
|
# Create mock Interface base class
|
|
class MockInterface:
|
|
MODE_FULL = 1
|
|
def __init__(self):
|
|
self.IN = True
|
|
self.OUT = True
|
|
self.online = False
|
|
|
|
@staticmethod
|
|
def get_config_obj(configuration):
|
|
"""Mock config object wrapper - just returns a dict-like object."""
|
|
class ConfigObj:
|
|
def __init__(self, config):
|
|
self._config = config if config else {}
|
|
|
|
def __getitem__(self, key):
|
|
return self._config.get(key)
|
|
|
|
def get(self, key, default=None):
|
|
return self._config.get(key, default)
|
|
|
|
def as_string(self, key, default=None):
|
|
val = self._config.get(key, default)
|
|
return str(val) if val is not None else default
|
|
|
|
def as_int(self, key, default=None):
|
|
val = self._config.get(key, default)
|
|
return int(val) if val is not None else default
|
|
|
|
def as_bool(self, key, default=False):
|
|
val = self._config.get(key, default)
|
|
if isinstance(val, bool):
|
|
return val
|
|
if isinstance(val, str):
|
|
return val.lower() in ('true', 'yes', '1', 'on')
|
|
return bool(val) if val is not None else default
|
|
return ConfigObj(configuration)
|
|
|
|
# Create a mock module for ble_reticulum.Interface
|
|
interface_module = MagicMock()
|
|
interface_module.Interface = MockInterface
|
|
_sys.modules['ble_reticulum.Interface'] = interface_module
|
|
|
|
from tests.mock_ble_driver import MockBLEDriver
|
|
from ble_reticulum.BLEInterface import BLEInterface, DiscoveredPeer
|
|
|
|
|
|
class MockOwner:
|
|
"""Mock Reticulum owner."""
|
|
def __init__(self):
|
|
self.inbound_calls = []
|
|
|
|
def inbound(self, data, interface):
|
|
self.inbound_calls.append((data, interface))
|
|
|
|
|
|
class TestRateLimiting:
|
|
"""Test 5-second connection attempt rate limiting."""
|
|
|
|
def test_5_second_rate_limit_prevents_retry(self):
|
|
"""Test that connection attempts within 5 seconds are skipped."""
|
|
driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
peer_address = "11:22:33:44:55:66"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
|
|
# Record first connection attempt
|
|
peer.record_connection_attempt()
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
# Immediately try to select peers (within 5 seconds)
|
|
peers_to_connect = interface._select_peers_to_connect()
|
|
peer_addresses = [p.address for p in peers_to_connect]
|
|
|
|
# Should be skipped due to rate limiting
|
|
assert peer_address not in peer_addresses
|
|
|
|
def test_connection_allowed_after_5_seconds(self):
|
|
"""Test that connection is allowed after 5-second cooldown."""
|
|
# Use local MAC lower than peer MAC so connection direction allows us to initiate
|
|
driver = MockBLEDriver(local_address="11:11:11:11:11:11")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
peer_address = "22:22:22:22:22:22"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
|
|
# Record connection attempt 6 seconds ago (past cooldown)
|
|
peer.record_connection_attempt()
|
|
peer.last_connection_attempt = time.time() - 6.0
|
|
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
# Should now be allowed
|
|
peers_to_connect = interface._select_peers_to_connect()
|
|
peer_addresses = [p.address for p in peers_to_connect]
|
|
|
|
assert peer_address in peer_addresses
|
|
|
|
def test_never_attempted_peer_allowed(self):
|
|
"""Test that peer with no prior attempts is allowed."""
|
|
# Use local MAC lower than peer MAC so connection direction allows us to initiate
|
|
driver = MockBLEDriver(local_address="11:11:11:11:11:11")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
peer_address = "22:22:22:22:22:22"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
|
|
# last_connection_attempt == 0 (never attempted)
|
|
assert peer.last_connection_attempt == 0
|
|
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
# Should be allowed
|
|
peers_to_connect = interface._select_peers_to_connect()
|
|
peer_addresses = [p.address for p in peers_to_connect]
|
|
|
|
assert peer_address in peer_addresses
|
|
|
|
|
|
class TestDriverStateTracking:
|
|
"""Test driver-level connection state tracking."""
|
|
|
|
def test_driver_tracks_connecting_peers(self):
|
|
"""Test that driver tracks addresses with connections in progress."""
|
|
# Note: This tests implementation details of LinuxBluetoothDriver
|
|
# We verify the interface checks for this state
|
|
|
|
driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
# Simulate driver state tracking
|
|
driver._connecting_peers = set()
|
|
driver._connecting_lock = __import__('threading').Lock()
|
|
|
|
peer_address = "11:22:33:44:55:66"
|
|
|
|
# Add to connecting set (simulating pending connection)
|
|
with driver._connecting_lock:
|
|
driver._connecting_peers.add(peer_address)
|
|
|
|
# Add to discovered peers
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
# Try to select peers
|
|
peers_to_connect = interface._select_peers_to_connect()
|
|
peer_addresses = [p.address for p in peers_to_connect]
|
|
|
|
# Should be skipped (connection already in progress)
|
|
assert peer_address not in peer_addresses
|
|
|
|
def test_multiple_rapid_discoveries_handled(self):
|
|
"""Test that rapid discovery callbacks don't cause duplicate connections."""
|
|
# Use local MAC lower than peer MAC so connection direction allows us to initiate
|
|
driver = MockBLEDriver(local_address="11:11:11:11:11:11")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
peer_address = "22:22:22:22:22:22"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
|
|
# Manually record the first connection attempt (simulating what _try_connect_to_peer does)
|
|
# This is needed because _select_peers_to_connect() only returns peers to connect,
|
|
# it doesn't actually initiate the connection or record the attempt
|
|
interface.discovered_peers[peer_address] = peer
|
|
first_selection = interface._select_peers_to_connect()
|
|
|
|
# First selection should include the peer
|
|
assert len(first_selection) == 1
|
|
assert first_selection[0].address == peer_address
|
|
|
|
# Record the attempt (simulating what happens when connection is initiated)
|
|
peer.record_connection_attempt()
|
|
|
|
# Subsequent rapid selections should be rate-limited
|
|
for i in range(4):
|
|
subsequent_selection = interface._select_peers_to_connect()
|
|
# Should be empty because peer was just attempted
|
|
assert len(subsequent_selection) == 0, f"Selection {i+2} should be empty due to rate limiting"
|
|
|
|
# Verify timestamp was recorded
|
|
assert peer.last_connection_attempt > 0
|
|
time_since = time.time() - peer.last_connection_attempt
|
|
assert time_since < 1.0 # Should be very recent
|
|
|
|
|
|
class TestEarlyAttemptRecording:
|
|
"""Test early recording of connection attempts."""
|
|
|
|
def test_attempt_recorded_before_driver_connect(self):
|
|
"""Test that attempt is recorded before driver.connect() is called."""
|
|
# This test verifies the fix for the race condition where discovery
|
|
# callbacks would fire again before driver.connect() completed
|
|
|
|
driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
peer_address = "11:22:33:44:55:66"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
# Initial state: no attempts
|
|
assert peer.connection_attempts == 0
|
|
assert peer.last_connection_attempt == 0
|
|
|
|
# Trigger discovery callback (which calls _select_peers_to_connect)
|
|
device = type('obj', (object,), {
|
|
'address': peer_address,
|
|
'name': 'TestPeer',
|
|
'rssi': -60,
|
|
'service_uuids': [],
|
|
'manufacturer_data': {}
|
|
})()
|
|
|
|
# Simulate device discovered callback
|
|
interface._device_discovered_callback(device)
|
|
|
|
# Verify attempt was recorded
|
|
# (Implementation detail: recorded in _device_discovered_callback
|
|
# or when connect is initiated)
|
|
# The key is that last_connection_attempt > 0 after first discovery
|
|
|
|
|
|
class TestCombinedProtection:
|
|
"""Test that all protection layers work together."""
|
|
|
|
def test_layered_protection_prevents_connection_storm(self):
|
|
"""Test that layered protection prevents connection storm scenario."""
|
|
driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
# Simulate driver connection state tracking
|
|
driver._connecting_peers = set()
|
|
driver._connecting_lock = __import__('threading').Lock()
|
|
|
|
peer_address = "11:22:33:44:55:66"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
connection_attempts = []
|
|
|
|
# Mock driver.connect to track attempts
|
|
original_connect = driver.connect
|
|
def tracked_connect(address):
|
|
connection_attempts.append(address)
|
|
with driver._connecting_lock:
|
|
driver._connecting_peers.add(address)
|
|
original_connect(address)
|
|
|
|
driver.connect = tracked_connect
|
|
|
|
# Simulate rapid discovery (10 callbacks in quick succession)
|
|
for i in range(10):
|
|
peers = interface._select_peers_to_connect()
|
|
for p in peers:
|
|
if p.address == peer_address:
|
|
driver.connect(p.address)
|
|
|
|
# Despite 10 discovery callbacks, should have at most 1 connection attempt
|
|
assert len(connection_attempts) <= 1
|
|
|
|
def test_concurrent_discovery_callbacks(self):
|
|
"""Test behavior with concurrent discovery callbacks."""
|
|
import threading
|
|
|
|
driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF")
|
|
owner = MockOwner()
|
|
|
|
config = {"name": "Test", "enable_central": True}
|
|
interface = BLEInterface(owner, config)
|
|
interface.driver = driver
|
|
interface.local_address = driver.local_address
|
|
|
|
# Simulate driver state
|
|
driver._connecting_peers = set()
|
|
driver._connecting_lock = threading.Lock()
|
|
|
|
peer_address = "11:22:33:44:55:66"
|
|
peer = DiscoveredPeer(peer_address, "TestPeer", -60)
|
|
interface.discovered_peers[peer_address] = peer
|
|
|
|
# Track connection attempts from multiple threads
|
|
attempts = []
|
|
attempts_lock = threading.Lock()
|
|
|
|
def try_connect():
|
|
"""Simulate concurrent discovery callback."""
|
|
time.sleep(0.01) # Small delay to ensure overlap
|
|
peers = interface._select_peers_to_connect()
|
|
for p in peers:
|
|
if p.address == peer_address:
|
|
with attempts_lock:
|
|
attempts.append(p.address)
|
|
# Simulate connection attempt
|
|
with driver._connecting_lock:
|
|
if peer_address not in driver._connecting_peers:
|
|
driver._connecting_peers.add(peer_address)
|
|
|
|
# Launch 5 concurrent "discovery" threads
|
|
threads = [threading.Thread(target=try_connect) for _ in range(5)]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
# Should have very few connection attempts due to protection layers
|
|
# (Rate limiting and driver state tracking)
|
|
assert len(attempts) <= 2 # Allow small window before protection kicks in
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|