Refactor BLEInterface to driver-based architecture

Major architectural refactoring to separate high-level Reticulum protocol
logic from platform-specific Bluetooth operations. This enables code sharing
between pure Python and Android (Columba) implementations, improves
testability, and creates a clean boundary for future platform support.

ARCHITECTURE CHANGES:

1. **Driver Abstraction Layer**
   - Created BLEDriverInterface (bluetooth_driver.py) defining the contract
     for all platform-specific BLE drivers
   - Abstraction includes 18 methods + 6 callbacks for complete BLE lifecycle
   - Enhanced BLEDevice dataclass with service_uuids and manufacturer_data
   - Added on_mtu_negotiated callback for delayed MTU reporting
   - Added on_error callback for consistent platform error reporting

2. **Linux Driver Implementation**
   - Created LinuxBluetoothDriver (linux_bluetooth_driver.py, 1534 lines)
   - Moved ALL bleak/bluezero/D-Bus code from BLEInterface
   - Preserves 5 critical platform workarounds:
     * BlueZ ServicesResolved race condition patch
     * D-Bus LE-only connection (ConnectDevice)
     * BLE Agent registration for Just Works pairing
     * MTU negotiation with 3-method fallback
     * Service discovery delay for bluezero timing
   - Role-aware send() automatically chooses GATT write vs notification
   - Dedicated asyncio event loop management in separate thread
   - Configuration via constructor (no Reticulum dependencies)

3. **Refactored BLEInterface**
   - Removed 801 lines (32.3% reduction: 2479 → 1678 lines)
   - Removed all platform-specific imports (bleak, bluezero, dbus_fast)
   - Removed 9 async methods (moved to driver)
   - Driver dependency injection via constructor
   - Implemented 6 driver callbacks for event handling
   - PRESERVED high-level logic:
     * Peer scoring algorithm (RSSI + history + recency)
     * Connection blacklist with exponential backoff
     * MAC-based connection direction (prevents dual connections)
     * Fragmentation/reassembly orchestration (identity-based keying)
     * Interface spawning per peer

4. **Simplified BLEPeerInterface**
   - Removed connection_type, client, mtu parameters
   - Deleted _send_via_central() and _send_via_peripheral() methods
   - Single send path via driver.send() (driver handles role routing)
   - 77 lines removed from peer interface class

5. **Mock Driver for Testing**
   - Created MockBLEDriver (tests/mock_ble_driver.py)
   - Complete BLEDriverInterface implementation without hardware
   - Bidirectional communication via link_drivers()
   - Enables unit testing of BLEInterface logic (fragmentation, reassembly,
     peer lifecycle, blacklist management)

CRITICAL FIXES:

1. **Restored Periodic Cleanup Task** (CRITICAL: prevents memory leaks)
   - Converted from async (driver-owned loop) to threading.Timer
   - Runs every 30 seconds to clean stale reassembly buffers
   - Essential for long-running instances (Pi Zero with 512MB RAM)
   - Properly cancelled in detach() for clean shutdown

2. **Fixed Naming Consistency**
   - Renamed processOutgoing → process_outgoing (snake_case)

FILES MODIFIED:
- src/RNS/Interfaces/BLEInterface.py (refactored, -801 lines)

FILES ADDED:
- bluetooth_driver.py (driver abstraction interface)
- linux_bluetooth_driver.py (Linux/BlueZ implementation, 1534 lines)
- tests/mock_ble_driver.py (mock driver for unit tests)
- REFACTORING_GUIDE.md (comprehensive refactoring documentation)
- BLE_PROTOCOL_v2.2.md (protocol specification)
- tests/test_refactor_suite.py (initial test suite)

BENEFITS:

1. **Testability** - Mock driver enables hardware-free unit testing
2. **Portability** - Easy to create Android/Windows/macOS drivers
3. **Maintainability** - Platform quirks isolated in single driver file
4. **Code Sharing** - High-level logic shared across all platforms
5. **Clean Architecture** - Clear separation of concerns

