From c76cfc1dd01964d889f940f5ae7d29cfd102bf6a Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Thu, 1 Jan 2026 13:23:43 -0500 Subject: [PATCH] fix(ble): prevent interface/fragmenter loss during MAC rotation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rewrite _validate_spawned_interfaces() with 3-pass approach: - Pass 1: Collect orphaned addresses - Pass 2: Clean up address mappings, track interfaces to detach - Pass 3: Only detach interfaces with zero connected addresses - Fragmenters only cleaned up when interface fully detached - Enhance _spawn_peer_interface() reuse logic: - Update address_to_identity and identity_to_address when reusing - Cancel pending detach for the identity - Mark interface as online - Fix disconnect callbacks to preserve fragmenters: - _device_disconnected_callback: defer fragmenter cleanup to grace period - handle_central_disconnected: same fragmenter preservation - _process_pending_detaches: clean up fragmenters on actual detach - Rename _cleanup_stale_interface() to _cleanup_stale_address(): - No longer detaches interface during MAC rotation - Only cleans up stale address-specific mappings - Interface preserved for reuse with new address Fixes orphaned peer interfaces and "No fragmenter for peer" warnings during BLE MAC address rotation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/ble_reticulum/BLEInterface.py | 345 +++++++++++++++++++----------- 1 file changed, 224 insertions(+), 121 deletions(-) diff --git a/src/ble_reticulum/BLEInterface.py b/src/ble_reticulum/BLEInterface.py index 459235e..d52f055 100644 --- a/src/ble_reticulum/BLEInterface.py +++ b/src/ble_reticulum/BLEInterface.py @@ -387,6 +387,11 @@ class BLEInterface(Interface): self._pending_identity_connections = {} self._pending_identity_timeout = 30 # seconds + # Pending interface detachments with grace period (identity_hash -> timestamp) + # Allows new connections to establish before detaching the interface + self._pending_detach = {} + self._pending_detach_grace_period = 2.0 # seconds + # Fragmentation self.fragmenters = {} # address -> BLEFragmenter (per MTU) self.reassemblers = {} # address -> BLEReassembler @@ -684,7 +689,7 @@ class BLEInterface(Interface): self.cleanup_timer = threading.Timer(30.0, self._periodic_cleanup_task) self.cleanup_timer.daemon = True self.cleanup_timer.start() - RNS.log(f"{self} cleanup timer started (30s interval)", RNS.LOG_DEBUG) + RNS.log(f"{self} cleanup timer started (30s interval)", RNS.LOG_WARNING) def _periodic_cleanup_task(self): """ @@ -699,7 +704,7 @@ class BLEInterface(Interface): if not self.online: return # Don't reschedule if interface is offline - RNS.log(f"{self} periodic cleanup running, pending identity connections: {len(self._pending_identity_connections)}", RNS.LOG_DEBUG) + RNS.log(f"{self} periodic cleanup running, pending identity connections: {len(self._pending_identity_connections)}", RNS.LOG_WARNING) with self.frag_lock: total_cleaned = 0 @@ -720,6 +725,9 @@ class BLEInterface(Interface): # Check for pending connections that never received identity (timeout) self._cleanup_pending_identity_connections() + # Process pending interface detachments (after grace period) + self._process_pending_detaches() + # Reschedule for next cleanup cycle self._start_cleanup_timer() @@ -753,6 +761,60 @@ class BLEInterface(Interface): except Exception as e: RNS.log(f"{self} error disconnecting timed-out connection {address}: {e}", RNS.LOG_ERROR) + def _process_pending_detaches(self): + """ + Process pending interface detachments after grace period. + + When an address disconnects, we schedule the interface for delayed detachment + to allow new connections with the same identity to establish first (MAC rotation). + After the grace period, we check again if any addresses are connected with that + identity - if not, we detach the interface. + """ + now = time.time() + to_detach = [] + + for identity_hash, scheduled_time in list(self._pending_detach.items()): + elapsed = now - scheduled_time + if elapsed >= self._pending_detach_grace_period: + # Grace period expired - check if any addresses now have this identity + has_connected_address = False + for addr, identity in self.address_to_identity.items(): + if self._compute_identity_hash(identity) == identity_hash: + has_connected_address = True + break + + if has_connected_address: + # New connection arrived during grace period - cancel detach + RNS.log(f"{self} cancelled detach for {identity_hash[:8]} - address reconnected during grace period", RNS.LOG_DEBUG) + else: + # No connections - safe to detach + to_detach.append(identity_hash) + + # Actually detach interfaces + for identity_hash in to_detach: + del self._pending_detach[identity_hash] + peer_if = self.spawned_interfaces.get(identity_hash) + if peer_if: + # Get peer_identity for fragmenter cleanup before detaching + peer_identity = peer_if.peer_identity + peer_if.detach() + RNS.log(f"{self} detached interface for {identity_hash[:8]} after grace period", RNS.LOG_DEBUG) + if identity_hash in self.spawned_interfaces: + del self.spawned_interfaces[identity_hash] + if identity_hash in self.identity_to_address: + del self.identity_to_address[identity_hash] + # Clean up fragmenter/reassembler now that interface is fully detached + if peer_identity: + frag_key = self._get_fragmenter_key(peer_identity, "") # Address unused in key computation + with self.frag_lock: + if frag_key in self.fragmenters: + del self.fragmenters[frag_key] + RNS.log(f"{self} cleaned up fragmenter for {identity_hash[:8]}", RNS.LOG_DEBUG) + if frag_key in self.reassemblers: + del self.reassemblers[frag_key] + else: + RNS.log(f"{self} pending detach for {identity_hash[:8]} but interface already gone", RNS.LOG_DEBUG) + def _validate_spawned_interfaces(self): """ Validate that all spawned interfaces have actual underlying connections. @@ -765,48 +827,73 @@ class BLEInterface(Interface): # Get list of actually connected peers from driver connected_addresses = set(self.driver.connected_peers) - # Check all address_to_interface entries - orphaned = [] - for address, peer_if in list(self.address_to_interface.items()): + # First pass: collect orphaned address mappings (addresses not in connected_addresses) + orphaned_addresses = [] + for address in list(self.address_to_interface.keys()): if address not in connected_addresses: - # Connection is gone but interface remains - orphaned.append((address, peer_if)) + orphaned_addresses.append(address) - # Clean up orphaned interfaces - for address, peer_if in orphaned: - RNS.log(f"{self} cleaning up orphaned interface for {address} (no active connection)", RNS.LOG_WARNING) + # Second pass: for each orphaned address, clean up mappings and check if interface should be detached + interfaces_to_detach = set() # Use set to avoid detaching same interface multiple times - # Get identity info from interface + for address in orphaned_addresses: + peer_if = self.address_to_interface.get(address) + if not peer_if: + continue + + RNS.log(f"{self} cleaning up orphaned address mapping for {address}", RNS.LOG_DEBUG) + + # Get identity info peer_identity = None identity_hash = None if peer_if.peer_identity: peer_identity = peer_if.peer_identity identity_hash = self._compute_identity_hash(peer_identity) - # Detach the interface - peer_if.detach() - - # Remove from all tracking dicts + # Remove address-specific mappings if address in self.address_to_interface: del self.address_to_interface[address] - if identity_hash and identity_hash in self.spawned_interfaces: - del self.spawned_interfaces[identity_hash] if address in self.address_to_identity: del self.address_to_identity[address] - if identity_hash and identity_hash in self.identity_to_address: - del self.identity_to_address[identity_hash] - # Clean up fragmentation state + # NOTE: Do NOT clean up fragmenters here - they are keyed by identity, not address + # Fragmenters are only cleaned up when the interface is fully detached (third pass) + + # Check if ANY other addresses still use this identity/interface + if identity_hash: + other_addresses_connected = False + for other_addr in list(self.address_to_interface.keys()): + if other_addr in connected_addresses: + other_if = self.address_to_interface.get(other_addr) + if other_if == peer_if: + other_addresses_connected = True + # Update identity_to_address to point to a connected address + self.identity_to_address[identity_hash] = other_addr + break + + if not other_addresses_connected: + # No other addresses connected with this identity - mark for detach + interfaces_to_detach.add((peer_if, identity_hash, peer_identity)) + + # Third pass: detach interfaces that have no connected addresses + for peer_if, identity_hash, peer_identity in interfaces_to_detach: + RNS.log(f"{self} detaching orphaned interface for {identity_hash[:8]} (no active connections)", RNS.LOG_WARNING) + peer_if.detach() + if identity_hash in self.spawned_interfaces: + del self.spawned_interfaces[identity_hash] + if identity_hash in self.identity_to_address: + del self.identity_to_address[identity_hash] + # Clean up fragmenter/reassembler only when interface is fully detached if peer_identity: - frag_key = self._get_fragmenter_key(peer_identity, address) + frag_key = self._get_fragmenter_key(peer_identity, "") # Address unused in key computation with self.frag_lock: if frag_key in self.fragmenters: del self.fragmenters[frag_key] if frag_key in self.reassemblers: del self.reassemblers[frag_key] - if orphaned: - RNS.log(f"{self} periodic validation: cleaned up {len(orphaned)} orphaned interface(s)", RNS.LOG_INFO) + if orphaned_addresses: + RNS.log(f"{self} periodic validation: cleaned up {len(orphaned_addresses)} orphaned address(es), detached {len(interfaces_to_detach)} interface(s)", RNS.LOG_INFO) except Exception as e: RNS.log(f"{self} error during interface validation (non-fatal): {e}", RNS.LOG_WARNING) @@ -882,6 +969,11 @@ class BLEInterface(Interface): if len(peer_identity) == 16: identity_hash = self._compute_identity_hash(peer_identity) + # Cancel any pending detach for this identity (new connection arrived in time) + if identity_hash in self._pending_detach: + del self._pending_detach[identity_hash] + RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]} (new connection from {address})", RNS.LOG_DEBUG) + # Store identity mappings self.address_to_identity[address] = peer_identity self.identity_to_address[identity_hash] = address @@ -1131,85 +1223,80 @@ class BLEInterface(Interface): peer_identity = peer_if.peer_identity identity_hash = self._compute_identity_hash(peer_identity) - # Detach interface if found - if peer_if: - peer_if.detach() - RNS.log(f"{self} detached interface for {address}", RNS.LOG_DEBUG) - - # Clean up spawned_interfaces dict - if identity_hash and identity_hash in self.spawned_interfaces: - del self.spawned_interfaces[identity_hash] - else: - RNS.log(f"{self} no interface found for disconnected {address} (may have been cleaned already)", RNS.LOG_DEBUG) - - # Always clean up address_to_interface mapping + # Clean up address-specific mappings first (before checking for other addresses) if address in self.address_to_interface: del self.address_to_interface[address] - # Clean up identity mappings if address in self.address_to_identity: del self.address_to_identity[address] RNS.log(f"{self} cleaned up address_to_identity for {address}", RNS.LOG_DEBUG) - if identity_hash and identity_hash in self.identity_to_address: - del self.identity_to_address[identity_hash] - RNS.log(f"{self} cleaned up identity_to_address for {identity_hash}", RNS.LOG_DEBUG) - # Clean up fragmenter/reassembler - if peer_identity: - frag_key = self._get_fragmenter_key(peer_identity, address) - with self.frag_lock: - if frag_key in self.fragmenters: - del self.fragmenters[frag_key] - if frag_key in self.reassemblers: - del self.reassemblers[frag_key] + # Check if any OTHER addresses still have the same identity + # If so, keep the peer interface alive - only detach when ALL addresses are gone + other_addresses_with_identity = [] + if identity_hash: + for other_addr, other_identity in self.address_to_identity.items(): + if other_addr != address: + other_hash = self._compute_identity_hash(other_identity) + if other_hash == identity_hash: + other_addresses_with_identity.append(other_addr) + + if other_addresses_with_identity: + # Other addresses still connected with same identity - keep interface AND fragmenter alive + RNS.log(f"{self} keeping peer interface for {identity_hash[:8]} alive, other addresses still connected: {other_addresses_with_identity}", RNS.LOG_DEBUG) + # Update identity_to_address to point to one of the remaining addresses + self.identity_to_address[identity_hash] = other_addresses_with_identity[0] + # Cancel any pending detach for this identity + if identity_hash in self._pending_detach: + del self._pending_detach[identity_hash] + RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]}", RNS.LOG_DEBUG) + # NOTE: Do NOT clean up fragmenter - it's keyed by identity, not address + # Other addresses are still using it + else: + # No other addresses with this identity YET - schedule detach with grace period + # This allows new connections with the same identity to establish before detaching + if peer_if and identity_hash: + self._pending_detach[identity_hash] = time.time() + RNS.log(f"{self} scheduled detach for {identity_hash[:8]} in {self._pending_detach_grace_period}s", RNS.LOG_DEBUG) + elif not peer_if: + RNS.log(f"{self} no interface found for disconnected {address} (may have been cleaned already)", RNS.LOG_DEBUG) + # NOTE: Fragmenter cleanup happens in _process_pending_detaches after grace period + # This gives new connections time to establish before removing the fragmenter # Clean up pending MTU (from MTU/identity race condition) if address in self.pending_mtu: del self.pending_mtu[address] - def _cleanup_stale_interface(self, identity_hash: str, old_address: str): + def _cleanup_stale_address(self, identity_hash: str, old_address: str): """ - Clean up stale interface after MAC rotation. + Clean up stale address mappings after MAC rotation. Called when we detect the same identity at a new MAC address but the - old connection is no longer alive. This allows reconnection to the - peer at their new MAC address. + old connection is no longer alive. This cleans up old address mappings + but KEEPS the interface alive for reuse with the new address. Args: identity_hash: 16-character hex hash of the peer's identity old_address: The old MAC address that is no longer valid """ - # Get peer identity for fragmenter cleanup - peer_identity = self.address_to_identity.get(old_address) + # NOTE: Do NOT detach the interface here! The interface should be REUSED + # for the new address. _spawn_peer_interface() handles reuse automatically. + # We only clean up stale address-specific mappings. - # Detach and remove old interface - if identity_hash in self.spawned_interfaces: - old_interface = self.spawned_interfaces.pop(identity_hash) - old_interface.detach() - RNS.log(f"{self} detached stale interface for {identity_hash[:8]}", RNS.LOG_DEBUG) - - # Clean up address mappings (both directions) - if identity_hash in self.identity_to_address: - del self.identity_to_address[identity_hash] + # Clean up old address mappings (but keep identity_to_address - it will be updated) if old_address in self.address_to_identity: del self.address_to_identity[old_address] if old_address in self.address_to_interface: del self.address_to_interface[old_address] - # Clean up fragmenter/reassembler for old address - if peer_identity: - frag_key = self._get_fragmenter_key(peer_identity, old_address) - with self.frag_lock: - if frag_key in self.fragmenters: - del self.fragmenters[frag_key] - if frag_key in self.reassemblers: - del self.reassemblers[frag_key] - # Clean up pending MTU for old address if old_address in self.pending_mtu: del self.pending_mtu[old_address] - RNS.log(f"{self} cleaned up stale state for {old_address}", RNS.LOG_DEBUG) + # NOTE: Do NOT clean up fragmenters here - they are keyed by identity, not address + # The fragmenter will continue to work with the new address + + RNS.log(f"{self} cleaned up stale address mappings for {old_address} (interface preserved for reuse)", RNS.LOG_DEBUG) def _address_changed_callback(self, old_address: str, new_address: str, identity_hash: str): """ @@ -1499,10 +1586,11 @@ class BLEInterface(Interface): RNS.LOG_DEBUG) continue else: - # Old connection dead - clean up and allow new connection - RNS.log(f"{self} [v2.2] MAC rotation: {identity_hash[:8]} moved from {existing_address[-8:]} to {address[-8:]}, cleaning up stale interface", + # Old connection dead - clean up stale address mappings and allow new connection + # NOTE: Interface is preserved for reuse, not detached + RNS.log(f"{self} [v2.2] MAC rotation: {identity_hash[:8]} moved from {existing_address[-8:]} to {address[-8:]}, cleaning up stale address", RNS.LOG_INFO) - self._cleanup_stale_interface(identity_hash, existing_address) + self._cleanup_stale_address(identity_hash, existing_address) # Bypass MAC sorting - we must reconnect after MAC rotation # regardless of which device has the higher MAC address score = self._score_peer(peer) @@ -1699,29 +1787,40 @@ class BLEInterface(Interface): # Compute lookup key using identity hash identity_hash = self._compute_identity_hash(peer_identity) - # Check if interface already exists (MAC rotation causes same identity at different addresses) - if identity_hash in self.spawned_interfaces: - existing_if = self.spawned_interfaces[identity_hash] - # Update address_to_interface for the new address (critical for cleanup) - self.address_to_interface[address] = existing_if - RNS.log(f"{self} interface already exists for {name} ({identity_hash[:8]}), reusing (added address mapping for {address})", RNS.LOG_DEBUG) - return existing_if + # Use lock to prevent race condition where two threads create interfaces + # for the same identity simultaneously (e.g., central and peripheral callbacks) + with self.peer_lock: + # Check if interface already exists (MAC rotation causes same identity at different addresses) + if identity_hash in self.spawned_interfaces: + existing_if = self.spawned_interfaces[identity_hash] + # Update all address mappings for the new address (critical for cleanup and routing) + self.address_to_interface[address] = existing_if + self.address_to_identity[address] = peer_identity + self.identity_to_address[identity_hash] = address + # Cancel any pending detach for this identity - new connection arrived + if identity_hash in self._pending_detach: + del self._pending_detach[identity_hash] + RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]} (new connection at {address})", RNS.LOG_DEBUG) + # Mark interface as online in case it was marked offline during pending detach + existing_if.online = True + RNS.log(f"{self} interface already exists for {name} ({identity_hash[:8]}), reusing (added address mapping for {address})", RNS.LOG_DEBUG) + return existing_if - # Create new peer interface - peer_if = BLEPeerInterface(self, address, name, peer_identity) - peer_if.OUT = self.OUT - peer_if.IN = self.IN - peer_if.parent_interface = self - peer_if.bitrate = self.bitrate - peer_if.HW_MTU = self.HW_MTU - peer_if.online = True + # Create new peer interface + peer_if = BLEPeerInterface(self, address, name, peer_identity) + peer_if.OUT = self.OUT + peer_if.IN = self.IN + peer_if.parent_interface = self + peer_if.bitrate = self.bitrate + peer_if.HW_MTU = self.HW_MTU + peer_if.online = True - # Register with transport - RNS.Transport.interfaces.append(peer_if) + # Register with transport + RNS.Transport.interfaces.append(peer_if) - # Store in tracking dicts (dual-indexed for reliable cleanup) - self.spawned_interfaces[identity_hash] = peer_if - self.address_to_interface[address] = peer_if + # Store in tracking dicts (dual-indexed for reliable cleanup) + self.spawned_interfaces[identity_hash] = peer_if + self.address_to_interface[address] = peer_if RNS.log(f"{self} created peer interface for {name} ({identity_hash[:8]}), type={connection_type}", RNS.LOG_INFO) @@ -2012,39 +2111,43 @@ class BLEInterface(Interface): peer_identity = peer_if.peer_identity identity_hash = self._compute_identity_hash(peer_identity) - # Detach interface if found - if peer_if: - peer_if.detach() - RNS.log(f"{self} detached interface for {address}", RNS.LOG_DEBUG) - - # Clean up spawned_interfaces dict - if identity_hash and identity_hash in self.spawned_interfaces: - del self.spawned_interfaces[identity_hash] - else: - RNS.log(f"{self} no interface found for disconnected central {address} (may have been cleaned already)", RNS.LOG_DEBUG) - - # Always clean up address_to_interface mapping + # Clean up address-specific mappings first (before checking for other addresses) if address in self.address_to_interface: del self.address_to_interface[address] - # Clean up identity mappings if address in self.address_to_identity: del self.address_to_identity[address] RNS.log(f"{self} cleaned up address_to_identity for {address}", RNS.LOG_DEBUG) - if identity_hash and identity_hash in self.identity_to_address: - del self.identity_to_address[identity_hash] - RNS.log(f"{self} cleaned up identity_to_address for {identity_hash}", RNS.LOG_DEBUG) - # Clean up fragmenter/reassembler - if peer_identity: - frag_key = self._get_fragmenter_key(peer_identity, address) - with self.frag_lock: - if frag_key in self.reassemblers: - del self.reassemblers[frag_key] - RNS.log(f"{self} cleaned up reassembler for {address}", RNS.LOG_DEBUG) - if frag_key in self.fragmenters: - del self.fragmenters[frag_key] - RNS.log(f"{self} cleaned up fragmenter for {address}", RNS.LOG_DEBUG) + # Check if any OTHER addresses still have the same identity + # If so, keep the peer interface alive - only detach when ALL addresses are gone + other_addresses_with_identity = [] + if identity_hash: + for other_addr, other_identity in self.address_to_identity.items(): + if other_addr != address: + other_hash = self._compute_identity_hash(other_identity) + if other_hash == identity_hash: + other_addresses_with_identity.append(other_addr) + + if other_addresses_with_identity: + # Other addresses still connected with same identity - keep interface AND fragmenter alive + RNS.log(f"{self} keeping peer interface for {identity_hash[:8]} alive (central disconnect), other addresses: {other_addresses_with_identity}", RNS.LOG_DEBUG) + # Update identity_to_address to point to one of the remaining addresses + self.identity_to_address[identity_hash] = other_addresses_with_identity[0] + # Cancel any pending detach for this identity + if identity_hash in self._pending_detach: + del self._pending_detach[identity_hash] + RNS.log(f"{self} cancelled pending detach for {identity_hash[:8]}", RNS.LOG_DEBUG) + # NOTE: Do NOT clean up fragmenter - it's keyed by identity, not address + # Other addresses are still using it + else: + # No other addresses with this identity YET - schedule detach with grace period + if peer_if and identity_hash: + self._pending_detach[identity_hash] = time.time() + RNS.log(f"{self} scheduled detach for {identity_hash[:8]} in {self._pending_detach_grace_period}s (central)", RNS.LOG_DEBUG) + elif not peer_if: + RNS.log(f"{self} no interface found for disconnected central {address} (may have been cleaned already)", RNS.LOG_DEBUG) + # NOTE: Fragmenter cleanup happens in _process_pending_detaches after grace period def process_incoming(self, data): """