Reorder operations in handle_peripheral_data() to create fragmenter/reassembler BEFORE spawning peer interface. This prevents data from being dropped during the brief window when the interface exists but the reassembler doesn't. Also adds unit tests to verify the fix and prevent regression. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
405 lines
14 KiB
Python
405 lines
14 KiB
Python
"""
|
|
Mock BLE Driver for Unit Testing
|
|
|
|
This module provides a mock implementation of BLEDriverInterface that simulates
|
|
BLE behavior without requiring actual Bluetooth hardware. It's designed for
|
|
unit testing BLEInterface logic including:
|
|
|
|
- Fragmentation and reassembly
|
|
- Peer lifecycle management
|
|
- Connection blacklist logic
|
|
- MAC-based connection direction
|
|
- Error handling
|
|
|
|
Usage:
|
|
# Create two mock drivers to simulate a pair of peers
|
|
driver1 = MockBLEDriver()
|
|
driver2 = MockBLEDriver()
|
|
|
|
# Link them to enable bidirectional communication
|
|
MockBLEDriver.link_drivers(driver1, driver2)
|
|
|
|
# Simulate discovery
|
|
driver1.simulate_device_discovered("AA:BB:CC:DD:EE:FF", "RNS-Test", -60)
|
|
|
|
# Simulate connection
|
|
driver1.connect("AA:BB:CC:DD:EE:FF")
|
|
|
|
# Simulate data transfer
|
|
driver1.send("AA:BB:CC:DD:EE:FF", b"test data")
|
|
# -> Triggers driver2.on_data_received("11:22:33:44:55:66", b"test data")
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
|
|
# Add src directory to path for imports
|
|
src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
|
|
if src_path not in sys.path:
|
|
sys.path.insert(0, src_path)
|
|
|
|
# Import directly using importlib to bypass RNS namespace conflicts
|
|
# This avoids issues when a real RNS package is installed globally
|
|
import importlib.util
|
|
bluetooth_driver_path = os.path.join(src_path, 'RNS', 'Interfaces', 'bluetooth_driver.py')
|
|
spec = importlib.util.spec_from_file_location("bluetooth_driver", bluetooth_driver_path)
|
|
bluetooth_driver = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(bluetooth_driver)
|
|
|
|
BLEDriverInterface = bluetooth_driver.BLEDriverInterface
|
|
BLEDevice = bluetooth_driver.BLEDevice
|
|
DriverState = bluetooth_driver.DriverState
|
|
from typing import List, Optional, Callable, Dict
|
|
import time
|
|
|
|
|
|
class MockBLEDriver(BLEDriverInterface):
|
|
"""
|
|
Mock BLE driver that simulates Bluetooth behavior for testing.
|
|
"""
|
|
|
|
def __init__(self, local_address: str = "11:22:33:44:55:66"):
|
|
"""
|
|
Initialize the mock driver.
|
|
|
|
Args:
|
|
local_address: Simulated MAC address for this driver
|
|
"""
|
|
self.local_address = local_address
|
|
self._state = DriverState.IDLE
|
|
self._connected_peers: Dict[str, dict] = {} # address -> {role, mtu, identity}
|
|
self._identity: Optional[bytes] = None
|
|
self._service_discovery_delay: float = 0.0 # No delay in mock
|
|
self._power_mode: str = "balanced"
|
|
|
|
# UUIDs (set via start())
|
|
self._service_uuid: Optional[str] = None
|
|
self._rx_char_uuid: Optional[str] = None
|
|
self._tx_char_uuid: Optional[str] = None
|
|
self._identity_char_uuid: Optional[str] = None
|
|
|
|
# Callbacks (assigned by consumer)
|
|
self.on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
|
|
self.on_device_connected: Optional[Callable[[str], None]] = None
|
|
self.on_device_disconnected: Optional[Callable[[str], None]] = None
|
|
self.on_data_received: Optional[Callable[[str, bytes], None]] = None
|
|
self.on_mtu_negotiated: Optional[Callable[[str, int], None]] = None
|
|
self.on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None
|
|
|
|
# Linked driver for bidirectional communication testing
|
|
self._linked_driver: Optional['MockBLEDriver'] = None
|
|
|
|
# Simulated characteristics storage
|
|
self._characteristics: Dict[str, bytes] = {} # char_uuid -> value
|
|
|
|
# Track sent data for assertions
|
|
self.sent_data: List[tuple] = [] # [(address, data), ...]
|
|
|
|
# --- Lifecycle & Configuration ---
|
|
|
|
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
|
|
"""Initialize the mock driver with UUIDs."""
|
|
self._service_uuid = service_uuid
|
|
self._rx_char_uuid = rx_char_uuid
|
|
self._tx_char_uuid = tx_char_uuid
|
|
self._identity_char_uuid = identity_char_uuid
|
|
self._state = DriverState.IDLE
|
|
|
|
def stop(self):
|
|
"""Stop all activity and disconnect all peers."""
|
|
for address in list(self._connected_peers.keys()):
|
|
self.disconnect(address)
|
|
self._state = DriverState.IDLE
|
|
|
|
def set_identity(self, identity_bytes: bytes):
|
|
"""Set the local identity value."""
|
|
self._identity = identity_bytes
|
|
self._characteristics[self._identity_char_uuid] = identity_bytes
|
|
|
|
# --- State & Properties ---
|
|
|
|
@property
|
|
def state(self) -> DriverState:
|
|
"""Return current state."""
|
|
return self._state
|
|
|
|
@property
|
|
def connected_peers(self) -> List[str]:
|
|
"""Return list of connected peer addresses."""
|
|
return list(self._connected_peers.keys())
|
|
|
|
# --- Core Actions ---
|
|
|
|
def start_scanning(self):
|
|
"""Start scanning (simulated)."""
|
|
self._state = DriverState.SCANNING
|
|
|
|
def stop_scanning(self):
|
|
"""Stop scanning."""
|
|
if self._state == DriverState.SCANNING:
|
|
self._state = DriverState.IDLE
|
|
|
|
def start_advertising(self, device_name: str, identity: bytes):
|
|
"""Start advertising (simulated)."""
|
|
self._identity = identity
|
|
self._characteristics[self._identity_char_uuid] = identity
|
|
self._state = DriverState.ADVERTISING
|
|
|
|
def stop_advertising(self):
|
|
"""Stop advertising."""
|
|
if self._state == DriverState.ADVERTISING:
|
|
self._state = DriverState.IDLE
|
|
|
|
def connect(self, address: str):
|
|
"""
|
|
Simulate connecting to a peer (central role).
|
|
|
|
If a linked driver is set and its address matches, establishes
|
|
a bidirectional connection.
|
|
"""
|
|
if address in self._connected_peers:
|
|
return # Already connected
|
|
|
|
# Simulate connection with default MTU
|
|
self._connected_peers[address] = {
|
|
"role": "central",
|
|
"mtu": 185, # Default MTU
|
|
"identity": None
|
|
}
|
|
|
|
# Trigger callback
|
|
if self.on_device_connected:
|
|
self.on_device_connected(address)
|
|
|
|
# Trigger MTU negotiation callback
|
|
if self.on_mtu_negotiated:
|
|
self.on_mtu_negotiated(address, 185)
|
|
|
|
# If linked driver exists and address matches, establish reverse connection
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
self._linked_driver._accept_connection(self.local_address)
|
|
|
|
def _accept_connection(self, address: str):
|
|
"""
|
|
Internal: Accept incoming connection (peripheral role).
|
|
Called by linked driver when it connects to us.
|
|
"""
|
|
if address in self._connected_peers:
|
|
return
|
|
|
|
self._connected_peers[address] = {
|
|
"role": "peripheral",
|
|
"mtu": 185,
|
|
"identity": None
|
|
}
|
|
|
|
if self.on_device_connected:
|
|
self.on_device_connected(address)
|
|
|
|
if self.on_mtu_negotiated:
|
|
self.on_mtu_negotiated(address, 185)
|
|
|
|
def disconnect(self, address: str):
|
|
"""Disconnect from a peer."""
|
|
if address not in self._connected_peers:
|
|
return
|
|
|
|
# Remove peer
|
|
role = self._connected_peers[address]["role"]
|
|
del self._connected_peers[address]
|
|
|
|
# Trigger callback
|
|
if self.on_device_disconnected:
|
|
self.on_device_disconnected(address)
|
|
|
|
# If linked, trigger disconnect on other side
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
if role == "central":
|
|
self._linked_driver._handle_disconnect(self.local_address)
|
|
else:
|
|
self._linked_driver._handle_disconnect(self.local_address)
|
|
|
|
def _handle_disconnect(self, address: str):
|
|
"""Internal: Handle disconnection initiated by peer."""
|
|
if address not in self._connected_peers:
|
|
return
|
|
|
|
del self._connected_peers[address]
|
|
|
|
if self.on_device_disconnected:
|
|
self.on_device_disconnected(address)
|
|
|
|
def send(self, address: str, data: bytes):
|
|
"""
|
|
Send data to a connected peer.
|
|
|
|
Role-aware: automatically routes to linked driver's on_data_received.
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
|
|
# Track for assertions
|
|
self.sent_data.append((address, data))
|
|
|
|
# If linked driver exists, deliver data
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
if self._linked_driver.on_data_received:
|
|
self._linked_driver.on_data_received(self.local_address, data)
|
|
|
|
# --- GATT Characteristic Operations ---
|
|
|
|
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
|
|
"""
|
|
Read a characteristic value from a peer.
|
|
|
|
If linked driver exists, reads from its characteristics.
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
|
|
# If linked driver, read from its characteristics
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
if char_uuid in self._linked_driver._characteristics:
|
|
return self._linked_driver._characteristics[char_uuid]
|
|
else:
|
|
raise KeyError(f"Characteristic {char_uuid} not found")
|
|
else:
|
|
# For testing without linked driver
|
|
if char_uuid in self._characteristics:
|
|
return self._characteristics[char_uuid]
|
|
else:
|
|
raise KeyError(f"Characteristic {char_uuid} not found")
|
|
|
|
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
|
|
"""
|
|
Write a characteristic value to a peer.
|
|
|
|
If linked driver exists, writes to its characteristics.
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
|
|
# If linked driver, write to its characteristics
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
self._linked_driver._characteristics[char_uuid] = data
|
|
else:
|
|
# For testing without linked driver
|
|
self._characteristics[char_uuid] = data
|
|
|
|
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
|
|
"""
|
|
Subscribe to notifications from a characteristic.
|
|
|
|
In the mock, this is a no-op since data delivery is automatic via send().
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
# In mock, notifications are handled automatically via send()
|
|
pass
|
|
|
|
# --- Configuration & Queries ---
|
|
|
|
def get_local_address(self) -> str:
|
|
"""Return the simulated local MAC address."""
|
|
return self.local_address
|
|
|
|
def set_service_discovery_delay(self, seconds: float):
|
|
"""Set service discovery delay (no-op in mock)."""
|
|
self._service_discovery_delay = seconds
|
|
|
|
def set_power_mode(self, mode: str):
|
|
"""Set power mode (tracked but not enforced in mock)."""
|
|
self._power_mode = mode
|
|
|
|
# --- Test Helper Methods ---
|
|
|
|
def simulate_device_discovered(self, address: str, name: str, rssi: int,
|
|
service_uuids: Optional[List[str]] = None,
|
|
manufacturer_data: Optional[Dict[int, bytes]] = None):
|
|
"""
|
|
Simulate discovering a BLE device.
|
|
|
|
Args:
|
|
address: Device MAC address
|
|
name: Device name
|
|
rssi: Signal strength
|
|
service_uuids: Optional list of advertised service UUIDs
|
|
manufacturer_data: Optional manufacturer data
|
|
"""
|
|
if self._state != DriverState.SCANNING:
|
|
return
|
|
|
|
device = BLEDevice(
|
|
address=address,
|
|
name=name,
|
|
rssi=rssi,
|
|
service_uuids=service_uuids or [],
|
|
manufacturer_data=manufacturer_data or {}
|
|
)
|
|
|
|
if self.on_device_discovered:
|
|
self.on_device_discovered(device)
|
|
|
|
def simulate_mtu_change(self, address: str, new_mtu: int):
|
|
"""
|
|
Simulate MTU renegotiation on an existing connection.
|
|
|
|
Args:
|
|
address: Peer address
|
|
new_mtu: New MTU value
|
|
"""
|
|
if address not in self._connected_peers:
|
|
return
|
|
|
|
self._connected_peers[address]["mtu"] = new_mtu
|
|
|
|
if self.on_mtu_negotiated:
|
|
self.on_mtu_negotiated(address, new_mtu)
|
|
|
|
def simulate_error(self, severity: str, message: str, exception: Optional[Exception] = None):
|
|
"""
|
|
Simulate a platform error.
|
|
|
|
Args:
|
|
severity: "warning" or "error"
|
|
message: Error message
|
|
exception: Optional exception object
|
|
"""
|
|
if self.on_error:
|
|
self.on_error(severity, message, exception)
|
|
|
|
def get_peer_role(self, address: str) -> Optional[str]:
|
|
"""
|
|
Get the connection role for a peer.
|
|
|
|
Args:
|
|
address: Peer address
|
|
|
|
Returns:
|
|
"central" or "peripheral", or None if not connected
|
|
"""
|
|
if address in self._connected_peers:
|
|
return self._connected_peers[address]["role"]
|
|
return None
|
|
|
|
@staticmethod
|
|
def link_drivers(driver1: 'MockBLEDriver', driver2: 'MockBLEDriver'):
|
|
"""
|
|
Link two mock drivers for bidirectional communication.
|
|
|
|
This simulates a pair of BLE devices that can discover, connect,
|
|
and exchange data with each other.
|
|
|
|
Args:
|
|
driver1: First driver
|
|
driver2: Second driver
|
|
"""
|
|
driver1._linked_driver = driver2
|
|
driver2._linked_driver = driver1
|
|
|
|
def reset(self):
|
|
"""Reset the mock driver to initial state (useful between tests)."""
|
|
self.stop()
|
|
self.sent_data.clear()
|
|
self._characteristics.clear()
|
|
self._identity = None
|