ble-reticulum/tests/mock_ble_driver.py
torlando-tech b0246e320b fix(ble): Prevent data loss from peripheral reassembler race condition
Reorder operations in handle_peripheral_data() to create
fragmenter/reassembler BEFORE spawning peer interface. This
prevents data from being dropped during the brief window when
the interface exists but the reassembler doesn't.

Also adds unit tests to verify the fix and prevent regression.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 12:21:42 -05:00

405 lines
14 KiB
Python

"""
Mock BLE Driver for Unit Testing
This module provides a mock implementation of BLEDriverInterface that simulates
BLE behavior without requiring actual Bluetooth hardware. It's designed for
unit testing BLEInterface logic including:
- Fragmentation and reassembly
- Peer lifecycle management
- Connection blacklist logic
- MAC-based connection direction
- Error handling
Usage:
# Create two mock drivers to simulate a pair of peers
driver1 = MockBLEDriver()
driver2 = MockBLEDriver()
# Link them to enable bidirectional communication
MockBLEDriver.link_drivers(driver1, driver2)
# Simulate discovery
driver1.simulate_device_discovered("AA:BB:CC:DD:EE:FF", "RNS-Test", -60)
# Simulate connection
driver1.connect("AA:BB:CC:DD:EE:FF")
# Simulate data transfer
driver1.send("AA:BB:CC:DD:EE:FF", b"test data")
# -> Triggers driver2.on_data_received("11:22:33:44:55:66", b"test data")
"""
import sys
import os
# Add src directory to path for imports
src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
if src_path not in sys.path:
sys.path.insert(0, src_path)
# Import directly using importlib to bypass RNS namespace conflicts
# This avoids issues when a real RNS package is installed globally
import importlib.util
bluetooth_driver_path = os.path.join(src_path, 'RNS', 'Interfaces', 'bluetooth_driver.py')
spec = importlib.util.spec_from_file_location("bluetooth_driver", bluetooth_driver_path)
bluetooth_driver = importlib.util.module_from_spec(spec)
spec.loader.exec_module(bluetooth_driver)
BLEDriverInterface = bluetooth_driver.BLEDriverInterface
BLEDevice = bluetooth_driver.BLEDevice
DriverState = bluetooth_driver.DriverState
from typing import List, Optional, Callable, Dict
import time
class MockBLEDriver(BLEDriverInterface):
"""
Mock BLE driver that simulates Bluetooth behavior for testing.
"""
def __init__(self, local_address: str = "11:22:33:44:55:66"):
"""
Initialize the mock driver.
Args:
local_address: Simulated MAC address for this driver
"""
self.local_address = local_address
self._state = DriverState.IDLE
self._connected_peers: Dict[str, dict] = {} # address -> {role, mtu, identity}
self._identity: Optional[bytes] = None
self._service_discovery_delay: float = 0.0 # No delay in mock
self._power_mode: str = "balanced"
# UUIDs (set via start())
self._service_uuid: Optional[str] = None
self._rx_char_uuid: Optional[str] = None
self._tx_char_uuid: Optional[str] = None
self._identity_char_uuid: Optional[str] = None
# Callbacks (assigned by consumer)
self.on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
self.on_device_connected: Optional[Callable[[str], None]] = None
self.on_device_disconnected: Optional[Callable[[str], None]] = None
self.on_data_received: Optional[Callable[[str, bytes], None]] = None
self.on_mtu_negotiated: Optional[Callable[[str, int], None]] = None
self.on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None
# Linked driver for bidirectional communication testing
self._linked_driver: Optional['MockBLEDriver'] = None
# Simulated characteristics storage
self._characteristics: Dict[str, bytes] = {} # char_uuid -> value
# Track sent data for assertions
self.sent_data: List[tuple] = [] # [(address, data), ...]
# --- Lifecycle & Configuration ---
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
"""Initialize the mock driver with UUIDs."""
self._service_uuid = service_uuid
self._rx_char_uuid = rx_char_uuid
self._tx_char_uuid = tx_char_uuid
self._identity_char_uuid = identity_char_uuid
self._state = DriverState.IDLE
def stop(self):
"""Stop all activity and disconnect all peers."""
for address in list(self._connected_peers.keys()):
self.disconnect(address)
self._state = DriverState.IDLE
def set_identity(self, identity_bytes: bytes):
"""Set the local identity value."""
self._identity = identity_bytes
self._characteristics[self._identity_char_uuid] = identity_bytes
# --- State & Properties ---
@property
def state(self) -> DriverState:
"""Return current state."""
return self._state
@property
def connected_peers(self) -> List[str]:
"""Return list of connected peer addresses."""
return list(self._connected_peers.keys())
# --- Core Actions ---
def start_scanning(self):
"""Start scanning (simulated)."""
self._state = DriverState.SCANNING
def stop_scanning(self):
"""Stop scanning."""
if self._state == DriverState.SCANNING:
self._state = DriverState.IDLE
def start_advertising(self, device_name: str, identity: bytes):
"""Start advertising (simulated)."""
self._identity = identity
self._characteristics[self._identity_char_uuid] = identity
self._state = DriverState.ADVERTISING
def stop_advertising(self):
"""Stop advertising."""
if self._state == DriverState.ADVERTISING:
self._state = DriverState.IDLE
def connect(self, address: str):
"""
Simulate connecting to a peer (central role).
If a linked driver is set and its address matches, establishes
a bidirectional connection.
"""
if address in self._connected_peers:
return # Already connected
# Simulate connection with default MTU
self._connected_peers[address] = {
"role": "central",
"mtu": 185, # Default MTU
"identity": None
}
# Trigger callback
if self.on_device_connected:
self.on_device_connected(address)
# Trigger MTU negotiation callback
if self.on_mtu_negotiated:
self.on_mtu_negotiated(address, 185)
# If linked driver exists and address matches, establish reverse connection
if self._linked_driver and self._linked_driver.local_address == address:
self._linked_driver._accept_connection(self.local_address)
def _accept_connection(self, address: str):
"""
Internal: Accept incoming connection (peripheral role).
Called by linked driver when it connects to us.
"""
if address in self._connected_peers:
return
self._connected_peers[address] = {
"role": "peripheral",
"mtu": 185,
"identity": None
}
if self.on_device_connected:
self.on_device_connected(address)
if self.on_mtu_negotiated:
self.on_mtu_negotiated(address, 185)
def disconnect(self, address: str):
"""Disconnect from a peer."""
if address not in self._connected_peers:
return
# Remove peer
role = self._connected_peers[address]["role"]
del self._connected_peers[address]
# Trigger callback
if self.on_device_disconnected:
self.on_device_disconnected(address)
# If linked, trigger disconnect on other side
if self._linked_driver and self._linked_driver.local_address == address:
if role == "central":
self._linked_driver._handle_disconnect(self.local_address)
else:
self._linked_driver._handle_disconnect(self.local_address)
def _handle_disconnect(self, address: str):
"""Internal: Handle disconnection initiated by peer."""
if address not in self._connected_peers:
return
del self._connected_peers[address]
if self.on_device_disconnected:
self.on_device_disconnected(address)
def send(self, address: str, data: bytes):
"""
Send data to a connected peer.
Role-aware: automatically routes to linked driver's on_data_received.
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# Track for assertions
self.sent_data.append((address, data))
# If linked driver exists, deliver data
if self._linked_driver and self._linked_driver.local_address == address:
if self._linked_driver.on_data_received:
self._linked_driver.on_data_received(self.local_address, data)
# --- GATT Characteristic Operations ---
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
"""
Read a characteristic value from a peer.
If linked driver exists, reads from its characteristics.
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# If linked driver, read from its characteristics
if self._linked_driver and self._linked_driver.local_address == address:
if char_uuid in self._linked_driver._characteristics:
return self._linked_driver._characteristics[char_uuid]
else:
raise KeyError(f"Characteristic {char_uuid} not found")
else:
# For testing without linked driver
if char_uuid in self._characteristics:
return self._characteristics[char_uuid]
else:
raise KeyError(f"Characteristic {char_uuid} not found")
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
"""
Write a characteristic value to a peer.
If linked driver exists, writes to its characteristics.
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# If linked driver, write to its characteristics
if self._linked_driver and self._linked_driver.local_address == address:
self._linked_driver._characteristics[char_uuid] = data
else:
# For testing without linked driver
self._characteristics[char_uuid] = data
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
"""
Subscribe to notifications from a characteristic.
In the mock, this is a no-op since data delivery is automatic via send().
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# In mock, notifications are handled automatically via send()
pass
# --- Configuration & Queries ---
def get_local_address(self) -> str:
"""Return the simulated local MAC address."""
return self.local_address
def set_service_discovery_delay(self, seconds: float):
"""Set service discovery delay (no-op in mock)."""
self._service_discovery_delay = seconds
def set_power_mode(self, mode: str):
"""Set power mode (tracked but not enforced in mock)."""
self._power_mode = mode
# --- Test Helper Methods ---
def simulate_device_discovered(self, address: str, name: str, rssi: int,
service_uuids: Optional[List[str]] = None,
manufacturer_data: Optional[Dict[int, bytes]] = None):
"""
Simulate discovering a BLE device.
Args:
address: Device MAC address
name: Device name
rssi: Signal strength
service_uuids: Optional list of advertised service UUIDs
manufacturer_data: Optional manufacturer data
"""
if self._state != DriverState.SCANNING:
return
device = BLEDevice(
address=address,
name=name,
rssi=rssi,
service_uuids=service_uuids or [],
manufacturer_data=manufacturer_data or {}
)
if self.on_device_discovered:
self.on_device_discovered(device)
def simulate_mtu_change(self, address: str, new_mtu: int):
"""
Simulate MTU renegotiation on an existing connection.
Args:
address: Peer address
new_mtu: New MTU value
"""
if address not in self._connected_peers:
return
self._connected_peers[address]["mtu"] = new_mtu
if self.on_mtu_negotiated:
self.on_mtu_negotiated(address, new_mtu)
def simulate_error(self, severity: str, message: str, exception: Optional[Exception] = None):
"""
Simulate a platform error.
Args:
severity: "warning" or "error"
message: Error message
exception: Optional exception object
"""
if self.on_error:
self.on_error(severity, message, exception)
def get_peer_role(self, address: str) -> Optional[str]:
"""
Get the connection role for a peer.
Args:
address: Peer address
Returns:
"central" or "peripheral", or None if not connected
"""
if address in self._connected_peers:
return self._connected_peers[address]["role"]
return None
@staticmethod
def link_drivers(driver1: 'MockBLEDriver', driver2: 'MockBLEDriver'):
"""
Link two mock drivers for bidirectional communication.
This simulates a pair of BLE devices that can discover, connect,
and exchange data with each other.
Args:
driver1: First driver
driver2: Second driver
"""
driver1._linked_driver = driver2
driver2._linked_driver = driver1
def reset(self):
"""Reset the mock driver to initial state (useful between tests)."""
self.stop()
self.sent_data.clear()
self._characteristics.clear()
self._identity = None