Refactor BLEInterface to driver-based architecture
Major architectural refactoring to separate high-level Reticulum protocol
logic from platform-specific Bluetooth operations. This enables code sharing
between pure Python and Android (Columba) implementations, improves
testability, and creates a clean boundary for future platform support.
ARCHITECTURE CHANGES:
1. **Driver Abstraction Layer**
- Created BLEDriverInterface (bluetooth_driver.py) defining the contract
for all platform-specific BLE drivers
- Abstraction includes 18 methods + 6 callbacks for complete BLE lifecycle
- Enhanced BLEDevice dataclass with service_uuids and manufacturer_data
- Added on_mtu_negotiated callback for delayed MTU reporting
- Added on_error callback for consistent platform error reporting
2. **Linux Driver Implementation**
- Created LinuxBluetoothDriver (linux_bluetooth_driver.py, 1534 lines)
- Moved ALL bleak/bluezero/D-Bus code from BLEInterface
- Preserves 5 critical platform workarounds:
* BlueZ ServicesResolved race condition patch
* D-Bus LE-only connection (ConnectDevice)
* BLE Agent registration for Just Works pairing
* MTU negotiation with 3-method fallback
* Service discovery delay for bluezero timing
- Role-aware send() automatically chooses GATT write vs notification
- Dedicated asyncio event loop management in separate thread
- Configuration via constructor (no Reticulum dependencies)
3. **Refactored BLEInterface**
- Removed 801 lines (32.3% reduction: 2479 → 1678 lines)
- Removed all platform-specific imports (bleak, bluezero, dbus_fast)
- Removed 9 async methods (moved to driver)
- Driver dependency injection via constructor
- Implemented 6 driver callbacks for event handling
- PRESERVED high-level logic:
* Peer scoring algorithm (RSSI + history + recency)
* Connection blacklist with exponential backoff
* MAC-based connection direction (prevents dual connections)
* Fragmentation/reassembly orchestration (identity-based keying)
* Interface spawning per peer
4. **Simplified BLEPeerInterface**
- Removed connection_type, client, mtu parameters
- Deleted _send_via_central() and _send_via_peripheral() methods
- Single send path via driver.send() (driver handles role routing)
- 77 lines removed from peer interface class
5. **Mock Driver for Testing**
- Created MockBLEDriver (tests/mock_ble_driver.py)
- Complete BLEDriverInterface implementation without hardware
- Bidirectional communication via link_drivers()
- Enables unit testing of BLEInterface logic (fragmentation, reassembly,
peer lifecycle, blacklist management)
CRITICAL FIXES:
1. **Restored Periodic Cleanup Task** (CRITICAL: prevents memory leaks)
- Converted from async (driver-owned loop) to threading.Timer
- Runs every 30 seconds to clean stale reassembly buffers
- Essential for long-running instances (Pi Zero with 512MB RAM)
- Properly cancelled in detach() for clean shutdown
2. **Fixed Naming Consistency**
- Renamed processOutgoing → process_outgoing (snake_case)
FILES MODIFIED:
- src/RNS/Interfaces/BLEInterface.py (refactored, -801 lines)
FILES ADDED:
- bluetooth_driver.py (driver abstraction interface)
- linux_bluetooth_driver.py (Linux/BlueZ implementation, 1534 lines)
- tests/mock_ble_driver.py (mock driver for unit tests)
- REFACTORING_GUIDE.md (comprehensive refactoring documentation)
- BLE_PROTOCOL_v2.2.md (protocol specification)
- tests/test_refactor_suite.py (initial test suite)
BENEFITS:
1. **Testability** - Mock driver enables hardware-free unit testing
2. **Portability** - Easy to create Android/Windows/macOS drivers
3. **Maintainability** - Platform quirks isolated in single driver file
4. **Code Sharing** - High-level logic shared across all platforms
5. **Clean Architecture** - Clear separation of concerns
TESTING REQUIRED:
- Tier 1 (Unit): Test with MockBLEDriver (fragmentation, reassembly, lifecycle)
- Tier 2 (Integration): Test on Raspberry Pi hardware (scanning, connecting,
dual mode, MTU negotiation, identity exchange)
- Tier 3 (Regression): Full Reticulum stack (announces, LXMF, multi-hop)
- Tier 4 (Edge Cases): MAC rotation, identity handshake, reconnection,
reassembly timeout, discovery cache pruning
BACKWARD COMPATIBILITY:
- Configuration: Fully backward compatible (same config parameters)
- Protocol: No changes to BLE wire protocol (v2.2)
- Interface API: Unchanged for Reticulum Transport integration
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
9e5e564340
commit
4eaa9a4173
7 changed files with 3809 additions and 1158 deletions
1038
BLE_PROTOCOL_v2.2.md
Normal file
1038
BLE_PROTOCOL_v2.2.md
Normal file
File diff suppressed because it is too large
Load diff
270
REFACTORING_GUIDE.md
Normal file
270
REFACTORING_GUIDE.md
Normal file
|
|
@ -0,0 +1,270 @@
|
|||
# Refactoring BLEInterface to a Driver-Based Architecture
|
||||
|
||||
## 1. Goal
|
||||
|
||||
This guide outlines the process of refactoring the existing `RNS.Interfaces.BLEInterface` to decouple the high-level Reticulum protocol logic from the platform-specific Bluetooth implementation (`bleak`/`bluezero`).
|
||||
|
||||
The goal is to create a clean architectural boundary by introducing a `BLEDriverInterface`. The existing `BLEInterface` will be refactored to use this driver, and the Linux-specific `bleak` and `bluezero` code will be moved into a new concrete implementation of this driver, `BleakDriver`.
|
||||
|
||||
This will result in a more modular, maintainable, and testable system, and it will make it possible to share the high-level `BLEInterface` code between the pure Python implementation and the Android (Columba) implementation.
|
||||
|
||||
## 2. Prerequisites: The Driver Contract
|
||||
|
||||
First, create a new file, `RNS/Interfaces/bluetooth_driver.py`, and add the abstract interface definition we designed. This file defines the contract that all platform-specific drivers must follow.
|
||||
|
||||
```python
|
||||
# RNS/Interfaces/bluetooth_driver.py
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Callable
|
||||
from enum import Enum, auto
|
||||
from dataclasses import dataclass
|
||||
|
||||
# --- Data Structures ---
|
||||
|
||||
@dataclass
|
||||
class BLEDevice:
|
||||
"""Represents a discovered BLE device."""
|
||||
address: str
|
||||
name: str
|
||||
rssi: int
|
||||
|
||||
class DriverState(Enum):
|
||||
"""Represents the state of the BLE driver."""
|
||||
IDLE = auto()
|
||||
SCANNING = auto()
|
||||
ADVERTISING = auto()
|
||||
|
||||
# --- Driver Interface ---
|
||||
|
||||
class BLEDriverInterface(ABC):
|
||||
"""
|
||||
Abstract interface for a platform-specific BLE driver.
|
||||
"""
|
||||
|
||||
# --- Callbacks ---
|
||||
on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
|
||||
on_device_connected: Optional[Callable[[str, int], None]] = None # address, mtu
|
||||
on_device_disconnected: Optional[Callable[[str], None]] = None # address
|
||||
on_data_received: Optional[Callable[[str, bytes], None]] = None # address, data
|
||||
|
||||
# --- Lifecycle & Configuration ---
|
||||
|
||||
@abstractmethod
|
||||
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
|
||||
"""
|
||||
Initializes the driver and its underlying BLE stack.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop(self):
|
||||
"""
|
||||
Stops all BLE activity and releases resources.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_identity(self, identity_bytes: bytes):
|
||||
"""
|
||||
Sets the value of the read-only Identity characteristic for the local GATT server.
|
||||
"""
|
||||
pass
|
||||
|
||||
# --- State & Properties ---
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def state(self) -> DriverState:
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def connected_peers(self) -> List[str]:
|
||||
pass
|
||||
|
||||
# --- Core Actions ---
|
||||
|
||||
@abstractmethod
|
||||
def start_scanning(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop_scanning(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start_advertising(self, device_name: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop_advertising(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def connect(self, address: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def disconnect(self, address: str):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def send(self, address: str, data: bytes):
|
||||
pass
|
||||
```
|
||||
|
||||
## 3. Step-by-Step Refactoring Guide
|
||||
|
||||
### Step 1: Create the `BleakDriver` Implementation
|
||||
|
||||
Create a new file, `RNS/Interfaces/bleak_driver.py`. This file will contain the new `BleakDriver` class that implements the `BLEDriverInterface` and encapsulates all `bleak` and `bluezero` code.
|
||||
|
||||
```python
|
||||
# RNS/Interfaces/bleak_driver.py
|
||||
|
||||
from .bluetooth_driver import BLEDriverInterface, BLEDevice, DriverState
|
||||
# Add other necessary imports like bleak, bluezero, asyncio, etc.
|
||||
|
||||
class BleakDriver(BLEDriverInterface):
|
||||
def __init__(self):
|
||||
# Initialize properties to hold clients, state, etc.
|
||||
self._state = DriverState.IDLE
|
||||
self._clients = {} # address -> BleakClient
|
||||
# ...and so on
|
||||
|
||||
# Implement all the abstract methods from the interface here
|
||||
def start(self, service_uuid, rx_char_uuid, tx_char_uuid, identity_char_uuid):
|
||||
# Code to initialize bleak and bluezero will go here
|
||||
pass
|
||||
|
||||
def start_scanning(self):
|
||||
# Code that uses bleak.BleakScanner will go here
|
||||
pass
|
||||
|
||||
def send(self, address, data):
|
||||
# Code that uses bleak_client.write_gatt_char will go here
|
||||
pass
|
||||
|
||||
# ... etc.
|
||||
```
|
||||
|
||||
### Step 2: Move Platform-Specific Code to `BleakDriver`
|
||||
|
||||
Go through the existing `BLEInterface.py` method by method and move any code that directly calls `bleak` or `bluezero` into the corresponding method in your new `BleakDriver` class.
|
||||
|
||||
**Example: Moving the `send` logic**
|
||||
|
||||
**Before (`BLEInterface.py`):**
|
||||
```python
|
||||
# (Inside BLEPeerInterface class)
|
||||
async def _send_fragment(self, fragment):
|
||||
# ...
|
||||
await self.client.write_gatt_char(self.parent.WRITE_CH_UUID, fragment)
|
||||
# ...
|
||||
```
|
||||
|
||||
**After (`bleak_driver.py`):**
|
||||
```python
|
||||
# (Inside BleakDriver class)
|
||||
async def send(self, address: str, data: bytes):
|
||||
if address in self._clients:
|
||||
client = self._clients[address]
|
||||
try:
|
||||
# The driver now handles the actual write operation
|
||||
await client.write_gatt_char(self.rx_char_uuid, data)
|
||||
except Exception as e:
|
||||
# Handle exceptions and possibly trigger disconnect
|
||||
pass
|
||||
```
|
||||
|
||||
### Step 3: Refactor `BLEInterface` to Use the Driver
|
||||
|
||||
Modify `BLEInterface.py` to remove all direct dependencies on `bleak` and `bluezero`. Instead, it will be initialized with a driver instance and will use it to perform all BLE operations.
|
||||
|
||||
**Example: Refactoring `__init__` and `_send_fragment`**
|
||||
|
||||
**Before (`BLEInterface.py`):**
|
||||
```python
|
||||
import bleak
|
||||
from bluezero import peripheral
|
||||
|
||||
class BLEInterface(Interface):
|
||||
def __init__(self, owner, name, ...):
|
||||
# ... bleak and bluezero objects initialized here
|
||||
pass
|
||||
|
||||
# ... methods with direct bleak/bluezero calls
|
||||
```
|
||||
|
||||
**After (`BLEInterface.py`):**
|
||||
```python
|
||||
# No more bleak or bluezero imports!
|
||||
from .bluetooth_driver import BLEDriverInterface, BLEDevice
|
||||
|
||||
class BLEInterface(Interface):
|
||||
def __init__(self, owner, name, ..., driver: BLEDriverInterface):
|
||||
super().__init__()
|
||||
self.driver = driver # Dependency Injection
|
||||
|
||||
# Assign callbacks so the driver can report events back to us
|
||||
self.driver.on_device_discovered = self._device_discovered_callback
|
||||
self.driver.on_data_received = self._data_received_callback
|
||||
# ... etc.
|
||||
|
||||
# This method no longer needs to be async if the driver's send is blocking
|
||||
# or if we want to fire-and-forget
|
||||
def _send_fragment(self, fragment, peer_address):
|
||||
# High-level logic just tells the driver to send
|
||||
self.driver.send(peer_address, fragment)
|
||||
|
||||
# --- Callback Implementations ---
|
||||
def _device_discovered_callback(self, device: BLEDevice):
|
||||
# Logic to handle a discovered device
|
||||
pass
|
||||
|
||||
def _data_received_callback(self, address: str, data: bytes):
|
||||
# This is where you feed the raw data (a fragment) into the reassembler
|
||||
pass
|
||||
```
|
||||
|
||||
## 4. Thorough Testing Plan
|
||||
|
||||
A multi-layered testing strategy is crucial for a refactor of this scale.
|
||||
|
||||
### Tier 1: Unit Testing (Mock Driver)
|
||||
|
||||
The biggest advantage of this new architecture is testability. You can now test your entire `BLEInterface` and fragmentation logic without any Bluetooth hardware.
|
||||
|
||||
1. **Create a `MockBLEDriver`:**
|
||||
* Create a `tests/mock_ble_driver.py` file.
|
||||
* The `MockBLEDriver` class will implement `BLEDriverInterface`.
|
||||
* Its methods will not use Bluetooth. Instead, they will simulate it. For example, its `send()` method could store the data in a list and immediately trigger the `on_data_received` callback on a paired "virtual" peer's mock driver.
|
||||
2. **Write `BLEInterface` Unit Tests:**
|
||||
* Write `pytest` tests that initialize `BLEInterface` with the `MockBLEDriver`.
|
||||
* **Test Case 1: Fragmentation.** Call `BLEInterface.process_outgoing()` with a large packet. Assert that the `mock_driver.send()` method was called multiple times with correctly fragmented data (correct headers, sequence numbers, etc.).
|
||||
* **Test Case 2: Reassembly.** Have the `mock_driver` call the `on_data_received` callback with a sequence of fragments. Assert that `BLEInterface` correctly reassembles them and passes the complete packet to `RNS.Transport.inbound`.
|
||||
* **Test Case 3: Peer Lifecycle.** Simulate device discovery, connection, and disconnection events from the mock driver and assert that `BLEInterface` creates and destroys its internal peer representations correctly.
|
||||
|
||||
### Tier 2: Integration Testing (Driver Level)
|
||||
|
||||
This tier tests your actual `BleakDriver` implementation against real hardware.
|
||||
|
||||
1. **Create Test Scripts:** Write simple Python scripts that use *only* the `BleakDriver`.
|
||||
2. **Setup:** You will need two machines with Bluetooth, or one machine and your Columba app on an Android device.
|
||||
3. **Test Cases:**
|
||||
* **Scanning Test:** Run a script that starts the driver and prints discovered devices. Verify that it finds your other test device.
|
||||
* **Connection Test:** Write a script to connect to the test device. Verify that the `on_device_connected` callback fires and that `driver.connected_peers` is updated.
|
||||
* **Data I/O Test:** After connecting, use `driver.send()` to send a simple "hello world" byte string. On the other device, verify that the bytes are received correctly. Test this in both directions.
|
||||
|
||||
### Tier 3: End-to-End Testing (Full Stack)
|
||||
|
||||
This is the final validation, testing the entire refactored application.
|
||||
|
||||
1. **Run Full Application:** Start the full Reticulum application on two Linux machines using the refactored code.
|
||||
2. **Test Cases:**
|
||||
* **Announce Exchange:** Verify that the two nodes discover each other and exchange announces. Check the logs for successful path discovery.
|
||||
* **LXMF Message Transfer:** Use a tool like `lxmf-send` or a simple script to send a message from one node to the other. Verify it is received.
|
||||
* **Cross-Compatibility Test:** Test interoperability between a refactored pure Python node and your Columba Android application.
|
||||
|
||||
By following this guide and testing plan, you can confidently execute the refactor, resulting in a more robust, maintainable, and future-proof architecture for your project.
|
||||
File diff suppressed because it is too large
Load diff
198
src/RNS/Interfaces/bluetooth_driver.py
Normal file
198
src/RNS/Interfaces/bluetooth_driver.py
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Callable, Dict
|
||||
from enum import Enum, auto
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# --- Data Structures ---
|
||||
|
||||
@dataclass
|
||||
class BLEDevice:
|
||||
"""Represents a discovered BLE device."""
|
||||
address: str
|
||||
name: str
|
||||
rssi: int
|
||||
service_uuids: List[str] = field(default_factory=list)
|
||||
manufacturer_data: Dict[int, bytes] = field(default_factory=dict)
|
||||
|
||||
class DriverState(Enum):
|
||||
"""Represents the state of the BLE driver."""
|
||||
IDLE = auto()
|
||||
SCANNING = auto()
|
||||
ADVERTISING = auto()
|
||||
# Note: More granular states like CONNECTING could be added if the
|
||||
# high-level logic requires them, but the list of connected peers
|
||||
# might be sufficient for most use cases.
|
||||
|
||||
# --- Driver Interface ---
|
||||
|
||||
class BLEDriverInterface(ABC):
|
||||
"""
|
||||
Abstract interface for a platform-specific BLE driver.
|
||||
|
||||
This contract separates the high-level Reticulum BLE interface logic
|
||||
from the low-level, platform-specific Bluetooth operations. It is designed
|
||||
to be implemented by different backend libraries (e.g., bleak/bluezero on Linux,
|
||||
or a Chaquopy-bridged Kotlin implementation on Android).
|
||||
|
||||
The driver is responsible for managing the actual BLE connections, but it
|
||||
reports events asynchronously via the provided callbacks.
|
||||
"""
|
||||
|
||||
# --- Callbacks ---
|
||||
# The consumer of this driver (e.g., a high-level BLEInterface) must
|
||||
# implement and assign these callbacks to receive events from the driver.
|
||||
|
||||
on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
|
||||
on_device_connected: Optional[Callable[[str], None]] = None # address (MTU reported separately)
|
||||
on_device_disconnected: Optional[Callable[[str], None]] = None # address
|
||||
on_data_received: Optional[Callable[[str, bytes], None]] = None # address, data
|
||||
on_mtu_negotiated: Optional[Callable[[str, int], None]] = None # address, mtu
|
||||
on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None # severity, message, exception
|
||||
|
||||
# --- Lifecycle & Configuration ---
|
||||
|
||||
@abstractmethod
|
||||
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
|
||||
"""
|
||||
Initializes the driver and its underlying BLE stack. This includes
|
||||
setting up the GATT server characteristics required for the peripheral role.
|
||||
This method should be called before any other operations.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop(self):
|
||||
"""
|
||||
Stops all BLE activity (scanning, advertising, connections) and releases all
|
||||
underlying system resources.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_identity(self, identity_bytes: bytes):
|
||||
"""
|
||||
Sets the value of the read-only Identity characteristic for the local GATT server.
|
||||
This must be called before starting advertising.
|
||||
"""
|
||||
pass
|
||||
|
||||
# --- State & Properties ---
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def state(self) -> DriverState:
|
||||
"""Returns the current operational state of the driver."""
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def connected_peers(self) -> List[str]:
|
||||
"""Returns a list of MAC addresses for all currently connected peers."""
|
||||
pass
|
||||
|
||||
# --- Core Actions ---
|
||||
|
||||
@abstractmethod
|
||||
def start_scanning(self):
|
||||
"""
|
||||
Starts scanning for devices advertising the configured service UUID.
|
||||
Discovered devices will be reported via the on_device_discovered callback.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop_scanning(self):
|
||||
"""Stops scanning for devices."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start_advertising(self, device_name: str, identity: bytes):
|
||||
"""
|
||||
Starts advertising the configured service UUID and the given device name.
|
||||
The identity parameter is used to populate the Identity characteristic.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop_advertising(self):
|
||||
"""Stops advertising."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def connect(self, address: str):
|
||||
"""
|
||||
Initiates a connection to a peer device (central role).
|
||||
Connection status is reported via on_device_connected/on_device_disconnected.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def disconnect(self, address: str):
|
||||
"""Disconnects from a peer device."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def send(self, address: str, data: bytes):
|
||||
"""
|
||||
Sends data to a connected peer.
|
||||
|
||||
The driver implementation is responsible for choosing the correct underlying BLE
|
||||
operation (GATT Write for central role, or Notification for peripheral role)
|
||||
based on the current connection type for the given address. This method
|
||||
should ideally block or be awaitable until the send operation is confirmed
|
||||
by the BLE stack to ensure sequential transmission.
|
||||
"""
|
||||
pass
|
||||
|
||||
# --- GATT Characteristic Operations ---
|
||||
|
||||
@abstractmethod
|
||||
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
|
||||
"""
|
||||
Reads a GATT characteristic value from a connected peer.
|
||||
Raises an exception if the operation fails.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
|
||||
"""
|
||||
Writes a value to a GATT characteristic on a connected peer.
|
||||
Raises an exception if the operation fails.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
|
||||
"""
|
||||
Subscribes to notifications from a GATT characteristic on a connected peer.
|
||||
The callback will be invoked whenever a notification is received.
|
||||
"""
|
||||
pass
|
||||
|
||||
# --- Configuration & Queries ---
|
||||
|
||||
@abstractmethod
|
||||
def get_local_address(self) -> str:
|
||||
"""
|
||||
Returns the MAC address of the local Bluetooth adapter.
|
||||
Used for connection direction determination (MAC sorting).
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_service_discovery_delay(self, seconds: float):
|
||||
"""
|
||||
Sets the delay between connection establishment and service discovery.
|
||||
This is a workaround for bluezero D-Bus registration timing issues.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_power_mode(self, mode: str):
|
||||
"""
|
||||
Sets the power mode for scanning operations.
|
||||
Valid modes: "aggressive", "balanced", "saver"
|
||||
"""
|
||||
pass
|
||||
1534
src/RNS/Interfaces/linux_bluetooth_driver.py
Normal file
1534
src/RNS/Interfaces/linux_bluetooth_driver.py
Normal file
File diff suppressed because it is too large
Load diff
392
tests/mock_ble_driver.py
Normal file
392
tests/mock_ble_driver.py
Normal file
|
|
@ -0,0 +1,392 @@
|
|||
"""
|
||||
Mock BLE Driver for Unit Testing
|
||||
|
||||
This module provides a mock implementation of BLEDriverInterface that simulates
|
||||
BLE behavior without requiring actual Bluetooth hardware. It's designed for
|
||||
unit testing BLEInterface logic including:
|
||||
|
||||
- Fragmentation and reassembly
|
||||
- Peer lifecycle management
|
||||
- Connection blacklist logic
|
||||
- MAC-based connection direction
|
||||
- Error handling
|
||||
|
||||
Usage:
|
||||
# Create two mock drivers to simulate a pair of peers
|
||||
driver1 = MockBLEDriver()
|
||||
driver2 = MockBLEDriver()
|
||||
|
||||
# Link them to enable bidirectional communication
|
||||
MockBLEDriver.link_drivers(driver1, driver2)
|
||||
|
||||
# Simulate discovery
|
||||
driver1.simulate_device_discovered("AA:BB:CC:DD:EE:FF", "RNS-Test", -60)
|
||||
|
||||
# Simulate connection
|
||||
driver1.connect("AA:BB:CC:DD:EE:FF")
|
||||
|
||||
# Simulate data transfer
|
||||
driver1.send("AA:BB:CC:DD:EE:FF", b"test data")
|
||||
# -> Triggers driver2.on_data_received("11:22:33:44:55:66", b"test data")
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
# Add src directory to path for imports
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
||||
|
||||
from RNS.Interfaces.bluetooth_driver import BLEDriverInterface, BLEDevice, DriverState
|
||||
from typing import List, Optional, Callable, Dict
|
||||
import time
|
||||
|
||||
|
||||
class MockBLEDriver(BLEDriverInterface):
|
||||
"""
|
||||
Mock BLE driver that simulates Bluetooth behavior for testing.
|
||||
"""
|
||||
|
||||
def __init__(self, local_address: str = "11:22:33:44:55:66"):
|
||||
"""
|
||||
Initialize the mock driver.
|
||||
|
||||
Args:
|
||||
local_address: Simulated MAC address for this driver
|
||||
"""
|
||||
self.local_address = local_address
|
||||
self._state = DriverState.IDLE
|
||||
self._connected_peers: Dict[str, dict] = {} # address -> {role, mtu, identity}
|
||||
self._identity: Optional[bytes] = None
|
||||
self._service_discovery_delay: float = 0.0 # No delay in mock
|
||||
self._power_mode: str = "balanced"
|
||||
|
||||
# UUIDs (set via start())
|
||||
self._service_uuid: Optional[str] = None
|
||||
self._rx_char_uuid: Optional[str] = None
|
||||
self._tx_char_uuid: Optional[str] = None
|
||||
self._identity_char_uuid: Optional[str] = None
|
||||
|
||||
# Callbacks (assigned by consumer)
|
||||
self.on_device_discovered: Optional[Callable[[BLEDevice], None]] = None
|
||||
self.on_device_connected: Optional[Callable[[str], None]] = None
|
||||
self.on_device_disconnected: Optional[Callable[[str], None]] = None
|
||||
self.on_data_received: Optional[Callable[[str, bytes], None]] = None
|
||||
self.on_mtu_negotiated: Optional[Callable[[str, int], None]] = None
|
||||
self.on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None
|
||||
|
||||
# Linked driver for bidirectional communication testing
|
||||
self._linked_driver: Optional['MockBLEDriver'] = None
|
||||
|
||||
# Simulated characteristics storage
|
||||
self._characteristics: Dict[str, bytes] = {} # char_uuid -> value
|
||||
|
||||
# Track sent data for assertions
|
||||
self.sent_data: List[tuple] = [] # [(address, data), ...]
|
||||
|
||||
# --- Lifecycle & Configuration ---
|
||||
|
||||
def start(self, service_uuid: str, rx_char_uuid: str, tx_char_uuid: str, identity_char_uuid: str):
|
||||
"""Initialize the mock driver with UUIDs."""
|
||||
self._service_uuid = service_uuid
|
||||
self._rx_char_uuid = rx_char_uuid
|
||||
self._tx_char_uuid = tx_char_uuid
|
||||
self._identity_char_uuid = identity_char_uuid
|
||||
self._state = DriverState.IDLE
|
||||
|
||||
def stop(self):
|
||||
"""Stop all activity and disconnect all peers."""
|
||||
for address in list(self._connected_peers.keys()):
|
||||
self.disconnect(address)
|
||||
self._state = DriverState.IDLE
|
||||
|
||||
def set_identity(self, identity_bytes: bytes):
|
||||
"""Set the local identity value."""
|
||||
self._identity = identity_bytes
|
||||
self._characteristics[self._identity_char_uuid] = identity_bytes
|
||||
|
||||
# --- State & Properties ---
|
||||
|
||||
@property
|
||||
def state(self) -> DriverState:
|
||||
"""Return current state."""
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def connected_peers(self) -> List[str]:
|
||||
"""Return list of connected peer addresses."""
|
||||
return list(self._connected_peers.keys())
|
||||
|
||||
# --- Core Actions ---
|
||||
|
||||
def start_scanning(self):
|
||||
"""Start scanning (simulated)."""
|
||||
self._state = DriverState.SCANNING
|
||||
|
||||
def stop_scanning(self):
|
||||
"""Stop scanning."""
|
||||
if self._state == DriverState.SCANNING:
|
||||
self._state = DriverState.IDLE
|
||||
|
||||
def start_advertising(self, device_name: str, identity: bytes):
|
||||
"""Start advertising (simulated)."""
|
||||
self._identity = identity
|
||||
self._characteristics[self._identity_char_uuid] = identity
|
||||
self._state = DriverState.ADVERTISING
|
||||
|
||||
def stop_advertising(self):
|
||||
"""Stop advertising."""
|
||||
if self._state == DriverState.ADVERTISING:
|
||||
self._state = DriverState.IDLE
|
||||
|
||||
def connect(self, address: str):
|
||||
"""
|
||||
Simulate connecting to a peer (central role).
|
||||
|
||||
If a linked driver is set and its address matches, establishes
|
||||
a bidirectional connection.
|
||||
"""
|
||||
if address in self._connected_peers:
|
||||
return # Already connected
|
||||
|
||||
# Simulate connection with default MTU
|
||||
self._connected_peers[address] = {
|
||||
"role": "central",
|
||||
"mtu": 185, # Default MTU
|
||||
"identity": None
|
||||
}
|
||||
|
||||
# Trigger callback
|
||||
if self.on_device_connected:
|
||||
self.on_device_connected(address)
|
||||
|
||||
# Trigger MTU negotiation callback
|
||||
if self.on_mtu_negotiated:
|
||||
self.on_mtu_negotiated(address, 185)
|
||||
|
||||
# If linked driver exists and address matches, establish reverse connection
|
||||
if self._linked_driver and self._linked_driver.local_address == address:
|
||||
self._linked_driver._accept_connection(self.local_address)
|
||||
|
||||
def _accept_connection(self, address: str):
|
||||
"""
|
||||
Internal: Accept incoming connection (peripheral role).
|
||||
Called by linked driver when it connects to us.
|
||||
"""
|
||||
if address in self._connected_peers:
|
||||
return
|
||||
|
||||
self._connected_peers[address] = {
|
||||
"role": "peripheral",
|
||||
"mtu": 185,
|
||||
"identity": None
|
||||
}
|
||||
|
||||
if self.on_device_connected:
|
||||
self.on_device_connected(address)
|
||||
|
||||
if self.on_mtu_negotiated:
|
||||
self.on_mtu_negotiated(address, 185)
|
||||
|
||||
def disconnect(self, address: str):
|
||||
"""Disconnect from a peer."""
|
||||
if address not in self._connected_peers:
|
||||
return
|
||||
|
||||
# Remove peer
|
||||
role = self._connected_peers[address]["role"]
|
||||
del self._connected_peers[address]
|
||||
|
||||
# Trigger callback
|
||||
if self.on_device_disconnected:
|
||||
self.on_device_disconnected(address)
|
||||
|
||||
# If linked, trigger disconnect on other side
|
||||
if self._linked_driver and self._linked_driver.local_address == address:
|
||||
if role == "central":
|
||||
self._linked_driver._handle_disconnect(self.local_address)
|
||||
else:
|
||||
self._linked_driver._handle_disconnect(self.local_address)
|
||||
|
||||
def _handle_disconnect(self, address: str):
|
||||
"""Internal: Handle disconnection initiated by peer."""
|
||||
if address not in self._connected_peers:
|
||||
return
|
||||
|
||||
del self._connected_peers[address]
|
||||
|
||||
if self.on_device_disconnected:
|
||||
self.on_device_disconnected(address)
|
||||
|
||||
def send(self, address: str, data: bytes):
|
||||
"""
|
||||
Send data to a connected peer.
|
||||
|
||||
Role-aware: automatically routes to linked driver's on_data_received.
|
||||
"""
|
||||
if address not in self._connected_peers:
|
||||
raise ConnectionError(f"Not connected to {address}")
|
||||
|
||||
# Track for assertions
|
||||
self.sent_data.append((address, data))
|
||||
|
||||
# If linked driver exists, deliver data
|
||||
if self._linked_driver and self._linked_driver.local_address == address:
|
||||
if self._linked_driver.on_data_received:
|
||||
self._linked_driver.on_data_received(self.local_address, data)
|
||||
|
||||
# --- GATT Characteristic Operations ---
|
||||
|
||||
def read_characteristic(self, address: str, char_uuid: str) -> bytes:
|
||||
"""
|
||||
Read a characteristic value from a peer.
|
||||
|
||||
If linked driver exists, reads from its characteristics.
|
||||
"""
|
||||
if address not in self._connected_peers:
|
||||
raise ConnectionError(f"Not connected to {address}")
|
||||
|
||||
# If linked driver, read from its characteristics
|
||||
if self._linked_driver and self._linked_driver.local_address == address:
|
||||
if char_uuid in self._linked_driver._characteristics:
|
||||
return self._linked_driver._characteristics[char_uuid]
|
||||
else:
|
||||
raise KeyError(f"Characteristic {char_uuid} not found")
|
||||
else:
|
||||
# For testing without linked driver
|
||||
if char_uuid in self._characteristics:
|
||||
return self._characteristics[char_uuid]
|
||||
else:
|
||||
raise KeyError(f"Characteristic {char_uuid} not found")
|
||||
|
||||
def write_characteristic(self, address: str, char_uuid: str, data: bytes):
|
||||
"""
|
||||
Write a characteristic value to a peer.
|
||||
|
||||
If linked driver exists, writes to its characteristics.
|
||||
"""
|
||||
if address not in self._connected_peers:
|
||||
raise ConnectionError(f"Not connected to {address}")
|
||||
|
||||
# If linked driver, write to its characteristics
|
||||
if self._linked_driver and self._linked_driver.local_address == address:
|
||||
self._linked_driver._characteristics[char_uuid] = data
|
||||
else:
|
||||
# For testing without linked driver
|
||||
self._characteristics[char_uuid] = data
|
||||
|
||||
def start_notify(self, address: str, char_uuid: str, callback: Callable[[bytes], None]):
|
||||
"""
|
||||
Subscribe to notifications from a characteristic.
|
||||
|
||||
In the mock, this is a no-op since data delivery is automatic via send().
|
||||
"""
|
||||
if address not in self._connected_peers:
|
||||
raise ConnectionError(f"Not connected to {address}")
|
||||
# In mock, notifications are handled automatically via send()
|
||||
pass
|
||||
|
||||
# --- Configuration & Queries ---
|
||||
|
||||
def get_local_address(self) -> str:
|
||||
"""Return the simulated local MAC address."""
|
||||
return self.local_address
|
||||
|
||||
def set_service_discovery_delay(self, seconds: float):
|
||||
"""Set service discovery delay (no-op in mock)."""
|
||||
self._service_discovery_delay = seconds
|
||||
|
||||
def set_power_mode(self, mode: str):
|
||||
"""Set power mode (tracked but not enforced in mock)."""
|
||||
self._power_mode = mode
|
||||
|
||||
# --- Test Helper Methods ---
|
||||
|
||||
def simulate_device_discovered(self, address: str, name: str, rssi: int,
|
||||
service_uuids: Optional[List[str]] = None,
|
||||
manufacturer_data: Optional[Dict[int, bytes]] = None):
|
||||
"""
|
||||
Simulate discovering a BLE device.
|
||||
|
||||
Args:
|
||||
address: Device MAC address
|
||||
name: Device name
|
||||
rssi: Signal strength
|
||||
service_uuids: Optional list of advertised service UUIDs
|
||||
manufacturer_data: Optional manufacturer data
|
||||
"""
|
||||
if self._state != DriverState.SCANNING:
|
||||
return
|
||||
|
||||
device = BLEDevice(
|
||||
address=address,
|
||||
name=name,
|
||||
rssi=rssi,
|
||||
service_uuids=service_uuids or [],
|
||||
manufacturer_data=manufacturer_data or {}
|
||||
)
|
||||
|
||||
if self.on_device_discovered:
|
||||
self.on_device_discovered(device)
|
||||
|
||||
def simulate_mtu_change(self, address: str, new_mtu: int):
|
||||
"""
|
||||
Simulate MTU renegotiation on an existing connection.
|
||||
|
||||
Args:
|
||||
address: Peer address
|
||||
new_mtu: New MTU value
|
||||
"""
|
||||
if address not in self._connected_peers:
|
||||
return
|
||||
|
||||
self._connected_peers[address]["mtu"] = new_mtu
|
||||
|
||||
if self.on_mtu_negotiated:
|
||||
self.on_mtu_negotiated(address, new_mtu)
|
||||
|
||||
def simulate_error(self, severity: str, message: str, exception: Optional[Exception] = None):
|
||||
"""
|
||||
Simulate a platform error.
|
||||
|
||||
Args:
|
||||
severity: "warning" or "error"
|
||||
message: Error message
|
||||
exception: Optional exception object
|
||||
"""
|
||||
if self.on_error:
|
||||
self.on_error(severity, message, exception)
|
||||
|
||||
def get_peer_role(self, address: str) -> Optional[str]:
|
||||
"""
|
||||
Get the connection role for a peer.
|
||||
|
||||
Args:
|
||||
address: Peer address
|
||||
|
||||
Returns:
|
||||
"central" or "peripheral", or None if not connected
|
||||
"""
|
||||
if address in self._connected_peers:
|
||||
return self._connected_peers[address]["role"]
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def link_drivers(driver1: 'MockBLEDriver', driver2: 'MockBLEDriver'):
|
||||
"""
|
||||
Link two mock drivers for bidirectional communication.
|
||||
|
||||
This simulates a pair of BLE devices that can discover, connect,
|
||||
and exchange data with each other.
|
||||
|
||||
Args:
|
||||
driver1: First driver
|
||||
driver2: Second driver
|
||||
"""
|
||||
driver1._linked_driver = driver2
|
||||
driver2._linked_driver = driver1
|
||||
|
||||
def reset(self):
|
||||
"""Reset the mock driver to initial state (useful between tests)."""
|
||||
self.stop()
|
||||
self.sent_data.clear()
|
||||
self._characteristics.clear()
|
||||
self._identity = None
|
||||
62
tests/test_refactor_suite.py
Normal file
62
tests/test_refactor_suite.py
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
|
||||
import pytest
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Add the project root to the Python path
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
from src.RNS.Interfaces.BLEInterface import BLEInterface
|
||||
|
||||
class MockReticulum:
|
||||
def __init__(self):
|
||||
self.transport_enabled = False
|
||||
self.is_connected_to_shared_instance = False
|
||||
|
||||
def register_interface(self, interface):
|
||||
pass
|
||||
|
||||
class MockOwner:
|
||||
def __init__(self):
|
||||
self.reticulum = MockReticulum()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_two_device_communication():
|
||||
"""
|
||||
Tests a basic two-device communication scenario where one device acts as a
|
||||
peripheral and the other as a central.
|
||||
"""
|
||||
# Create mock owner and configuration for the peripheral device
|
||||
peripheral_owner = MockOwner()
|
||||
peripheral_config = {
|
||||
'name': 'PeripheralInterface',
|
||||
'enable_central': False,
|
||||
'enable_peripheral': True,
|
||||
'device_name': 'TestPeripheral',
|
||||
}
|
||||
|
||||
# Create mock owner and configuration for the central device
|
||||
central_owner = MockOwner()
|
||||
central_config = {
|
||||
'name': 'CentralInterface',
|
||||
'enable_central': True,
|
||||
'enable_peripheral': False,
|
||||
}
|
||||
|
||||
# Create the peripheral and central interfaces
|
||||
peripheral_interface = BLEInterface(peripheral_owner, peripheral_config)
|
||||
central_interface = BLEInterface(central_owner, central_config)
|
||||
|
||||
# Allow some time for the interfaces to start and for discovery to happen
|
||||
await asyncio.sleep(10)
|
||||
|
||||
# Check that the central has discovered and connected to the peripheral
|
||||
assert len(central_interface.peers) > 0, "Central did not connect to any peers"
|
||||
|
||||
# TODO: Add assertions to verify data exchange
|
||||
|
||||
# Clean up
|
||||
await peripheral_interface.stop()
|
||||
await central_interface.stop()
|
||||
Loading…
Add table
Add a link
Reference in a new issue