- Fix mock_ble_driver.py import path (src/RNS/Interfaces -> src/ble_reticulum) - Add address_to_interface, _pending_detach, _pending_detach_grace_period to test fixture - Update test_identity_cache to expect grace period behavior (deferred cleanup) - Update test_v2_2_mac_sorting to use renamed _cleanup_stale_address function 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
405 lines
14 KiB
Python
405 lines
14 KiB
Python
"""
|
|
Mock BLE Driver for Unit Testing
|
|
|
|
This module provides a mock implementation of BLEDriverInterface that simulates
|
|
BLE behavior without requiring actual Bluetooth hardware. It's designed for
|
|
unit testing BLEInterface logic including:
|
|
|
|
- Fragmentation and reassembly
|
|
- Peer lifecycle management
|
|
- Connection blacklist logic
|
|
- MAC-based connection direction
|
|
- Error handling
|
|
|
|
Usage:
|
|
# Create two mock drivers to simulate a pair of peers
|
|
driver1 = MockBLEDriver()
|
|
driver2 = MockBLEDriver()
|
|
|
|
# Link them to enable bidirectional communication
|
|
MockBLEDriver.link_drivers(driver1, driver2)
|
|
|
|
# Simulate discovery
|
|
driver1.simulate_device_discovered("AA:BB:CC:DD:EE:FF", "RNS-Test", -60)
|
|
|
|
# Simulate connection
|
|
driver1.connect("AA:BB:CC:DD:EE:FF")
|
|
|
|
# Simulate data transfer
|
|
driver1.send("AA:BB:CC:DD:EE:FF", b"test data")
|
|
# -> Triggers driver2.on_data_received("11:22:33:44:55:66", b"test data")
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
|
|
# Add src directory to path for imports
|
|
src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
|
|
if src_path not in sys.path:
|
|
sys.path.insert(0, src_path)
|
|
|
|
# Import directly using importlib to bypass RNS namespace conflicts
|
|
# This avoids issues when a real RNS package is installed globally
|
|
import importlib.util
|
|
bluetooth_driver_path = os.path.join(src_path, 'ble_reticulum', 'bluetooth_driver.py')
|
|
spec = importlib.util.spec_from_file_location("bluetooth_driver", bluetooth_driver_path)
|
|
bluetooth_driver = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(bluetooth_driver)
|
|
|
|
BLEDriverInterface = bluetooth_driver.BLEDriverInterface
|
|
BLEDevice = bluetooth_driver.BLEDevice
|
|
DriverState = bluetooth_driver.DriverState
|
|
from typing import List, Optional, Callable, Dict
|
|
import time
|
|
|
|
|
|
class MockBLEDriver(BLEDriverInterface):
|
|
"""
|
|
Mock BLE driver that simulates Bluetooth behavior for testing.
|
|
"""
|
|
|
|
def __init__(self, local_address: str = "11:22:33:44:55:66"):
|
|
"""
|
|
Initialize the mock driver.
|
|
|
|
Args:
|
|
local_address: Simulated MAC address for this driver
|
|
"""
|
|
self.local_address = local_address
|
|
self._state = DriverState.IDLE
|
|
self._connected_peers: Dict[str, dict] = {} # address -> {role, mtu, identity}
|
|
self._identity: Optional[bytes] = None
|
|
self._service_discovery_delay: float = 0.0 # No delay in mock
|
|
self._power_mode: str = "balanced"
|
|
|
|
# UUIDs (set via start())
|
|
self._service_uuid: Optional[str] = None
|
|
self._rx_char_uuid: Optional[str] = None
|
|
self._tx_char_uuid: Optional[str] = None
|
|
self._identity_char_uuid: Optional[str] = None
|
|
|
|
# Callbacks (assigned by consumer)
|
|
self.on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
|
|
self.on_device_connected: Optional[Callable[[str], None]] = None
|
|
self.on_device_disconnected: Optional[Callable[[str], None]] = None
|
|
self.on_data_received: Optional[Callable[[str, bytes], None]] = None
|
|
self.on_mtu_negotiated: Optional[Callable[[str, int], None]] = None
|
|
self.on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None
|
|
|
|
# Linked driver for bidirectional communication testing
|
|
self._linked_driver: Optional['MockBLEDriver'] = None
|
|
|
|
# Simulated characteristics storage
|
|
self._characteristics: Dict[str, bytes] = {} # char_uuid -> value
|
|
|
|
# Track sent data for assertions
|
|
self.sent_data: List[tuple] = [] # [(address, data), ...]
|
|
|
|
# --- Lifecycle & Configuration ---
|
|
|
|
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
|
|
"""Initialize the mock driver with UUIDs."""
|
|
self._service_uuid = service_uuid
|
|
self._rx_char_uuid = rx_char_uuid
|
|
self._tx_char_uuid = tx_char_uuid
|
|
self._identity_char_uuid = identity_char_uuid
|
|
self._state = DriverState.IDLE
|
|
|
|
def stop(self):
|
|
"""Stop all activity and disconnect all peers."""
|
|
for address in list(self._connected_peers.keys()):
|
|
self.disconnect(address)
|
|
self._state = DriverState.IDLE
|
|
|
|
def set_identity(self, identity_bytes: bytes):
|
|
"""Set the local identity value."""
|
|
self._identity = identity_bytes
|
|
self._characteristics[self._identity_char_uuid] = identity_bytes
|
|
|
|
# --- State & Properties ---
|
|
|
|
@property
|
|
def state(self) -> DriverState:
|
|
"""Return current state."""
|
|
return self._state
|
|
|
|
@property
|
|
def connected_peers(self) -> List[str]:
|
|
"""Return list of connected peer addresses."""
|
|
return list(self._connected_peers.keys())
|
|
|
|
# --- Core Actions ---
|
|
|
|
def start_scanning(self):
|
|
"""Start scanning (simulated)."""
|
|
self._state = DriverState.SCANNING
|
|
|
|
def stop_scanning(self):
|
|
"""Stop scanning."""
|
|
if self._state == DriverState.SCANNING:
|
|
self._state = DriverState.IDLE
|
|
|
|
def start_advertising(self, device_name: str, identity: bytes):
|
|
"""Start advertising (simulated)."""
|
|
self._identity = identity
|
|
self._characteristics[self._identity_char_uuid] = identity
|
|
self._state = DriverState.ADVERTISING
|
|
|
|
def stop_advertising(self):
|
|
"""Stop advertising."""
|
|
if self._state == DriverState.ADVERTISING:
|
|
self._state = DriverState.IDLE
|
|
|
|
def connect(self, address: str):
|
|
"""
|
|
Simulate connecting to a peer (central role).
|
|
|
|
If a linked driver is set and its address matches, establishes
|
|
a bidirectional connection.
|
|
"""
|
|
if address in self._connected_peers:
|
|
return # Already connected
|
|
|
|
# Simulate connection with default MTU
|
|
self._connected_peers[address] = {
|
|
"role": "central",
|
|
"mtu": 185, # Default MTU
|
|
"identity": None
|
|
}
|
|
|
|
# Trigger callback
|
|
if self.on_device_connected:
|
|
self.on_device_connected(address)
|
|
|
|
# Trigger MTU negotiation callback
|
|
if self.on_mtu_negotiated:
|
|
self.on_mtu_negotiated(address, 185)
|
|
|
|
# If linked driver exists and address matches, establish reverse connection
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
self._linked_driver._accept_connection(self.local_address)
|
|
|
|
def _accept_connection(self, address: str):
|
|
"""
|
|
Internal: Accept incoming connection (peripheral role).
|
|
Called by linked driver when it connects to us.
|
|
"""
|
|
if address in self._connected_peers:
|
|
return
|
|
|
|
self._connected_peers[address] = {
|
|
"role": "peripheral",
|
|
"mtu": 185,
|
|
"identity": None
|
|
}
|
|
|
|
if self.on_device_connected:
|
|
self.on_device_connected(address)
|
|
|
|
if self.on_mtu_negotiated:
|
|
self.on_mtu_negotiated(address, 185)
|
|
|
|
def disconnect(self, address: str):
|
|
"""Disconnect from a peer."""
|
|
if address not in self._connected_peers:
|
|
return
|
|
|
|
# Remove peer
|
|
role = self._connected_peers[address]["role"]
|
|
del self._connected_peers[address]
|
|
|
|
# Trigger callback
|
|
if self.on_device_disconnected:
|
|
self.on_device_disconnected(address)
|
|
|
|
# If linked, trigger disconnect on other side
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
if role == "central":
|
|
self._linked_driver._handle_disconnect(self.local_address)
|
|
else:
|
|
self._linked_driver._handle_disconnect(self.local_address)
|
|
|
|
def _handle_disconnect(self, address: str):
|
|
"""Internal: Handle disconnection initiated by peer."""
|
|
if address not in self._connected_peers:
|
|
return
|
|
|
|
del self._connected_peers[address]
|
|
|
|
if self.on_device_disconnected:
|
|
self.on_device_disconnected(address)
|
|
|
|
def send(self, address: str, data: bytes):
|
|
"""
|
|
Send data to a connected peer.
|
|
|
|
Role-aware: automatically routes to linked driver's on_data_received.
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
|
|
# Track for assertions
|
|
self.sent_data.append((address, data))
|
|
|
|
# If linked driver exists, deliver data
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
if self._linked_driver.on_data_received:
|
|
self._linked_driver.on_data_received(self.local_address, data)
|
|
|
|
# --- GATT Characteristic Operations ---
|
|
|
|
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
|
|
"""
|
|
Read a characteristic value from a peer.
|
|
|
|
If linked driver exists, reads from its characteristics.
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
|
|
# If linked driver, read from its characteristics
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
if char_uuid in self._linked_driver._characteristics:
|
|
return self._linked_driver._characteristics[char_uuid]
|
|
else:
|
|
raise KeyError(f"Characteristic {char_uuid} not found")
|
|
else:
|
|
# For testing without linked driver
|
|
if char_uuid in self._characteristics:
|
|
return self._characteristics[char_uuid]
|
|
else:
|
|
raise KeyError(f"Characteristic {char_uuid} not found")
|
|
|
|
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
|
|
"""
|
|
Write a characteristic value to a peer.
|
|
|
|
If linked driver exists, writes to its characteristics.
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
|
|
# If linked driver, write to its characteristics
|
|
if self._linked_driver and self._linked_driver.local_address == address:
|
|
self._linked_driver._characteristics[char_uuid] = data
|
|
else:
|
|
# For testing without linked driver
|
|
self._characteristics[char_uuid] = data
|
|
|
|
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
|
|
"""
|
|
Subscribe to notifications from a characteristic.
|
|
|
|
In the mock, this is a no-op since data delivery is automatic via send().
|
|
"""
|
|
if address not in self._connected_peers:
|
|
raise ConnectionError(f"Not connected to {address}")
|
|
# In mock, notifications are handled automatically via send()
|
|
pass
|
|
|
|
# --- Configuration & Queries ---
|
|
|
|
def get_local_address(self) -> str:
|
|
"""Return the simulated local MAC address."""
|
|
return self.local_address
|
|
|
|
def set_service_discovery_delay(self, seconds: float):
|
|
"""Set service discovery delay (no-op in mock)."""
|
|
self._service_discovery_delay = seconds
|
|
|
|
def set_power_mode(self, mode: str):
|
|
"""Set power mode (tracked but not enforced in mock)."""
|
|
self._power_mode = mode
|
|
|
|
# --- Test Helper Methods ---
|
|
|
|
def simulate_device_discovered(self, address: str, name: str, rssi: int,
|
|
service_uuids: Optional[List[str]] = None,
|
|
manufacturer_data: Optional[Dict[int, bytes]] = None):
|
|
"""
|
|
Simulate discovering a BLE device.
|
|
|
|
Args:
|
|
address: Device MAC address
|
|
name: Device name
|
|
rssi: Signal strength
|
|
service_uuids: Optional list of advertised service UUIDs
|
|
manufacturer_data: Optional manufacturer data
|
|
"""
|
|
if self._state != DriverState.SCANNING:
|
|
return
|
|
|
|
device = BLEDevice(
|
|
address=address,
|
|
name=name,
|
|
rssi=rssi,
|
|
service_uuids=service_uuids or [],
|
|
manufacturer_data=manufacturer_data or {}
|
|
)
|
|
|
|
if self.on_device_discovered:
|
|
self.on_device_discovered(device)
|
|
|
|
def simulate_mtu_change(self, address: str, new_mtu: int):
|
|
"""
|
|
Simulate MTU renegotiation on an existing connection.
|
|
|
|
Args:
|
|
address: Peer address
|
|
new_mtu: New MTU value
|
|
"""
|
|
if address not in self._connected_peers:
|
|
return
|
|
|
|
self._connected_peers[address]["mtu"] = new_mtu
|
|
|
|
if self.on_mtu_negotiated:
|
|
self.on_mtu_negotiated(address, new_mtu)
|
|
|
|
def simulate_error(self, severity: str, message: str, exception: Optional[Exception] = None):
|
|
"""
|
|
Simulate a platform error.
|
|
|
|
Args:
|
|
severity: "warning" or "error"
|
|
message: Error message
|
|
exception: Optional exception object
|
|
"""
|
|
if self.on_error:
|
|
self.on_error(severity, message, exception)
|
|
|
|
def get_peer_role(self, address: str) -> Optional[str]:
|
|
"""
|
|
Get the connection role for a peer.
|
|
|
|
Args:
|
|
address: Peer address
|
|
|
|
Returns:
|
|
"central" or "peripheral", or None if not connected
|
|
"""
|
|
if address in self._connected_peers:
|
|
return self._connected_peers[address]["role"]
|
|
return None
|
|
|
|
@staticmethod
|
|
def link_drivers(driver1: 'MockBLEDriver', driver2: 'MockBLEDriver'):
|
|
"""
|
|
Link two mock drivers for bidirectional communication.
|
|
|
|
This simulates a pair of BLE devices that can discover, connect,
|
|
and exchange data with each other.
|
|
|
|
Args:
|
|
driver1: First driver
|
|
driver2: Second driver
|
|
"""
|
|
driver1._linked_driver = driver2
|
|
driver2._linked_driver = driver1
|
|
|
|
def reset(self):
|
|
"""Reset the mock driver to initial state (useful between tests)."""
|
|
self.stop()
|
|
self.sent_data.clear()
|
|
self._characteristics.clear()
|
|
self._identity = None
|