TESTING REQUIRED:

- Tier 1 (Unit): Test with MockBLEDriver (fragmentation, reassembly, lifecycle)
- Tier 2 (Integration): Test on Raspberry Pi hardware (scanning, connecting,
  dual mode, MTU negotiation, identity exchange)
- Tier 3 (Regression): Full Reticulum stack (announces, LXMF, multi-hop)
- Tier 4 (Edge Cases): MAC rotation, identity handshake, reconnection,
  reassembly timeout, discovery cache pruning

BACKWARD COMPATIBILITY:

- Configuration: Fully backward compatible (same config parameters)
- Protocol: No changes to BLE wire protocol (v2.2)
- Interface API: Unchanged for Reticulum Transport integration

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
torlando-tech 2025-11-03 23:03:54 -05:00
commit 63064ccf3a
7 changed files with 3809 additions and 1158 deletions

1038
BLE_PROTOCOL_v2.2.md Normal file

File diff suppressed because it is too large Load diff

270
REFACTORING_GUIDE.md Normal file
View file

@ -0,0 +1,270 @@
# Refactoring BLEInterface to a Driver-Based Architecture
## 1. Goal
This guide outlines the process of refactoring the existing `RNS.Interfaces.BLEInterface` to decouple the high-level Reticulum protocol logic from the platform-specific Bluetooth implementation (`bleak`/`bluezero`).
The goal is to create a clean architectural boundary by introducing a `BLEDriverInterface`. The existing `BLEInterface` will be refactored to use this driver, and the Linux-specific `bleak` and `bluezero` code will be moved into a new concrete implementation of this driver, `BleakDriver`.
This will result in a more modular, maintainable, and testable system, and it will make it possible to share the high-level `BLEInterface` code between the pure Python implementation and the Android (Columba) implementation.
## 2. Prerequisites: The Driver Contract
First, create a new file, `RNS/Interfaces/bluetooth_driver.py`, and add the abstract interface definition we designed. This file defines the contract that all platform-specific drivers must follow.
```python
# RNS/Interfaces/bluetooth_driver.py
from abc import ABC, abstractmethod
from typing import List, Optional, Callable
from enum import Enum, auto
from dataclasses import dataclass
# --- Data Structures ---
@dataclass
class BLEDevice:
"""Represents a discovered BLE device."""
address: str
name: str
rssi: int
class DriverState(Enum):
"""Represents the state of the BLE driver."""
IDLE = auto()
SCANNING = auto()
ADVERTISING = auto()
# --- Driver Interface ---
class BLEDriverInterface(ABC):
"""
Abstract interface for a platform-specific BLE driver.
"""
# --- Callbacks ---
on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
on_device_connected: Optional[Callable[[str, int], None]] = None # address, mtu
on_device_disconnected: Optional[Callable[[str], None]] = None # address
on_data_received: Optional[Callable[[str, bytes], None]] = None # address, data
# --- Lifecycle & Configuration ---
@abstractmethod
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
"""
Initializes the driver and its underlying BLE stack.
"""
pass
@abstractmethod
def stop(self):
"""
Stops all BLE activity and releases resources.
"""
pass
@abstractmethod
def set_identity(self, identity_bytes: bytes):
"""
Sets the value of the read-only Identity characteristic for the local GATT server.
"""
pass
# --- State & Properties ---
@property
@abstractmethod
def state(self) -> DriverState:
pass
@property
@abstractmethod
def connected_peers(self) -> List[str]:
pass
# --- Core Actions ---
@abstractmethod
def start_scanning(self):
pass
@abstractmethod
def stop_scanning(self):
pass
@abstractmethod
def start_advertising(self, device_name: str):
pass
@abstractmethod
def stop_advertising(self):
pass
@abstractmethod
def connect(self, address: str):
pass
@abstractmethod
def disconnect(self, address: str):
pass
@abstractmethod
def send(self, address: str, data: bytes):
pass
```
## 3. Step-by-Step Refactoring Guide
### Step 1: Create the `BleakDriver` Implementation
Create a new file, `RNS/Interfaces/bleak_driver.py`. This file will contain the new `BleakDriver` class that implements the `BLEDriverInterface` and encapsulates all `bleak` and `bluezero` code.
```python
# RNS/Interfaces/bleak_driver.py
from .bluetooth_driver import BLEDriverInterface, BLEDevice, DriverState
# Add other necessary imports like bleak, bluezero, asyncio, etc.
class BleakDriver(BLEDriverInterface):
def __init__(self):
# Initialize properties to hold clients, state, etc.
self._state = DriverState.IDLE
self._clients = {} # address -> BleakClient
# ...and so on
# Implement all the abstract methods from the interface here
def start(self, service_uuid, rx_char_uuid, tx_char_uuid, identity_char_uuid):
# Code to initialize bleak and bluezero will go here
pass
def start_scanning(self):
# Code that uses bleak.BleakScanner will go here
pass
def send(self, address, data):
# Code that uses bleak_client.write_gatt_char will go here
pass
# ... etc.
```
### Step 2: Move Platform-Specific Code to `BleakDriver`
Go through the existing `BLEInterface.py` method by method and move any code that directly calls `bleak` or `bluezero` into the corresponding method in your new `BleakDriver` class.
**Example: Moving the `send` logic**
**Before (`BLEInterface.py`):**
```python
# (Inside BLEPeerInterface class)
async def _send_fragment(self, fragment):
# ...
await self.client.write_gatt_char(self.parent.WRITE_CH_UUID, fragment)
# ...
```
**After (`bleak_driver.py`):**
```python
# (Inside BleakDriver class)
async def send(self, address: str, data: bytes):
if address in self._clients:
client = self._clients[address]
try:
# The driver now handles the actual write operation
await client.write_gatt_char(self.rx_char_uuid, data)
except Exception as e:
# Handle exceptions and possibly trigger disconnect
pass
```
### Step 3: Refactor `BLEInterface` to Use the Driver
Modify `BLEInterface.py` to remove all direct dependencies on `bleak` and `bluezero`. Instead, it will be initialized with a driver instance and will use it to perform all BLE operations.
**Example: Refactoring `__init__` and `_send_fragment`**
**Before (`BLEInterface.py`):**
```python
import bleak
from bluezero import peripheral
class BLEInterface(Interface):
def __init__(self, owner, name, ...):
# ... bleak and bluezero objects initialized here
pass
# ... methods with direct bleak/bluezero calls
```
**After (`BLEInterface.py`):**
```python
# No more bleak or bluezero imports!
from .bluetooth_driver import BLEDriverInterface, BLEDevice
class BLEInterface(Interface):
def __init__(self, owner, name, ..., driver: BLEDriverInterface):
super().__init__()
self.driver = driver # Dependency Injection
# Assign callbacks so the driver can report events back to us
self.driver.on_device_discovered = self._device_discovered_callback
self.driver.on_data_received = self._data_received_callback
# ... etc.
# This method no longer needs to be async if the driver's send is blocking
# or if we want to fire-and-forget
def _send_fragment(self, fragment, peer_address):
# High-level logic just tells the driver to send
self.driver.send(peer_address, fragment)
# --- Callback Implementations ---
def _device_discovered_callback(self, device: BLEDevice):
# Logic to handle a discovered device
pass
def _data_received_callback(self, address: str, data: bytes):
# This is where you feed the raw data (a fragment) into the reassembler
pass
```
## 4. Thorough Testing Plan
A multi-layered testing strategy is crucial for a refactor of this scale.
### Tier 1: Unit Testing (Mock Driver)
The biggest advantage of this new architecture is testability. You can now test your entire `BLEInterface` and fragmentation logic without any Bluetooth hardware.
1. **Create a `MockBLEDriver`:**
* Create a `tests/mock_ble_driver.py` file.
* The `MockBLEDriver` class will implement `BLEDriverInterface`.
* Its methods will not use Bluetooth. Instead, they will simulate it. For example, its `send()` method could store the data in a list and immediately trigger the `on_data_received` callback on a paired "virtual" peer's mock driver.
2. **Write `BLEInterface` Unit Tests:**
* Write `pytest` tests that initialize `BLEInterface` with the `MockBLEDriver`.
* **Test Case 1: Fragmentation.** Call `BLEInterface.process_outgoing()` with a large packet. Assert that the `mock_driver.send()` method was called multiple times with correctly fragmented data (correct headers, sequence numbers, etc.).
* **Test Case 2: Reassembly.** Have the `mock_driver` call the `on_data_received` callback with a sequence of fragments. Assert that `BLEInterface` correctly reassembles them and passes the complete packet to `RNS.Transport.inbound`.
* **Test Case 3: Peer Lifecycle.** Simulate device discovery, connection, and disconnection events from the mock driver and assert that `BLEInterface` creates and destroys its internal peer representations correctly.
### Tier 2: Integration Testing (Driver Level)
This tier tests your actual `BleakDriver` implementation against real hardware.
1. **Create Test Scripts:** Write simple Python scripts that use *only* the `BleakDriver`.
2. **Setup:** You will need two machines with Bluetooth, or one machine and your Columba app on an Android device.
3. **Test Cases:**
* **Scanning Test:** Run a script that starts the driver and prints discovered devices. Verify that it finds your other test device.
* **Connection Test:** Write a script to connect to the test device. Verify that the `on_device_connected` callback fires and that `driver.connected_peers` is updated.
* **Data I/O Test:** After connecting, use `driver.send()` to send a simple "hello world" byte string. On the other device, verify that the bytes are received correctly. Test this in both directions.
### Tier 3: End-to-End Testing (Full Stack)
This is the final validation, testing the entire refactored application.
1. **Run Full Application:** Start the full Reticulum application on two Linux machines using the refactored code.
2. **Test Cases:**
* **Announce Exchange:** Verify that the two nodes discover each other and exchange announces. Check the logs for successful path discovery.
* **LXMF Message Transfer:** Use a tool like `lxmf-send` or a simple script to send a message from one node to the other. Verify it is received.
* **Cross-Compatibility Test:** Test interoperability between a refactored pure Python node and your Columba Android application.
By following this guide and testing plan, you can confidently execute the refactor, resulting in a more robust, maintainable, and future-proof architecture for your project.

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,198 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Callable, Dict
from enum import Enum, auto
from dataclasses import dataclass, field
# --- Data Structures ---
@dataclass
class BLEDevice:
"""Represents a discovered BLE device."""
address: str
name: str
rssi: int
service_uuids: List[str] = field(default_factory=list)
manufacturer_data: Dict[int, bytes] = field(default_factory=dict)
class DriverState(Enum):
"""Represents the state of the BLE driver."""
IDLE = auto()
SCANNING = auto()
ADVERTISING = auto()
# Note: More granular states like CONNECTING could be added if the
# high-level logic requires them, but the list of connected peers
# might be sufficient for most use cases.
# --- Driver Interface ---
class BLEDriverInterface(ABC):
"""
Abstract interface for a platform-specific BLE driver.
This contract separates the high-level Reticulum BLE interface logic
from the low-level, platform-specific Bluetooth operations. It is designed
to be implemented by different backend libraries (e.g., bleak/bluezero on Linux,
or a Chaquopy-bridged Kotlin implementation on Android).
The driver is responsible for managing the actual BLE connections, but it
reports events asynchronously via the provided callbacks.
"""
# --- Callbacks ---
# The consumer of this driver (e.g., a high-level BLEInterface) must
# implement and assign these callbacks to receive events from the driver.
on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
on_device_connected: Optional[Callable[[str], None]] = None # address (MTU reported separately)
on_device_disconnected: Optional[Callable[[str], None]] = None # address
on_data_received: Optional[Callable[[str, bytes], None]] = None # address, data
on_mtu_negotiated: Optional[Callable[[str, int], None]] = None # address, mtu
on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None # severity, message, exception
# --- Lifecycle & Configuration ---
@abstractmethod
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
"""
Initializes the driver and its underlying BLE stack. This includes
setting up the GATT server characteristics required for the peripheral role.
This method should be called before any other operations.
"""
pass
@abstractmethod
def stop(self):
"""
Stops all BLE activity (scanning, advertising, connections) and releases all
underlying system resources.
"""
pass
@abstractmethod
def set_identity(self, identity_bytes: bytes):
"""
Sets the value of the read-only Identity characteristic for the local GATT server.
This must be called before starting advertising.
"""
pass
# --- State & Properties ---
@property
@abstractmethod
def state(self) -> DriverState:
"""Returns the current operational state of the driver."""
pass
@property
@abstractmethod
def connected_peers(self) -> List[str]:
"""Returns a list of MAC addresses for all currently connected peers."""
pass
# --- Core Actions ---
@abstractmethod
def start_scanning(self):
"""
Starts scanning for devices advertising the configured service UUID.
Discovered devices will be reported via the on_device_discovered callback.
"""
pass
@abstractmethod
def stop_scanning(self):
"""Stops scanning for devices."""
pass
@abstractmethod
def start_advertising(self, device_name: str, identity: bytes):
"""
Starts advertising the configured service UUID and the given device name.
The identity parameter is used to populate the Identity characteristic.
"""
pass
@abstractmethod
def stop_advertising(self):
"""Stops advertising."""
pass
@abstractmethod
def connect(self, address: str):
"""
Initiates a connection to a peer device (central role).
Connection status is reported via on_device_connected/on_device_disconnected.
"""
pass
@abstractmethod
def disconnect(self, address: str):
"""Disconnects from a peer device."""
pass
@abstractmethod
def send(self, address: str, data: bytes):
"""
Sends data to a connected peer.
The driver implementation is responsible for choosing the correct underlying BLE
operation (GATT Write for central role, or Notification for peripheral role)
based on the current connection type for the given address. This method
should ideally block or be awaitable until the send operation is confirmed
by the BLE stack to ensure sequential transmission.
"""
pass
# --- GATT Characteristic Operations ---
@abstractmethod
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
"""
Reads a GATT characteristic value from a connected peer.
Raises an exception if the operation fails.
"""
pass
@abstractmethod
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
"""
Writes a value to a GATT characteristic on a connected peer.
Raises an exception if the operation fails.
"""
pass
@abstractmethod
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
"""
Subscribes to notifications from a GATT characteristic on a connected peer.
The callback will be invoked whenever a notification is received.
"""
pass
# --- Configuration & Queries ---
@abstractmethod
def get_local_address(self) -> str:
"""
Returns the MAC address of the local Bluetooth adapter.
Used for connection direction determination (MAC sorting).
"""
pass
@abstractmethod
def set_service_discovery_delay(self, seconds: float):
"""
Sets the delay between connection establishment and service discovery.
This is a workaround for bluezero D-Bus registration timing issues.
"""
pass
@abstractmethod
def set_power_mode(self, mode: str):
"""
Sets the power mode for scanning operations.
Valid modes: "aggressive", "balanced", "saver"
"""
pass

