Merge pull request #35 from torlando-tech/fix/ble-peer-interface-cleanup

fix(ble): prevent interface/fragmenter loss during MAC rotation
This commit is contained in:
Torlando 2026-01-01 19:50:56 -05:00 committed by GitHub
commit ac003b6c17
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1329 additions and 108 deletions

230
BLE_PROTOCOL_v0.3.0.md Normal file
View file

@ -0,0 +1,230 @@
# BLE-Reticulum Protocol Specification v0.3.0
**Version**: 0.3.0
**Date**: December 2025
**Status**: Draft
**Backwards Compatible With**: v2.2
## 1. Overview
This document specifies the v0.3.0 extension to the BLE-Reticulum protocol. This version adds **capability advertisement** to support devices that can only operate in peripheral mode (e.g., ESP32-S3).
### 1.1 Problem Statement
The v2.2 protocol uses MAC address sorting to determine connection direction: the device with the numerically lower MAC address initiates the connection (acts as BLE central). However, some hardware platforms (notably ESP32-S3) cannot reliably operate as BLE central due to stack limitations.
When such a device has a lower MAC address than a peer, neither device initiates a connection:
- The peripheral-only device cannot initiate (hardware limitation)
- The peer waits for the lower-MAC device to initiate (per v2.2 protocol)
### 1.2 Solution
v0.3.0 introduces **capability flags** in the advertising packet via BLE manufacturer-specific data. Devices advertise their role capabilities, allowing the connection direction logic to be overridden when one device is peripheral-only.
## 2. Manufacturer-Specific Data Format
### 2.1 Advertising Data Structure
v0.3.0 devices include manufacturer-specific data in their advertising packet:
```
AD Type: 0xFF (Manufacturer Specific Data)
Length: 5 bytes (1 type + 4 data)
Data Format (4 bytes):
┌─────────┬─────────┬─────────┬─────────┐
│ Byte 0 │ Byte 1 │ Byte 2 │ Byte 3 │
├─────────┼─────────┼─────────┼─────────┤
│ CID Low │ CID High│ Version │ Flags │
└─────────┴─────────┴─────────┴─────────┘
CID (Bytes 0-1): Company ID, little-endian
0xFFFF = Reserved for testing (Bluetooth SIG)
Version (Byte 2): Protocol version
0x03 = v0.3.0
Flags (Byte 3): Capability flags
Bit 0: PERIPHERAL_ONLY (1 = cannot act as central)
Bit 1: Reserved (CENTRAL_ONLY, future use)
Bits 2-7: Reserved (must be 0)
```
### 2.2 Example Values
| Device Type | CID | Version | Flags | Raw Bytes |
|-------------|-----|---------|-------|-----------|
| Dual-mode (full capability) | 0xFFFF | 0x03 | 0x00 | `FF FF 03 00` |
| Peripheral-only (ESP32-S3) | 0xFFFF | 0x03 | 0x01 | `FF FF 03 01` |
### 2.3 Advertising Packet Layout
The v0.3.0 advertising packet extends v2.2:
```
Main Advertising Packet (31 bytes max):
├── Flags (3 bytes)
├── Complete 128-bit Service UUID (18 bytes)
│ └── 37145b00-442d-4a94-917f-8f42c5da28e3
├── Manufacturer Data (5 bytes) ← NEW in v0.3.0
│ ├── AD Type 0xFF (1 byte)
│ └── Data (4 bytes): CID + Version + Flags
└── Remaining: 5 bytes available
Scan Response Packet (31 bytes max):
└── Device Name: "RNS-{identity}" (variable)
```
## 3. Connection Direction Logic
### 3.1 Decision Algorithm
```
FUNCTION shouldInitiateConnection(local_device, peer_device):
local_peripheral_only = local_device.flags & PERIPHERAL_ONLY
peer_peripheral_only = peer_device.flags & PERIPHERAL_ONLY
# Case 1: Peer is peripheral-only, we are not
IF peer_peripheral_only AND NOT local_peripheral_only:
RETURN TRUE # We MUST initiate (peer cannot)
# Case 2: We are peripheral-only, peer is not
IF local_peripheral_only AND NOT peer_peripheral_only:
RETURN FALSE # Peer MUST initiate (we cannot)
# Case 3: Both are peripheral-only (deadlock)
IF peer_peripheral_only AND local_peripheral_only:
LOG_WARNING("Both devices peripheral-only, connection impossible")
RETURN FALSE # Neither can initiate
# Case 4: Both have full capability - use v2.2 MAC sorting
RETURN local_device.mac < peer_device.mac
```
### 3.2 Capability Detection
When a device does not advertise manufacturer data (v2.2 device):
- Assume `has_capability_data = false`
- Assume `capability_flags = 0x00` (full capability)
- Fall back to v2.2 MAC sorting
When manufacturer data is present:
- Verify Company ID = 0xFFFF
- Verify Version >= 0x03
- Extract capability flags from byte 3
## 4. Backwards Compatibility
### 4.1 Compatibility Matrix
| Our Device | Peer Device | Connection Decision | Result |
|------------|-------------|---------------------|--------|
| v0.3.0 dual | v0.3.0 dual | MAC sorting | Works |
| v0.3.0 dual | v0.3.0 P-only | We initiate | Works |
| v0.3.0 P-only | v0.3.0 dual | Peer initiates | Works |
| v0.3.0 dual | v0.2.x | MAC sorting | Works |
| v0.3.0 P-only | v0.2.x (lower MAC) | v0.2.x initiates | Works |
| v0.3.0 P-only | v0.2.x (higher MAC) | Neither initiates | **Fails** |
| v0.3.0 P-only | v0.3.0 P-only | Neither initiates | **Fails** |
### 4.2 Known Limitations
1. **v0.3.0 peripheral-only ↔ v0.2.x with higher MAC**: No connection possible. The v0.2.x device uses MAC sorting and waits for the lower-MAC device (the v0.3.0 P-only) to initiate.
**Mitigation**: Upgrade the v0.2.x device to v0.3.0.
2. **Two peripheral-only devices**: Connection impossible as neither can initiate.
**Mitigation**: Ensure at least one device in the mesh has full capability.
## 5. GATT Service (Unchanged from v2.2)
The GATT service structure remains unchanged:
```
Reticulum Service: 37145b00-442d-4a94-917f-8f42c5da28e3
├── RX Characteristic: 37145b00-442d-4a94-917f-8f42c5da28e5
│ └── Properties: WRITE, WRITE_WITHOUT_RESPONSE
├── TX Characteristic: 37145b00-442d-4a94-917f-8f42c5da28e4
│ └── Properties: READ, NOTIFY
│ └── CCCD: 00002902-0000-1000-8000-00805f9b34fb
└── Identity Characteristic: 37145b00-442d-4a94-917f-8f42c5da28e6
└── Properties: READ
└── Value: 16-byte identity hash
```
## 6. Implementation Notes
### 6.1 NimBLE (ESP32)
```cpp
// Setting manufacturer data
uint8_t mfr_data[4] = {0xFF, 0xFF, 0x03, peripheral_only ? 0x01 : 0x00};
advertising->setManufacturerData(mfr_data, sizeof(mfr_data));
// Parsing manufacturer data
if (device->haveManufacturerData()) {
std::string data = device->getManufacturerData();
if (data.size() >= 4) {
uint16_t cid = (uint8_t)data[0] | ((uint8_t)data[1] << 8);
if (cid == 0xFFFF && data[2] >= 0x03) {
capability_flags = data[3];
}
}
}
```
### 6.2 Android
```kotlin
// Setting manufacturer data (note: Android API excludes CID in the byte array)
val mfrData = byteArrayOf(0x03.toByte(), flags.toByte())
advertiseData.addManufacturerData(0xFFFF, mfrData)
// Parsing manufacturer data
val mfrData = scanRecord.getManufacturerSpecificData(0xFFFF)
if (mfrData != null && mfrData.size >= 2 && mfrData[0] >= 0x03.toByte()) {
capabilityFlags = mfrData[1].toInt() and 0xFF
}
```
### 6.3 Python (BlueZ/Bleak)
```python
# Setting manufacturer data in advertisement
# BlueZ uses D-Bus ManufacturerData property
manufacturer_data = {0xFFFF: bytes([0x03, 0x01 if peripheral_only else 0x00])}
# Parsing manufacturer data from scan
mfr_data = device.metadata.get("manufacturer_data", {}).get(0xFFFF)
if mfr_data and len(mfr_data) >= 2 and mfr_data[0] >= 0x03:
capability_flags = mfr_data[1]
```
## 7. Future Extensions
Reserved capability flag bits for potential future use:
| Bit | Name | Description |
|-----|------|-------------|
| 0 | PERIPHERAL_ONLY | Cannot act as BLE central |
| 1 | CENTRAL_ONLY | Cannot act as BLE peripheral |
| 2 | HIGH_BANDWIDTH | Supports extended MTU/PHY |
| 3 | RELAY_CAPABLE | Can relay packets in mesh |
| 4-7 | Reserved | Must be 0 |
## 8. Version History
| Version | Date | Changes |
|---------|------|---------|
| v2.0 | Oct 2025 | Identity characteristic for peer identification |
| v2.1 | Oct 2025 | (Deprecated) Identity in device name |
| v2.2 | Nov 2025 | Identity handshake protocol, identity-based keying |
| v0.3.0 | Dec 2025 | Capability advertisement for peripheral-only devices |
## 9. References
- [BLE-Reticulum Protocol v2.2](BLE_PROTOCOL_v2.2.md) - Full protocol specification
- [Bluetooth Core Specification](https://www.bluetooth.com/specifications/specs/core-specification/) - BLE advertising data format
- [Bluetooth Assigned Numbers](https://www.bluetooth.com/specifications/assigned-numbers/) - Company IDs

View file

@ -376,11 +376,22 @@ class BLEInterface(Interface):
self.spawned_interfaces = {} # identity_hash (16 hex chars) -> BLEPeerInterface
self.address_to_identity = {} # address -> peer_identity (16-byte identity)
self.identity_to_address = {} # identity_hash -> address (for reverse lookup)
self.address_to_interface = {} # address -> BLEPeerInterface (for cleanup fallback)
# Cache for recently disconnected identities (address -> (identity, timestamp))
# Used to restore identity when peer reconnects before cache expiry (60s)
self._identity_cache = {}
self._identity_cache_ttl = 60 # seconds
# Pending connections awaiting identity handshake (address -> timestamp)
# If identity not received within timeout, connection is closed
self._pending_identity_connections = {}
self._pending_identity_timeout = 30 # seconds
# Pending interface detachments with grace period (identity_hash -> timestamp)
# Allows new connections to establish before detaching the interface
self._pending_detach = {}
self._pending_detach_grace_period = 2.0 # seconds
# Fragmentation
self.fragmenters = {} # address -> BLEFragmenter (per MTU)
self.reassemblers = {} # address -> BLEReassembler
@ -678,19 +689,23 @@ class BLEInterface(Interface):
self.cleanup_timer = threading.Timer(30.0, self._periodic_cleanup_task)
self.cleanup_timer.daemon = True
self.cleanup_timer.start()
RNS.log(f"{self} cleanup timer started (30s interval)", RNS.LOG_WARNING)
def _periodic_cleanup_task(self):
"""
Periodically clean up stale reassembly buffers (CRITICAL #2: prevent memory leak)
Periodically clean up stale reassembly buffers and orphaned interfaces.
This task runs every 30 seconds to remove incomplete packet reassembly buffers
that have timed out. Without this, failed transmissions would leave buffers in
memory indefinitely, leading to memory exhaustion on long-running instances
(especially critical on Pi Zero with only 512MB RAM).
This task runs every 30 seconds to:
1. Remove incomplete packet reassembly buffers that have timed out
(prevents memory exhaustion on long-running instances)
2. Validate spawned interfaces against actual connections
(catches orphaned interfaces from race conditions)
"""
if not self.online:
return # Don't reschedule if interface is offline
RNS.log(f"{self} periodic cleanup running, pending identity connections: {len(self._pending_identity_connections)}", RNS.LOG_WARNING)
with self.frag_lock:
total_cleaned = 0
for peer_address, reassembler in list(self.reassemblers.items()):
@ -704,9 +719,186 @@ class BLEInterface(Interface):
RNS.log(f"{self} periodic cleanup: removed {total_cleaned} stale reassembly buffer(s) total",
RNS.LOG_INFO)
# Validate spawned interfaces against actual connections
self._validate_spawned_interfaces()
# Check for pending connections that never received identity (timeout)
self._cleanup_pending_identity_connections()
# Process pending interface detachments (after grace period)
self._process_pending_detaches()
# Reschedule for next cleanup cycle
self._start_cleanup_timer()
def _cleanup_pending_identity_connections(self):
"""
Disconnect connections that never completed identity handshake.
This handles cases like non-Reticulum BLE devices (AirTags, scanners)
that connect to our GATT server but never send the identity handshake.
These connections are tracked when established and disconnected if
identity is not received within the timeout period.
"""
now = time.time()
timed_out = []
for address, connect_time in list(self._pending_identity_connections.items()):
elapsed = now - connect_time
if elapsed > self._pending_identity_timeout:
timed_out.append(address)
RNS.log(
f"{self} connection from {address} timed out waiting for identity "
f"({elapsed:.1f}s > {self._pending_identity_timeout}s), disconnecting",
RNS.LOG_WARNING
)
# Disconnect timed-out connections
for address in timed_out:
del self._pending_identity_connections[address]
try:
self.driver.disconnect(address)
except Exception as e:
RNS.log(f"{self} error disconnecting timed-out connection {address}: {e}", RNS.LOG_ERROR)
def _process_pending_detaches(self):
"""
Process pending interface detachments after grace period.
When an address disconnects, we schedule the interface for delayed detachment
to allow new connections with the same identity to establish first (MAC rotation).
After the grace period, we check again if any addresses are connected with that
identity - if not, we detach the interface.
"""
now = time.time()
to_detach = []
for identity_hash, scheduled_time in list(self._pending_detach.items()):
elapsed = now - scheduled_time
if elapsed >= self._pending_detach_grace_period:
# Grace period expired - check if any addresses now have this identity
has_connected_address = False
for addr, identity in self.address_to_identity.items():
if self._compute_identity_hash(identity) == identity_hash:
has_connected_address = True
break
if has_connected_address:
# New connection arrived during grace period - cancel detach
del self._pending_detach[identity_hash]
RNS.log(f"{self} cancelled detach for {identity_hash[:8]} - address reconnected during grace period", RNS.LOG_DEBUG)
else:
# No connections - safe to detach
to_detach.append(identity_hash)
# Actually detach interfaces
for identity_hash in to_detach:
del self._pending_detach[identity_hash]
peer_if = self.spawned_interfaces.get(identity_hash)
if peer_if:
# Get peer_identity for fragmenter cleanup before detaching
peer_identity = peer_if.peer_identity
peer_if.detach()
RNS.log(f"{self} detached interface for {identity_hash[:8]} after grace period", RNS.LOG_DEBUG)
if identity_hash in self.spawned_interfaces:
del self.spawned_interfaces[identity_hash]
if identity_hash in self.identity_to_address:
del self.identity_to_address[identity_hash]
# Clean up fragmenter/reassembler now that interface is fully detached
if peer_identity:
frag_key = self._get_fragmenter_key(peer_identity, "") # Address unused in key computation
with self.frag_lock:
if frag_key in self.fragmenters:
del self.fragmenters[frag_key]
RNS.log(f"{self} cleaned up fragmenter for {identity_hash[:8]}", RNS.LOG_DEBUG)
if frag_key in self.reassemblers:
del self.reassemblers[frag_key]
else:
RNS.log(f"{self} pending detach for {identity_hash[:8]} but interface already gone", RNS.LOG_DEBUG)
def _validate_spawned_interfaces(self):
"""
Validate that all spawned interfaces have actual underlying connections.
Cleans up orphaned interfaces where the BLE connection is gone but the
interface remains (race condition protection). This is a safety net for
cases where cleanup in disconnect callbacks fails due to timing issues.
"""
try:
# Get list of actually connected peers from driver
connected_addresses = set(self.driver.connected_peers)
# First pass: collect orphaned address mappings (addresses not in connected_addresses)
orphaned_addresses = []
for address in list(self.address_to_interface.keys()):
if address not in connected_addresses:
orphaned_addresses.append(address)
# Second pass: for each orphaned address, clean up mappings and check if interface should be detached
interfaces_to_detach = set() # Use set to avoid detaching same interface multiple times
for address in orphaned_addresses:
peer_if = self.address_to_interface.get(address)
if not peer_if:
continue
RNS.log(f"{self} cleaning up orphaned address mapping for {address}", RNS.LOG_DEBUG)
# Get identity info
peer_identity = None
identity_hash = None
if peer_if.peer_identity:
peer_identity = peer_if.peer_identity
identity_hash = self._compute_identity_hash(peer_identity)
# Remove address-specific mappings
if address in self.address_to_interface:
del self.address_to_interface[address]
if address in self.address_to_identity:
del self.address_to_identity[address]
# NOTE: Do NOT clean up fragmenters here - they are keyed by identity, not address
# Fragmenters are only cleaned up when the interface is fully detached (third pass)
# Check if ANY other addresses still use this identity/interface
if identity_hash:
other_addresses_connected = False
for other_addr in list(self.address_to_interface.keys()):
if other_addr in connected_addresses:
other_if = self.address_to_interface.get(other_addr)
if other_if == peer_if:
other_addresses_connected = True
# Update identity_to_address to point to a connected address
self.identity_to_address[identity_hash] = other_addr
break
if not other_addresses_connected:
# No other addresses connected with this identity - mark for detach
interfaces_to_detach.add((peer_if, identity_hash, peer_identity))
# Third pass: detach interfaces that have no connected addresses
for peer_if, identity_hash, peer_identity in interfaces_to_detach:
RNS.log(f"{self} detaching orphaned interface for {identity_hash[:8]} (no active connections)", RNS.LOG_WARNING)
peer_if.detach()
if identity_hash in self.spawned_interfaces:
del self.spawned_interfaces[identity_hash]
if identity_hash in self.identity_to_address:
del self.identity_to_address[identity_hash]
# Clean up fragmenter/reassembler only when interface is fully detached
if peer_identity:
frag_key = self._get_fragmenter_key(peer_identity, "") # Address unused in key computation
with self.frag_lock:
if frag_key in self.fragmenters:
del self.fragmenters[frag_key]
if frag_key in self.reassemblers:
del self.reassemblers[frag_key]
if orphaned_addresses:
RNS.log(f"{self} periodic validation: cleaned up {len(orphaned_addresses)} orphaned address(es), detached {len(interfaces_to_detach)} interface(s)", RNS.LOG_INFO)
except Exception as e:
RNS.log(f"{self} error during interface validation (non-fatal): {e}", RNS.LOG_WARNING)
def _device_discovered_callback(self, device: BLEDevice):
"""
Driver callback: Handle discovered BLE device.
@ -778,6 +970,11 @@ class BLEInterface(Interface):
if len(peer_identity) == 16:
identity_hash = self._compute_identity_hash(peer_identity)
# Cancel any pending detach for this identity (new connection arrived in time)
if identity_hash in self._pending_detach:
del self._pending_detach[identity_hash]
RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]} (new connection from {address})", RNS.LOG_DEBUG)
# Store identity mappings
self.address_to_identity[address] = peer_identity
self.identity_to_address[identity_hash] = address
@ -786,6 +983,10 @@ class BLEInterface(Interface):
RNS.log(f"{self} connected to {address} as {role_str}, received identity: {identity_hash}", RNS.LOG_INFO)
self._record_connection_success(address)
# Remove from pending identity tracking if it was tracked
if address in self._pending_identity_connections:
del self._pending_identity_connections[address]
# Check for pending MTU (race condition: MTU negotiated before identity)
if address in self.pending_mtu:
pending_mtu = self.pending_mtu.pop(address)
@ -799,11 +1000,15 @@ class BLEInterface(Interface):
elif role == "peripheral":
# Peripheral mode: identity will arrive via handshake
RNS.log(f"{self} connected to {address} as PERIPHERAL, waiting for identity handshake...", RNS.LOG_INFO)
# Track pending connection - will timeout if identity never arrives
self._pending_identity_connections[address] = time.time()
# The identity will be received in `_data_received_callback`
else:
RNS.log(f"{self} connected to {address}, but identity not provided and role is {role}. Disconnecting.", RNS.LOG_WARNING)
self.driver.disconnect(address)
# Connection without identity from non-peripheral role
# Track as pending - could be a device that connects but never handshakes
RNS.log(f"{self} connected to {address} (role={role}) without identity, tracking for timeout", RNS.LOG_INFO)
self._pending_identity_connections[address] = time.time()
def _check_duplicate_identity(self, address: str, peer_identity: bytes) -> bool:
"""
@ -953,6 +1158,11 @@ class BLEInterface(Interface):
)
RNS.log(f"{self} identity handshake complete for {address}", RNS.LOG_INFO)
# Remove from pending identity tracking (no longer waiting for handshake)
if address in self._pending_identity_connections:
del self._pending_identity_connections[address]
return True # Handshake processed successfully
except Exception as e:
@ -978,6 +1188,8 @@ class BLEInterface(Interface):
Driver callback: Handle device disconnection.
Cleans up peer state, interfaces, and fragmentation buffers.
Uses dual-index approach: tries identity lookup first, falls back to
address_to_interface for reliable cleanup even when identity unavailable.
"""
RNS.log(f"{self} disconnected from {address}", RNS.LOG_INFO)
@ -986,8 +1198,11 @@ class BLEInterface(Interface):
if address in self.peers:
del self.peers[address]
# Detach interface
# Try identity-based lookup first
peer_identity = self.address_to_identity.get(address)
peer_if = None
identity_hash = None
if peer_identity:
identity_hash = self._compute_identity_hash(peer_identity)
@ -996,74 +1211,93 @@ class BLEInterface(Interface):
self._identity_cache[address] = (peer_identity, time.time())
RNS.log(f"{self} cached identity for {address} (TTL {self._identity_cache_ttl}s)", RNS.LOG_DEBUG)
if identity_hash in self.spawned_interfaces:
peer_if = self.spawned_interfaces[identity_hash]
peer_if.detach()
del self.spawned_interfaces[identity_hash]
RNS.log(f"{self} detached interface for {address}", RNS.LOG_DEBUG)
# Get interface via identity
peer_if = self.spawned_interfaces.get(identity_hash)
# Clean up identity mappings to prevent stale connections
if address in self.address_to_identity:
del self.address_to_identity[address]
RNS.log(f"{self} cleaned up address_to_identity for {address}", RNS.LOG_DEBUG)
if identity_hash in self.identity_to_address:
del self.identity_to_address[identity_hash]
RNS.log(f"{self} cleaned up identity_to_address for {identity_hash}", RNS.LOG_DEBUG)
# Fallback: if no identity or interface not found via identity, try direct address lookup
if peer_if is None:
peer_if = self.address_to_interface.get(address)
if peer_if:
RNS.log(f"{self} using address-based fallback for cleanup of {address}", RNS.LOG_DEBUG)
# Get identity from the interface itself
if peer_if.peer_identity:
peer_identity = peer_if.peer_identity
identity_hash = self._compute_identity_hash(peer_identity)
# Clean up fragmenter/reassembler
if peer_identity:
frag_key = self._get_fragmenter_key(peer_identity, address)
with self.frag_lock:
if frag_key in self.fragmenters:
del self.fragmenters[frag_key]
if frag_key in self.reassemblers:
del self.reassemblers[frag_key]
# Clean up address-specific mappings first (before checking for other addresses)
if address in self.address_to_interface:
del self.address_to_interface[address]
if address in self.address_to_identity:
del self.address_to_identity[address]
RNS.log(f"{self} cleaned up address_to_identity for {address}", RNS.LOG_DEBUG)
# Check if any OTHER addresses still have the same identity
# If so, keep the peer interface alive - only detach when ALL addresses are gone
other_addresses_with_identity = []
if identity_hash:
for other_addr, other_identity in self.address_to_identity.items():
if other_addr != address:
other_hash = self._compute_identity_hash(other_identity)
if other_hash == identity_hash:
other_addresses_with_identity.append(other_addr)
if other_addresses_with_identity:
# Other addresses still connected with same identity - keep interface AND fragmenter alive
RNS.log(f"{self} keeping peer interface for {identity_hash[:8]} alive, other addresses still connected: {other_addresses_with_identity}", RNS.LOG_DEBUG)
# Update identity_to_address to point to one of the remaining addresses
self.identity_to_address[identity_hash] = other_addresses_with_identity[0]
# Cancel any pending detach for this identity
if identity_hash in self._pending_detach:
del self._pending_detach[identity_hash]
RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]}", RNS.LOG_DEBUG)
# NOTE: Do NOT clean up fragmenter - it's keyed by identity, not address
# Other addresses are still using it
else:
# No other addresses with this identity YET - schedule detach with grace period
# This allows new connections with the same identity to establish before detaching
if peer_if and identity_hash:
self._pending_detach[identity_hash] = time.time()
RNS.log(f"{self} scheduled detach for {identity_hash[:8]} in {self._pending_detach_grace_period}s", RNS.LOG_DEBUG)
elif not peer_if:
RNS.log(f"{self} no interface found for disconnected {address} (may have been cleaned already)", RNS.LOG_DEBUG)
# NOTE: Fragmenter cleanup happens in _process_pending_detaches after grace period
# This gives new connections time to establish before removing the fragmenter
# Clean up pending MTU (from MTU/identity race condition)
if address in self.pending_mtu:
del self.pending_mtu[address]
def _cleanup_stale_interface(self, identity_hash: str, old_address: str):
def _cleanup_stale_address(self, identity_hash: str, old_address: str):
"""
Clean up stale interface after MAC rotation.
Clean up stale address mappings after MAC rotation.
Called when we detect the same identity at a new MAC address but the
old connection is no longer alive. This allows reconnection to the
peer at their new MAC address.
old connection is no longer alive. This cleans up old address mappings
but KEEPS the interface alive for reuse with the new address.
Args:
identity_hash: 16-character hex hash of the peer's identity
old_address: The old MAC address that is no longer valid
"""
# Get peer identity for fragmenter cleanup
peer_identity = self.address_to_identity.get(old_address)
# NOTE: Do NOT detach the interface here! The interface should be REUSED
# for the new address. _spawn_peer_interface() handles reuse automatically.
# We only clean up stale address-specific mappings.
# Detach and remove old interface
if identity_hash in self.spawned_interfaces:
old_interface = self.spawned_interfaces.pop(identity_hash)
old_interface.detach()
RNS.log(f"{self} detached stale interface for {identity_hash[:8]}", RNS.LOG_DEBUG)
# Clean up address mappings (both directions)
if identity_hash in self.identity_to_address:
del self.identity_to_address[identity_hash]
# Clean up old address mappings (but keep identity_to_address - it will be updated)
if old_address in self.address_to_identity:
del self.address_to_identity[old_address]
# Clean up fragmenter/reassembler for old address
if peer_identity:
frag_key = self._get_fragmenter_key(peer_identity, old_address)
with self.frag_lock:
if frag_key in self.fragmenters:
del self.fragmenters[frag_key]
if frag_key in self.reassemblers:
del self.reassemblers[frag_key]
if old_address in self.address_to_interface:
del self.address_to_interface[old_address]
# Clean up pending MTU for old address
if old_address in self.pending_mtu:
del self.pending_mtu[old_address]
RNS.log(f"{self} cleaned up stale state for {old_address}", RNS.LOG_DEBUG)
# NOTE: Do NOT clean up fragmenters here - they are keyed by identity, not address
# The fragmenter will continue to work with the new address
RNS.log(f"{self} cleaned up stale address mappings for {old_address} (interface preserved for reuse)", RNS.LOG_DEBUG)
def _address_changed_callback(self, old_address: str, new_address: str, identity_hash: str):
"""
@ -1102,6 +1336,11 @@ class BLEInterface(Interface):
computed_hash = self._compute_identity_hash(peer_identity)
self.identity_to_address[computed_hash] = new_address
# Migrate address_to_interface mapping
if old_address in self.address_to_interface:
interface = self.address_to_interface.pop(old_address)
self.address_to_interface[new_address] = interface
# Migrate fragmenter/reassembler from old to new key
old_frag_key = self._get_fragmenter_key(peer_identity, old_address)
new_frag_key = self._get_fragmenter_key(peer_identity, new_address)
@ -1348,10 +1587,11 @@ class BLEInterface(Interface):
RNS.LOG_DEBUG)
continue
else:
# Old connection dead - clean up and allow new connection
RNS.log(f"{self} [v2.2] MAC rotation: {identity_hash[:8]} moved from {existing_address[-8:]} to {address[-8:]}, cleaning up stale interface",
# Old connection dead - clean up stale address mappings and allow new connection
# NOTE: Interface is preserved for reuse, not detached
RNS.log(f"{self} [v2.2] MAC rotation: {identity_hash[:8]} moved from {existing_address[-8:]} to {address[-8:]}, cleaning up stale address",
RNS.LOG_INFO)
self._cleanup_stale_interface(identity_hash, existing_address)
self._cleanup_stale_address(identity_hash, existing_address)
# Bypass MAC sorting - we must reconnect after MAC rotation
# regardless of which device has the higher MAC address
score = self._score_peer(peer)
@ -1548,25 +1788,40 @@ class BLEInterface(Interface):
# Compute lookup key using identity hash
identity_hash = self._compute_identity_hash(peer_identity)
# Check if interface already exists (MAC sorting should prevent this)
if identity_hash in self.spawned_interfaces:
RNS.log(f"{self} interface already exists for {name} ({identity_hash[:8]}), reusing", RNS.LOG_WARNING)
return self.spawned_interfaces[identity_hash]
# Use lock to prevent race condition where two threads create interfaces
# for the same identity simultaneously (e.g., central and peripheral callbacks)
with self.peer_lock:
# Check if interface already exists (MAC rotation causes same identity at different addresses)
if identity_hash in self.spawned_interfaces:
existing_if = self.spawned_interfaces[identity_hash]
# Update all address mappings for the new address (critical for cleanup and routing)
self.address_to_interface[address] = existing_if
self.address_to_identity[address] = peer_identity
self.identity_to_address[identity_hash] = address
# Cancel any pending detach for this identity - new connection arrived
if identity_hash in self._pending_detach:
del self._pending_detach[identity_hash]
RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]} (new connection at {address})", RNS.LOG_DEBUG)
# Mark interface as online in case it was marked offline during pending detach
existing_if.online = True
RNS.log(f"{self} interface already exists for {name} ({identity_hash[:8]}), reusing (added address mapping for {address})", RNS.LOG_DEBUG)
return existing_if
# Create new peer interface
peer_if = BLEPeerInterface(self, address, name, peer_identity)
peer_if.OUT = self.OUT
peer_if.IN = self.IN
peer_if.parent_interface = self
peer_if.bitrate = self.bitrate
peer_if.HW_MTU = self.HW_MTU
peer_if.online = True
# Create new peer interface
peer_if = BLEPeerInterface(self, address, name, peer_identity)
peer_if.OUT = self.OUT
peer_if.IN = self.IN
peer_if.parent_interface = self
peer_if.bitrate = self.bitrate
peer_if.HW_MTU = self.HW_MTU
peer_if.online = True
# Register with transport
RNS.Transport.interfaces.append(peer_if)
# Register with transport
RNS.Transport.interfaces.append(peer_if)
# Store in tracking dict
self.spawned_interfaces[identity_hash] = peer_if
# Store in tracking dicts (dual-indexed for reliable cleanup)
self.spawned_interfaces[identity_hash] = peer_if
self.address_to_interface[address] = peer_if
RNS.log(f"{self} created peer interface for {name} ({identity_hash[:8]}), type={connection_type}", RNS.LOG_INFO)
@ -1830,43 +2085,70 @@ class BLEInterface(Interface):
"""
Handle a central device disconnecting from our GATT server.
Uses dual-index approach: tries identity lookup first, falls back to
address_to_interface for reliable cleanup even when identity unavailable.
Args:
address: BLE address of the central device
"""
RNS.log(f"{self} central disconnected: {address}", RNS.LOG_INFO)
# Look up peer identity
# Try identity-based lookup first
peer_identity = self.address_to_identity.get(address, None)
peer_if = None
identity_hash = None
if not peer_identity:
RNS.log(f"{self} no identity for disconnected central {address}", RNS.LOG_WARNING)
return
if peer_identity:
identity_hash = self._compute_identity_hash(peer_identity)
peer_if = self.spawned_interfaces.get(identity_hash)
# Find and detach interface
identity_hash = self._compute_identity_hash(peer_identity)
if identity_hash in self.spawned_interfaces:
peer_if = self.spawned_interfaces[identity_hash]
peer_if.detach()
del self.spawned_interfaces[identity_hash]
RNS.log(f"{self} detached interface for {address}", RNS.LOG_DEBUG)
# Fallback: if no identity or interface not found via identity, try direct address lookup
if peer_if is None:
peer_if = self.address_to_interface.get(address)
if peer_if:
RNS.log(f"{self} using address-based fallback for cleanup of central {address}", RNS.LOG_DEBUG)
# Get identity from the interface itself
if peer_if.peer_identity:
peer_identity = peer_if.peer_identity
identity_hash = self._compute_identity_hash(peer_identity)
# Clean up identity mappings to prevent stale connections
if address in self.address_to_identity:
del self.address_to_identity[address]
RNS.log(f"{self} cleaned up address_to_identity for {address}", RNS.LOG_DEBUG)
if identity_hash in self.identity_to_address:
del self.identity_to_address[identity_hash]
RNS.log(f"{self} cleaned up identity_to_address for {identity_hash}", RNS.LOG_DEBUG)
# Clean up address-specific mappings first (before checking for other addresses)
if address in self.address_to_interface:
del self.address_to_interface[address]
# Clean up fragmenter/reassembler
frag_key = self._get_fragmenter_key(peer_identity, address)
with self.frag_lock:
if frag_key in self.reassemblers:
del self.reassemblers[frag_key]
RNS.log(f"{self} cleaned up reassembler for {address}", RNS.LOG_DEBUG)
if frag_key in self.fragmenters:
del self.fragmenters[frag_key]
RNS.log(f"{self} cleaned up fragmenter for {address}", RNS.LOG_DEBUG)
if address in self.address_to_identity:
del self.address_to_identity[address]
RNS.log(f"{self} cleaned up address_to_identity for {address}", RNS.LOG_DEBUG)
# Check if any OTHER addresses still have the same identity
# If so, keep the peer interface alive - only detach when ALL addresses are gone
other_addresses_with_identity = []
if identity_hash:
for other_addr, other_identity in self.address_to_identity.items():
if other_addr != address:
other_hash = self._compute_identity_hash(other_identity)
if other_hash == identity_hash:
other_addresses_with_identity.append(other_addr)
if other_addresses_with_identity:
# Other addresses still connected with same identity - keep interface AND fragmenter alive
RNS.log(f"{self} keeping peer interface for {identity_hash[:8]} alive (central disconnect), other addresses: {other_addresses_with_identity}", RNS.LOG_DEBUG)
# Update identity_to_address to point to one of the remaining addresses
self.identity_to_address[identity_hash] = other_addresses_with_identity[0]
# Cancel any pending detach for this identity
if identity_hash in self._pending_detach:
del self._pending_detach[identity_hash]
RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]}", RNS.LOG_DEBUG)
# NOTE: Do NOT clean up fragmenter - it's keyed by identity, not address
# Other addresses are still using it
else:
# No other addresses with this identity YET - schedule detach with grace period
if peer_if and identity_hash:
self._pending_detach[identity_hash] = time.time()
RNS.log(f"{self} scheduled detach for {identity_hash[:8]} in {self._pending_detach_grace_period}s (central)", RNS.LOG_DEBUG)
elif not peer_if:
RNS.log(f"{self} no interface found for disconnected central {address} (may have been cleaned already)", RNS.LOG_DEBUG)
# NOTE: Fragmenter cleanup happens in _process_pending_detaches after grace period
def process_incoming(self, data):
"""
@ -1926,6 +2208,7 @@ class BLEInterface(Interface):
for peer_if in list(self.spawned_interfaces.values()):
peer_if.detach()
self.spawned_interfaces.clear()
self.address_to_interface.clear()
# Clear fragmentation state
with self.frag_lock:

View file

@ -41,7 +41,7 @@ if src_path not in sys.path:
# Import directly using importlib to bypass RNS namespace conflicts
# This avoids issues when a real RNS package is installed globally
import importlib.util
bluetooth_driver_path = os.path.join(src_path, 'RNS', 'Interfaces', 'bluetooth_driver.py')
bluetooth_driver_path = os.path.join(src_path, 'ble_reticulum', 'bluetooth_driver.py')
spec = importlib.util.spec_from_file_location("bluetooth_driver", bluetooth_driver_path)
bluetooth_driver = importlib.util.module_from_spec(spec)
spec.loader.exec_module(bluetooth_driver)

View file

@ -85,8 +85,11 @@ def ble_interface(mock_rns, mock_driver):
interface.spawned_interfaces = {}
interface.address_to_identity = {}
interface.identity_to_address = {}
interface.address_to_interface = {} # address -> BLEPeerInterface
interface._identity_cache = {}
interface._identity_cache_ttl = 60
interface._pending_detach = {} # identity_hash -> timestamp
interface._pending_detach_grace_period = 2.0 # seconds
# Fragmentation
interface.fragmenters = {}
@ -143,12 +146,16 @@ class TestIdentityCacheOnDisconnect:
assert cached_identity == identity
assert time.time() - cached_time < 2 # Cached recently
# Assert: Active mappings should be cleaned up
# Assert: Address-specific mappings should be cleaned up immediately
assert mac not in ble_interface.address_to_identity
assert identity_hash not in ble_interface.identity_to_address
# Assert: Peer interface was detached
mock_peer_if.detach.assert_called_once()
# Assert: identity_to_address and interface are NOT cleaned up immediately
# (grace period allows reconnection with same identity at new address)
assert identity_hash in ble_interface.identity_to_address
# Assert: Detach is scheduled, not immediate
assert identity_hash in ble_interface._pending_detach
mock_peer_if.detach.assert_not_called()
def test_disconnect_unknown_address_no_crash(self, ble_interface, mock_rns):
"""