File diff suppressed because it is too large Load diff

392
tests/mock_ble_driver.py Normal file
View file

@ -0,0 +1,392 @@
"""
Mock BLE Driver for Unit Testing
This module provides a mock implementation of BLEDriverInterface that simulates
BLE behavior without requiring actual Bluetooth hardware. It's designed for
unit testing BLEInterface logic including:
- Fragmentation and reassembly
- Peer lifecycle management
- Connection blacklist logic
- MAC-based connection direction
- Error handling
Usage:
# Create two mock drivers to simulate a pair of peers
driver1 = MockBLEDriver()
driver2 = MockBLEDriver()
# Link them to enable bidirectional communication
MockBLEDriver.link_drivers(driver1, driver2)
# Simulate discovery
driver1.simulate_device_discovered("AA:BB:CC:DD:EE:FF", "RNS-Test", -60)
# Simulate connection
driver1.connect("AA:BB:CC:DD:EE:FF")
# Simulate data transfer
driver1.send("AA:BB:CC:DD:EE:FF", b"test data")
# -> Triggers driver2.on_data_received("11:22:33:44:55:66", b"test data")
"""
import sys
import os
# Add src directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from RNS.Interfaces.bluetooth_driver import BLEDriverInterface, BLEDevice, DriverState
from typing import List, Optional, Callable, Dict
import time
class MockBLEDriver(BLEDriverInterface):
"""
Mock BLE driver that simulates Bluetooth behavior for testing.
"""
def __init__(self, local_address: str = "11:22:33:44:55:66"):
"""
Initialize the mock driver.
Args:
local_address: Simulated MAC address for this driver
"""
self.local_address = local_address
self._state = DriverState.IDLE
self._connected_peers: Dict[str, dict] = {} # address -> {role, mtu, identity}
self._identity: Optional[bytes] = None
self._service_discovery_delay: float = 0.0 # No delay in mock
self._power_mode: str = "balanced"
# UUIDs (set via start())
self._service_uuid: Optional[str] = None
self._rx_char_uuid: Optional[str] = None
self._tx_char_uuid: Optional[str] = None
self._identity_char_uuid: Optional[str] = None
# Callbacks (assigned by consumer)
self.on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
self.on_device_connected: Optional[Callable[[str], None]] = None
self.on_device_disconnected: Optional[Callable[[str], None]] = None
self.on_data_received: Optional[Callable[[str, bytes], None]] = None
self.on_mtu_negotiated: Optional[Callable[[str, int], None]] = None
self.on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None
# Linked driver for bidirectional communication testing
self._linked_driver: Optional['MockBLEDriver'] = None
# Simulated characteristics storage
self._characteristics: Dict[str, bytes] = {} # char_uuid -> value
# Track sent data for assertions
self.sent_data: List[tuple] = [] # [(address, data), ...]
# --- Lifecycle & Configuration ---
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
"""Initialize the mock driver with UUIDs."""
self._service_uuid = service_uuid
self._rx_char_uuid = rx_char_uuid
self._tx_char_uuid = tx_char_uuid
self._identity_char_uuid = identity_char_uuid
self._state = DriverState.IDLE
def stop(self):
"""Stop all activity and disconnect all peers."""
for address in list(self._connected_peers.keys()):
self.disconnect(address)
self._state = DriverState.IDLE
def set_identity(self, identity_bytes: bytes):
"""Set the local identity value."""
self._identity = identity_bytes
self._characteristics[self._identity_char_uuid] = identity_bytes
# --- State & Properties ---
@property
def state(self) -> DriverState:
"""Return current state."""
return self._state
@property
def connected_peers(self) -> List[str]:
"""Return list of connected peer addresses."""
return list(self._connected_peers.keys())
# --- Core Actions ---
def start_scanning(self):
"""Start scanning (simulated)."""
self._state = DriverState.SCANNING
def stop_scanning(self):
"""Stop scanning."""
if self._state == DriverState.SCANNING:
self._state = DriverState.IDLE
def start_advertising(self, device_name: str, identity: bytes):
"""Start advertising (simulated)."""
self._identity = identity
self._characteristics[self._identity_char_uuid] = identity
self._state = DriverState.ADVERTISING
def stop_advertising(self):
"""Stop advertising."""
if self._state == DriverState.ADVERTISING:
self._state = DriverState.IDLE
def connect(self, address: str):
"""
Simulate connecting to a peer (central role).
If a linked driver is set and its address matches, establishes
a bidirectional connection.
"""
if address in self._connected_peers:
return # Already connected
# Simulate connection with default MTU
self._connected_peers[address] = {
"role": "central",
"mtu": 185, # Default MTU
"identity": None
}
# Trigger callback
if self.on_device_connected:
self.on_device_connected(address)
# Trigger MTU negotiation callback
if self.on_mtu_negotiated:
self.on_mtu_negotiated(address, 185)
# If linked driver exists and address matches, establish reverse connection
if self._linked_driver and self._linked_driver.local_address == address:
self._linked_driver._accept_connection(self.local_address)
def _accept_connection(self, address: str):
"""
Internal: Accept incoming connection (peripheral role).
Called by linked driver when it connects to us.
"""
if address in self._connected_peers:
return
self._connected_peers[address] = {
"role": "peripheral",
"mtu": 185,
"identity": None
}
if self.on_device_connected:
self.on_device_connected(address)
if self.on_mtu_negotiated:
self.on_mtu_negotiated(address, 185)
def disconnect(self, address: str):
"""Disconnect from a peer."""
if address not in self._connected_peers:
return
# Remove peer
role = self._connected_peers[address]["role"]
del self._connected_peers[address]
# Trigger callback
if self.on_device_disconnected:
self.on_device_disconnected(address)
# If linked, trigger disconnect on other side
if self._linked_driver and self._linked_driver.local_address == address:
if role == "central":
self._linked_driver._handle_disconnect(self.local_address)
else:
self._linked_driver._handle_disconnect(self.local_address)
def _handle_disconnect(self, address: str):
"""Internal: Handle disconnection initiated by peer."""
if address not in self._connected_peers:
return
del self._connected_peers[address]
if self.on_device_disconnected:
self.on_device_disconnected(address)
def send(self, address: str, data: bytes):
"""
Send data to a connected peer.
Role-aware: automatically routes to linked driver's on_data_received.
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# Track for assertions
self.sent_data.append((address, data))
# If linked driver exists, deliver data
if self._linked_driver and self._linked_driver.local_address == address:
if self._linked_driver.on_data_received:
self._linked_driver.on_data_received(self.local_address, data)
# --- GATT Characteristic Operations ---
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
"""
Read a characteristic value from a peer.
If linked driver exists, reads from its characteristics.
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# If linked driver, read from its characteristics
if self._linked_driver and self._linked_driver.local_address == address:
if char_uuid in self._linked_driver._characteristics:
return self._linked_driver._characteristics[char_uuid]
else:
raise KeyError(f"Characteristic {char_uuid} not found")
else:
# For testing without linked driver
if char_uuid in self._characteristics:
return self._characteristics[char_uuid]
else:
raise KeyError(f"Characteristic {char_uuid} not found")
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
"""
Write a characteristic value to a peer.
If linked driver exists, writes to its characteristics.
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# If linked driver, write to its characteristics
if self._linked_driver and self._linked_driver.local_address == address:
self._linked_driver._characteristics[char_uuid] = data
else:
# For testing without linked driver
self._characteristics[char_uuid] = data
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
"""
Subscribe to notifications from a characteristic.
In the mock, this is a no-op since data delivery is automatic via send().
"""
if address not in self._connected_peers:
raise ConnectionError(f"Not connected to {address}")
# In mock, notifications are handled automatically via send()
pass
# --- Configuration & Queries ---
def get_local_address(self) -> str:
"""Return the simulated local MAC address."""
return self.local_address
def set_service_discovery_delay(self, seconds: float):
"""Set service discovery delay (no-op in mock)."""
self._service_discovery_delay = seconds
def set_power_mode(self, mode: str):
"""Set power mode (tracked but not enforced in mock)."""
self._power_mode = mode
# --- Test Helper Methods ---
def simulate_device_discovered(self, address: str, name: str, rssi: int,
service_uuids: Optional[List[str]] = None,
manufacturer_data: Optional[Dict[int, bytes]] = None):
"""
Simulate discovering a BLE device.
Args:
address: Device MAC address
name: Device name
rssi: Signal strength
service_uuids: Optional list of advertised service UUIDs
manufacturer_data: Optional manufacturer data
"""
if self._state != DriverState.SCANNING:
return
device = BLEDevice(
address=address,
name=name,
rssi=rssi,
service_uuids=service_uuids or [],
manufacturer_data=manufacturer_data or {}
)
if self.on_device_discovered:
self.on_device_discovered(device)
def simulate_mtu_change(self, address: str, new_mtu: int):
"""
Simulate MTU renegotiation on an existing connection.
Args:
address: Peer address
new_mtu: New MTU value
"""
if address not in self._connected_peers:
return
self._connected_peers[address]["mtu"] = new_mtu
if self.on_mtu_negotiated:
self.on_mtu_negotiated(address, new_mtu)
def simulate_error(self, severity: str, message: str, exception: Optional[Exception] = None):
"""
Simulate a platform error.
Args:
severity: "warning" or "error"
message: Error message
exception: Optional exception object
"""
if self.on_error:
self.on_error(severity, message, exception)
def get_peer_role(self, address: str) -> Optional[str]:
"""
Get the connection role for a peer.
Args:
address: Peer address
Returns:
"central" or "peripheral", or None if not connected
"""
if address in self._connected_peers:
return self._connected_peers[address]["role"]
return None
@staticmethod
def link_drivers(driver1: 'MockBLEDriver', driver2: 'MockBLEDriver'):
"""
Link two mock drivers for bidirectional communication.
This simulates a pair of BLE devices that can discover, connect,
and exchange data with each other.
Args:
driver1: First driver
driver2: Second driver
"""
driver1._linked_driver = driver2
driver2._linked_driver = driver1
def reset(self):
"""Reset the mock driver to initial state (useful between tests)."""
self.stop()
self.sent_data.clear()
self._characteristics.clear()
self._identity = None

View file

@ -0,0 +1,62 @@
import pytest
import asyncio
import os
import sys
# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, project_root)
from src.RNS.Interfaces.BLEInterface import BLEInterface
class MockReticulum:
def __init__(self):
self.transport_enabled = False
self.is_connected_to_shared_instance = False
def register_interface(self, interface):
pass
class MockOwner:
def __init__(self):
self.reticulum = MockReticulum()
@pytest.mark.asyncio
async def test_two_device_communication():
"""
Tests a basic two-device communication scenario where one device acts as a
peripheral and the other as a central.
"""
# Create mock owner and configuration for the peripheral device
peripheral_owner = MockOwner()
peripheral_config = {
'name': 'PeripheralInterface',
'enable_central': False,
'enable_peripheral': True,
'device_name': 'TestPeripheral',
}
# Create mock owner and configuration for the central device
central_owner = MockOwner()
central_config = {
'name': 'CentralInterface',
'enable_central': True,
'enable_peripheral': False,
}
# Create the peripheral and central interfaces
peripheral_interface = BLEInterface(peripheral_owner, peripheral_config)
central_interface = BLEInterface(central_owner, central_config)
# Allow some time for the interfaces to start and for discovery to happen
await asyncio.sleep(10)
# Check that the central has discovered and connected to the peripheral
assert len(central_interface.peers) > 0, "Central did not connect to any peers"
# TODO: Add assertions to verify data exchange
# Clean up
await peripheral_interface.stop()
await central_interface.stop()