View file

@ -0,0 +1,701 @@
"""
Tests for BLEInterface cleanup methods.
These tests cover the new cleanup functionality added to handle:
1. Pending identity connection timeouts (non-Reticulum devices)
2. Delayed interface detachment with grace period (MAC rotation)
3. Orphaned interface validation (race condition protection)
4. Multi-address identity handling (same identity at multiple addresses)
These tests exercise the REAL BLEInterface code with mocked external dependencies.
"""
import pytest
import time
import threading
from unittest.mock import Mock, MagicMock, patch, PropertyMock
import sys
import os
# Ensure src is in path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
@pytest.fixture
def mock_rns():
"""Mock RNS module for testing."""
with patch.dict('sys.modules', {'RNS': MagicMock()}):
import sys
rns = sys.modules['RNS']
rns.LOG_CRITICAL = 0
rns.LOG_ERROR = 1
rns.LOG_WARNING = 2
rns.LOG_NOTICE = 3
rns.LOG_INFO = 4
rns.LOG_VERBOSE = 5
rns.LOG_DEBUG = 6
rns.LOG_EXTREME = 7
rns.log = Mock()
rns.prettyhexrep = lambda x: x.hex() if isinstance(x, bytes) else str(x)
rns.Transport = Mock()
rns.Transport.interfaces = []
yield rns
@pytest.fixture
def mock_driver():
"""Create a mock BLE driver."""
driver = Mock()
driver.on_device_connected = None
driver.on_device_disconnected = None
driver.on_data_received = None
driver.on_identity_received = None
driver.on_error = None
driver.on_duplicate_identity_detected = None
driver.on_address_changed = None
driver.request_identity_resync = Mock(return_value=False)
driver.get_cached_identity = Mock(return_value=None)
driver.disconnect = Mock()
driver.connected_peers = []
return driver
@pytest.fixture
def ble_interface(mock_rns, mock_driver):
"""
Create a BLEInterface instance with mocked dependencies.
This uses the REAL BLEInterface class but mocks external dependencies
to allow testing the cleanup logic.
"""
try:
from ble_reticulum.BLEInterface import BLEInterface
# Create interface without calling __init__
interface = object.__new__(BLEInterface)
# Initialize all required attributes manually
interface.name = "TestBLE"
interface.owner = Mock()
interface.online = True
interface.driver = mock_driver
# Identity mappings
interface.peers = {}
interface.spawned_interfaces = {}
interface.address_to_identity = {}
interface.identity_to_address = {}
interface.address_to_interface = {}
# Identity cache
interface._identity_cache = {}
interface._identity_cache_ttl = 60
# Pending identity connections
interface._pending_identity_connections = {}
interface._pending_identity_timeout = 30
# Pending detachments
interface._pending_detach = {}
interface._pending_detach_grace_period = 2.0
# Fragmentation
interface.fragmenters = {}
interface.reassemblers = {}
interface.pending_mtu = {}
# Locks
interface.peer_lock = threading.RLock()
interface.frag_lock = threading.RLock()
# Other attributes
interface.HW_MTU = 500
interface.MIN_MTU = 20
interface.bitrate = 700000
interface.rxb = 0
interface.txb = 0
interface.OUT = True
interface.IN = True
# Add required methods
interface._compute_identity_hash = lambda x: x[:8].hex()
interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}"
return interface
except ImportError as e:
pytest.skip(f"Cannot import BLEInterface (RNS not installed): {e}")
class TestCleanupPendingIdentityConnections:
"""Test _cleanup_pending_identity_connections() method."""
def test_no_pending_connections(self, ble_interface, mock_rns):
"""Should handle empty pending connections gracefully."""
assert len(ble_interface._pending_identity_connections) == 0
ble_interface._cleanup_pending_identity_connections()
# Should not crash, no disconnects called
ble_interface.driver.disconnect.assert_not_called()
def test_connection_not_timed_out(self, ble_interface, mock_rns):
"""Connections within timeout should not be disconnected."""
mac = "AA:BB:CC:DD:EE:FF"
# Connection started 5 seconds ago (within 30s timeout)
ble_interface._pending_identity_connections[mac] = time.time() - 5
ble_interface._cleanup_pending_identity_connections()
# Should not disconnect
ble_interface.driver.disconnect.assert_not_called()
# Should still be tracked
assert mac in ble_interface._pending_identity_connections
def test_connection_timed_out(self, ble_interface, mock_rns):
"""Connections past timeout should be disconnected."""
mac = "AA:BB:CC:DD:EE:FF"
# Connection started 35 seconds ago (past 30s timeout)
ble_interface._pending_identity_connections[mac] = time.time() - 35
ble_interface._cleanup_pending_identity_connections()
# Should disconnect
ble_interface.driver.disconnect.assert_called_once_with(mac)
# Should be removed from tracking
assert mac not in ble_interface._pending_identity_connections
def test_multiple_connections_mixed_states(self, ble_interface, mock_rns):
"""Should handle mix of timed-out and valid connections."""
mac_expired1 = "AA:BB:CC:DD:EE:01"
mac_valid = "AA:BB:CC:DD:EE:02"
mac_expired2 = "AA:BB:CC:DD:EE:03"
ble_interface._pending_identity_connections[mac_expired1] = time.time() - 40
ble_interface._pending_identity_connections[mac_valid] = time.time() - 5
ble_interface._pending_identity_connections[mac_expired2] = time.time() - 35
ble_interface._cleanup_pending_identity_connections()
# Should disconnect both expired connections
assert ble_interface.driver.disconnect.call_count == 2
# Valid connection should remain
assert mac_valid in ble_interface._pending_identity_connections
assert mac_expired1 not in ble_interface._pending_identity_connections
assert mac_expired2 not in ble_interface._pending_identity_connections
def test_disconnect_error_handled(self, ble_interface, mock_rns):
"""Errors during disconnect should be caught and logged."""
mac = "AA:BB:CC:DD:EE:FF"
ble_interface._pending_identity_connections[mac] = time.time() - 35
ble_interface.driver.disconnect.side_effect = Exception("BLE disconnect failed")
# Should not raise
ble_interface._cleanup_pending_identity_connections()
# Connection should still be removed from tracking
assert mac not in ble_interface._pending_identity_connections
class TestProcessPendingDetaches:
"""Test _process_pending_detaches() method."""
def test_no_pending_detaches(self, ble_interface, mock_rns):
"""Should handle empty pending detaches gracefully."""
assert len(ble_interface._pending_detach) == 0
ble_interface._process_pending_detaches()
# Should not crash
def test_detach_within_grace_period(self, ble_interface, mock_rns):
"""Detaches within grace period should not execute."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
# Scheduled 0.5 seconds ago (within 2s grace period)
ble_interface._pending_detach[identity_hash] = time.time() - 0.5
# Create mock interface
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface._process_pending_detaches()
# Should not detach yet
mock_peer_if.detach.assert_not_called()
assert identity_hash in ble_interface._pending_detach
assert identity_hash in ble_interface.spawned_interfaces
def test_detach_after_grace_period_no_reconnect(self, ble_interface, mock_rns):
"""Detaches past grace period should execute if no reconnection."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
# Scheduled 3 seconds ago (past 2s grace period)
ble_interface._pending_detach[identity_hash] = time.time() - 3
ble_interface.identity_to_address[identity_hash] = "AA:BB:CC:DD:EE:FF"
# Create mock interface
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface._process_pending_detaches()
# Should detach
mock_peer_if.detach.assert_called_once()
assert identity_hash not in ble_interface._pending_detach
assert identity_hash not in ble_interface.spawned_interfaces
assert identity_hash not in ble_interface.identity_to_address
def test_detach_cancelled_if_reconnected(self, ble_interface, mock_rns):
"""Detach should be cancelled if address reconnected during grace period."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Scheduled 3 seconds ago (past 2s grace period)
ble_interface._pending_detach[identity_hash] = time.time() - 3
# But address reconnected (identity mapping exists)
ble_interface.address_to_identity[mac] = identity
# Create mock interface
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface._process_pending_detaches()
# Should NOT detach (reconnected during grace period)
mock_peer_if.detach.assert_not_called()
assert identity_hash not in ble_interface._pending_detach # Pending cleared
assert identity_hash in ble_interface.spawned_interfaces # Interface kept
def test_detach_cleans_up_fragmenter(self, ble_interface, mock_rns):
"""Detach should clean up fragmenter and reassembler."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
frag_key = identity_hash
# Scheduled 3 seconds ago (past 2s grace period)
ble_interface._pending_detach[identity_hash] = time.time() - 3
ble_interface.identity_to_address[identity_hash] = "AA:BB:CC:DD:EE:FF"
# Create mock interface
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
# Create mock fragmenter/reassembler
ble_interface.fragmenters[frag_key] = Mock()
ble_interface.reassemblers[frag_key] = Mock()
ble_interface._process_pending_detaches()
# Fragmenter/reassembler should be cleaned up
assert frag_key not in ble_interface.fragmenters
assert frag_key not in ble_interface.reassemblers
def test_detach_interface_already_gone(self, ble_interface, mock_rns):
"""Should handle case where interface was already removed."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
# Scheduled 3 seconds ago (past 2s grace period)
ble_interface._pending_detach[identity_hash] = time.time() - 3
# No interface exists (already cleaned up elsewhere)
assert identity_hash not in ble_interface.spawned_interfaces
# Should not crash
ble_interface._process_pending_detaches()
# Pending entry should be cleared
assert identity_hash not in ble_interface._pending_detach
class TestValidateSpawnedInterfaces:
"""Test _validate_spawned_interfaces() method."""
def test_no_orphaned_interfaces(self, ble_interface, mock_rns):
"""Should handle case with no orphaned interfaces."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Interface with connected address
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac] = mock_peer_if
ble_interface.address_to_identity[mac] = identity
ble_interface.identity_to_address[identity_hash] = mac
# Driver shows address connected
ble_interface.driver.connected_peers = [mac]
ble_interface._validate_spawned_interfaces()
# Nothing should be detached
mock_peer_if.detach.assert_not_called()
assert identity_hash in ble_interface.spawned_interfaces
assert mac in ble_interface.address_to_interface
def test_orphaned_address_mapping_cleaned(self, ble_interface, mock_rns):
"""Should clean up address mappings for disconnected addresses."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac_disconnected = "AA:BB:CC:DD:EE:01"
mac_connected = "AA:BB:CC:DD:EE:02"
# Interface with two address mappings
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac_disconnected] = mock_peer_if
ble_interface.address_to_interface[mac_connected] = mock_peer_if
ble_interface.address_to_identity[mac_disconnected] = identity
ble_interface.address_to_identity[mac_connected] = identity
ble_interface.identity_to_address[identity_hash] = mac_disconnected
# Only one address still connected
ble_interface.driver.connected_peers = [mac_connected]
ble_interface._validate_spawned_interfaces()
# Disconnected address should be cleaned up
assert mac_disconnected not in ble_interface.address_to_interface
assert mac_disconnected not in ble_interface.address_to_identity
# Connected address should remain
assert mac_connected in ble_interface.address_to_interface
# Interface should NOT be detached (other address still connected)
mock_peer_if.detach.assert_not_called()
assert identity_hash in ble_interface.spawned_interfaces
# identity_to_address should point to connected address
assert ble_interface.identity_to_address[identity_hash] == mac_connected
def test_orphaned_interface_detached(self, ble_interface, mock_rns):
"""Should detach interface when all addresses disconnected."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
frag_key = identity_hash
# Interface with no connected addresses
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac] = mock_peer_if
ble_interface.address_to_identity[mac] = identity
ble_interface.identity_to_address[identity_hash] = mac
# Create fragmenter/reassembler
ble_interface.fragmenters[frag_key] = Mock()
ble_interface.reassemblers[frag_key] = Mock()
# No addresses connected
ble_interface.driver.connected_peers = []
ble_interface._validate_spawned_interfaces()
# Interface should be detached
mock_peer_if.detach.assert_called_once()
assert identity_hash not in ble_interface.spawned_interfaces
assert identity_hash not in ble_interface.identity_to_address
assert mac not in ble_interface.address_to_interface
assert mac not in ble_interface.address_to_identity
# Fragmenter/reassembler should be cleaned up
assert frag_key not in ble_interface.fragmenters
assert frag_key not in ble_interface.reassemblers
def test_validation_handles_exception(self, ble_interface, mock_rns):
"""Should catch and log exceptions during validation."""
# Make connected_peers property raise an exception
type(ble_interface.driver).connected_peers = PropertyMock(
side_effect=Exception("BLE driver error")
)
# Should not raise
ble_interface._validate_spawned_interfaces()
class TestMultiAddressDisconnect:
"""Test disconnect callback with multiple addresses per identity."""
def test_disconnect_preserves_interface_with_other_addresses(self, ble_interface, mock_rns):
"""When one address disconnects, interface should stay if others remain."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac1 = "AA:BB:CC:DD:EE:01"
mac2 = "AA:BB:CC:DD:EE:02"
# Setup: Two addresses mapped to same identity/interface
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.detach = Mock()
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac1] = mock_peer_if
ble_interface.address_to_interface[mac2] = mock_peer_if
ble_interface.address_to_identity[mac1] = identity
ble_interface.address_to_identity[mac2] = identity
ble_interface.identity_to_address[identity_hash] = mac1
# Disconnect mac1
ble_interface._device_disconnected_callback(mac1)
# Interface should NOT be detached (mac2 still connected)
mock_peer_if.detach.assert_not_called()
assert identity_hash in ble_interface.spawned_interfaces
# mac1 mappings should be cleaned
assert mac1 not in ble_interface.address_to_interface
assert mac1 not in ble_interface.address_to_identity
# mac2 should remain
assert mac2 in ble_interface.address_to_interface
assert mac2 in ble_interface.address_to_identity
# identity_to_address should point to mac2 now
assert ble_interface.identity_to_address[identity_hash] == mac2
# No pending detach (other address still connected)
assert identity_hash not in ble_interface._pending_detach
def test_disconnect_schedules_detach_when_last_address(self, ble_interface, mock_rns):
"""When last address disconnects, detach should be scheduled."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Setup: Single address mapped to identity
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.detach = Mock()
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac] = mock_peer_if
ble_interface.address_to_identity[mac] = identity
ble_interface.identity_to_address[identity_hash] = mac
# Disconnect
ble_interface._device_disconnected_callback(mac)
# Interface should NOT be immediately detached
mock_peer_if.detach.assert_not_called()
# But detach should be scheduled
assert identity_hash in ble_interface._pending_detach
# Address mappings should be cleaned immediately
assert mac not in ble_interface.address_to_interface
assert mac not in ble_interface.address_to_identity
class TestCentralDisconnected:
"""Test handle_central_disconnected with multi-address handling."""
def test_central_disconnect_preserves_interface_with_other_addresses(self, ble_interface, mock_rns):
"""When central disconnects, interface should stay if other addresses exist."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac_central = "AA:BB:CC:DD:EE:01"
mac_other = "AA:BB:CC:DD:EE:02"
# Setup: Two addresses mapped to same identity
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.detach = Mock()
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac_central] = mock_peer_if
ble_interface.address_to_interface[mac_other] = mock_peer_if
ble_interface.address_to_identity[mac_central] = identity
ble_interface.address_to_identity[mac_other] = identity
ble_interface.identity_to_address[identity_hash] = mac_central
# Central disconnects
ble_interface.handle_central_disconnected(mac_central)
# Interface should NOT be detached
mock_peer_if.detach.assert_not_called()
assert identity_hash in ble_interface.spawned_interfaces
# identity_to_address should point to remaining address
assert ble_interface.identity_to_address[identity_hash] == mac_other
def test_central_disconnect_uses_address_fallback(self, ble_interface, mock_rns):
"""When no identity mapping, should use address_to_interface fallback."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Setup: Only address_to_interface exists (no identity mapping)
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.detach = Mock()
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac] = mock_peer_if
ble_interface.identity_to_address[identity_hash] = mac
# Note: address_to_identity is NOT set - testing fallback
# Central disconnects
ble_interface.handle_central_disconnected(mac)
# Should still schedule detach via fallback
assert identity_hash in ble_interface._pending_detach
class TestSpawnPeerInterfaceThreadSafety:
"""Test thread-safety of _spawn_peer_interface."""
def test_spawn_interface_reuses_existing(self, ble_interface, mock_rns):
"""Should reuse existing interface for same identity at new address."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac_old = "AA:BB:CC:DD:EE:01"
mac_new = "AA:BB:CC:DD:EE:02"
# Create existing interface
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.online = True
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[mac_old] = mock_peer_if
# Spawn interface for new address with same identity
result = ble_interface._spawn_peer_interface(
mac_new, "RNS-peer", identity, client=None, mtu=185
)
# Should return existing interface
assert result == mock_peer_if
# New address should be mapped
assert ble_interface.address_to_interface[mac_new] == mock_peer_if
assert ble_interface.address_to_identity[mac_new] == identity
assert ble_interface.identity_to_address[identity_hash] == mac_new
def test_spawn_interface_cancels_pending_detach(self, ble_interface, mock_rns):
"""Spawning interface should cancel any pending detach."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Create existing interface with pending detach
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.online = False
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface._pending_detach[identity_hash] = time.time() - 1
# Spawn interface (reuse)
result = ble_interface._spawn_peer_interface(
mac, "RNS-peer", identity, client=None, mtu=185
)
# Pending detach should be cancelled
assert identity_hash not in ble_interface._pending_detach
# Interface should be marked online
assert result.online is True
class TestDeviceConnectedCallback:
"""Test _device_connected_callback with pending identity handling."""
def test_connected_cancels_pending_detach(self, ble_interface, mock_rns):
"""Connection with identity should cancel pending detach."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Setup: pending detach exists
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface._pending_detach[identity_hash] = time.time() - 1
# Mock driver role
ble_interface.driver.get_peer_role = Mock(return_value="central")
ble_interface._check_duplicate_identity = Mock(return_value=False)
ble_interface._record_connection_success = Mock()
ble_interface._spawn_peer_interface = Mock(return_value=mock_peer_if)
ble_interface._ensure_fragmenter = Mock()
# Device connects
ble_interface._device_connected_callback(mac, identity)
# Pending detach should be cancelled
assert identity_hash not in ble_interface._pending_detach
def test_connected_removes_from_pending_identity(self, ble_interface, mock_rns):
"""Connection with identity should remove from pending identity tracking."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
mac = "AA:BB:CC:DD:EE:FF"
# Setup: pending identity exists
ble_interface._pending_identity_connections[mac] = time.time() - 5
# Mock driver role
ble_interface.driver.get_peer_role = Mock(return_value="central")
ble_interface._check_duplicate_identity = Mock(return_value=False)
ble_interface._record_connection_success = Mock()
ble_interface._spawn_peer_interface = Mock(return_value=Mock())
ble_interface._ensure_fragmenter = Mock()
# Device connects
ble_interface._device_connected_callback(mac, identity)
# Should be removed from pending tracking
assert mac not in ble_interface._pending_identity_connections
def test_peripheral_connection_tracked_for_timeout(self, ble_interface, mock_rns):
"""Peripheral connection without identity should be tracked for timeout."""
mac = "AA:BB:CC:DD:EE:FF"
# Mock driver role (peripheral = waiting for identity handshake)
ble_interface.driver.get_peer_role = Mock(return_value="peripheral")
# Device connects as peripheral (no identity yet)
ble_interface._device_connected_callback(mac, None)
# Should be tracked for identity timeout
assert mac in ble_interface._pending_identity_connections
class TestCleanupStaleAddress:
"""Test _cleanup_stale_address (renamed from _cleanup_stale_interface)."""
def test_cleanup_preserves_interface(self, ble_interface, mock_rns):
"""Cleanup should preserve interface for reuse."""
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
identity_hash = identity[:8].hex()
old_mac = "AA:BB:CC:DD:EE:01"
# Setup: interface and mappings
mock_peer_if = Mock()
mock_peer_if.peer_identity = identity
mock_peer_if.detach = Mock()
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
ble_interface.address_to_interface[old_mac] = mock_peer_if
ble_interface.address_to_identity[old_mac] = identity
ble_interface.identity_to_address[identity_hash] = old_mac
ble_interface.pending_mtu[old_mac] = 185
# Cleanup old address
ble_interface._cleanup_stale_address(identity_hash, old_mac)
# Interface should NOT be detached (preserved for reuse)
mock_peer_if.detach.assert_not_called()
assert identity_hash in ble_interface.spawned_interfaces
# Old address mappings should be cleaned
assert old_mac not in ble_interface.address_to_interface
assert old_mac not in ble_interface.address_to_identity
assert old_mac not in ble_interface.pending_mtu
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View file

@ -360,7 +360,7 @@ class TestMACRotationBypassesSorting:
list regardless of MAC sorting. Previously, the code fell through to the MAC
sorting check which could skip the peer if local MAC > peer MAC.
Fix: After _cleanup_stale_interface(), immediately add peer and continue,
Fix: After _cleanup_stale_address(), immediately add peer and continue,
bypassing the MAC sorting check.
"""
@ -419,7 +419,7 @@ class TestMACRotationBypassesSorting:
"MAC rotation should bypass MAC sorting and add peer"
def test_mac_rotation_cleanup_is_called(self):
"""Test that _cleanup_stale_interface is called during MAC rotation."""
"""Test that _cleanup_stale_address is called during MAC rotation."""
driver = MockBLEDriver(local_address="FF:FF:FF:FF:FF:FF")
owner = MockOwner()
@ -430,13 +430,13 @@ class TestMACRotationBypassesSorting:
# Track cleanup calls
cleanup_calls = []
original_cleanup = interface._cleanup_stale_interface
original_cleanup = interface._cleanup_stale_address
def tracked_cleanup(identity_hash, old_address):
cleanup_calls.append((identity_hash, old_address))
return original_cleanup(identity_hash, old_address)
interface._cleanup_stale_interface = tracked_cleanup
interface._cleanup_stale_address = tracked_cleanup
# Set up MAC rotation scenario
old_address = "AA:AA:AA:AA:AA:AA"