diff --git a/src/RNS/Interfaces/BLEFragmentation.py b/src/RNS/Interfaces/BLEFragmentation.py index 60c7025..87bea23 100644 --- a/src/RNS/Interfaces/BLEFragmentation.py +++ b/src/RNS/Interfaces/BLEFragmentation.py @@ -89,10 +89,6 @@ class BLEFragmenter: Returns: list of bytes, each element is one BLE fragment with header + data """ - # DIAGNOSTIC: Entry logging - if RNS: - RNS.log(f"BLEFragmenter: ENTRY fragment_packet({len(packet) if isinstance(packet, bytes) else 'NOT BYTES'} bytes)", RNS.LOG_DEBUG) - if not isinstance(packet, bytes): raise TypeError("Packet must be bytes") @@ -220,10 +216,6 @@ class BLEReassembler: Raises: ValueError: If fragment is malformed """ - # DIAGNOSTIC: Entry logging - if RNS: - RNS.log(f"BLEReassembler: ENTRY receive_fragment({len(fragment) if isinstance(fragment, bytes) else 'NOT BYTES'} bytes, sender={sender_id})", RNS.LOG_DEBUG) - if not isinstance(fragment, bytes): raise TypeError("Fragment must be bytes") diff --git a/src/RNS/Interfaces/BLEGATTServer.py b/src/RNS/Interfaces/BLEGATTServer.py index 6b2c5cc..848b308 100644 --- a/src/RNS/Interfaces/BLEGATTServer.py +++ b/src/RNS/Interfaces/BLEGATTServer.py @@ -153,10 +153,6 @@ class BLEGATTServer: Returns: value: Echo back the value (required by bluezero) """ - # DIAGNOSTIC: Entry point for peripheral data reception - value_len = len(value) if hasattr(value, '__len__') else 'N/A' - self._log(f"_handle_write_rx ENTRY: value_len={value_len}, options_keys={list(options.keys())}", level="DEBUG") - # Convert to bytes - ensure we always have bytes type if isinstance(value, list): data = bytes(value) @@ -192,9 +188,7 @@ class BLEGATTServer: self._log(f"Updated MTU for {central_address}: {old_mtu} -> {mtu}", level="DEBUG") # Pass data to callback for processing - # IMPORTANT: Ensure data is bytes before passing to reassembler if self.on_data_received: - self._log(f"DIAGNOSTIC: on_data_received callback EXISTS, preparing to call with {len(data)} bytes for {central_address}", level="DEBUG") try: # Verify data is bytes before callback if not isinstance(data, bytes): @@ -202,15 +196,13 @@ class BLEGATTServer: data = bytes(data) # Call the callback (synchronous call - runs in bluezero thread) - self._log(f"DIAGNOSTIC: CALLING on_data_received({len(data)} bytes, {central_address})", level="DEBUG") self.on_data_received(data, central_address) - self._log(f"DIAGNOSTIC: on_data_received RETURNED successfully", level="DEBUG") except Exception as e: self._log(f"ERROR in data received callback: {type(e).__name__}: {e}", level="ERROR") import traceback self._log(f"Traceback: {traceback.format_exc()}", level="ERROR") else: - self._log(f"DIAGNOSTIC: on_data_received callback is NONE! Data LOST: {len(data)} bytes from {central_address}", level="ERROR") + self._log(f"on_data_received callback is NONE! Data LOST: {len(data)} bytes from {central_address}", level="ERROR") return value # bluezero expects us to return the value @@ -270,10 +262,6 @@ class BLEGATTServer: self._log(f"Central connected: {central_address} (MTU: {effective_mtu})", level="INFO") - # DIAGNOSTIC: Check callback registration and invoke - callback_registered = self.on_central_connected is not None - self._log(f"on_central_connected callback: registered={callback_registered}", level="DEBUG") - if self.on_central_connected: try: self._log(f"Invoking on_central_connected({central_address})...", level="DEBUG") @@ -383,7 +371,7 @@ class BLEGATTServer: chr_id=2, uuid=self.TX_CHAR_UUID, value=[], - notifying=True, # Enable notifications + notifying=True, flags=['read', 'notify'] ) self._log(f"Added TX characteristic: {self.TX_CHAR_UUID} (READ, NOTIFY)", level="DEBUG") diff --git a/src/RNS/Interfaces/BLEInterface.py b/src/RNS/Interfaces/BLEInterface.py index 564b887..ebd8be0 100644 --- a/src/RNS/Interfaces/BLEInterface.py +++ b/src/RNS/Interfaces/BLEInterface.py @@ -404,10 +404,8 @@ class BLEInterface(Interface): self.peers = {} # address -> (client, last_seen, mtu) self.peer_lock = threading.Lock() - # NEW: Identity-based interface tracking (unified dual-connection architecture) - self.spawned_interfaces = {} # identity_hash -> BLEPeerInterface (unified interface per peer) - # OLD format (legacy): "AA:BB:CC:DD:EE:FF-central" or "AA:BB:CC:DD:EE:FF-peripheral" - # NEW format: identity_hash (first 16 hex chars of full hash) + # Identity-based interface tracking + self.spawned_interfaces = {} # identity_hash (16 hex chars) -> BLEPeerInterface self.address_to_identity = {} # address -> peer_identity (16-byte identity) self.identity_to_address = {} # identity_hash -> address (for reverse lookup) @@ -525,17 +523,18 @@ class BLEInterface(Interface): def _start_gatt_when_identity_ready(self): """ Background thread that waits for Transport.identity, sets it on GATT server, - then starts the server. No timeout - identity loading is guaranteed. + then starts the server. Times out after 60 seconds if identity doesn't load. """ import RNS.Transport as Transport attempt = 0 start_time = time.time() + timeout = 60.0 # 60 second timeout RNS.log(f"{self} Waiting for Transport.identity to be loaded...", RNS.LOG_DEBUG) - # Poll until Transport.identity is available (no timeout - it WILL load) - while True: + # Poll until Transport.identity is available (with 60s timeout) + while time.time() - start_time < timeout: attempt += 1 try: @@ -578,6 +577,10 @@ class BLEInterface(Interface): time.sleep(0.1) # Poll every 100ms + # Timeout reached + RNS.log(f"{self} TIMEOUT waiting for Transport.identity after {timeout}s - GATT server will NOT start!", RNS.LOG_ERROR) + RNS.log(f"{self} BLE peripheral mode disabled due to identity timeout", RNS.LOG_ERROR) + def _run_async_loop(self): """Run the asyncio event loop in a separate thread.""" self.loop = asyncio.new_event_loop() @@ -921,9 +924,9 @@ class BLEInterface(Interface): def detection_callback(device, advertisement_data): """Callback invoked for each discovered BLE device.""" # Debug: Log ALL devices to diagnose why matching fails - RNS.log(f"{self} DEBUG: Device {device.address} name={device.name} " - f"service_uuids={advertisement_data.service_uuids} " - f"local_name={advertisement_data.local_name}", RNS.LOG_DEBUG) + # RNS.log(f"{self} DEBUG: Device {device.address} name={device.name} " + # f"service_uuids={advertisement_data.service_uuids} " + # f"local_name={advertisement_data.local_name}", RNS.LOG_DEBUG) discovered_devices.append((device, advertisement_data)) # Scan duration based on power mode @@ -983,6 +986,23 @@ class BLEInterface(Interface): matched = True match_method = "service UUID" + # Protocol v2.2: Check for manufacturer data with identity + # If present, extract identity immediately (faster than GATT read) + if hasattr(adv_data, 'manufacturer_data') and 0xFFFF in adv_data.manufacturer_data: + try: + mfg_data = bytes(adv_data.manufacturer_data[0xFFFF]) + if len(mfg_data) == 16: + # This is a Reticulum identity hash! + peer_identity = mfg_data + self.address_to_identity[device.address] = peer_identity + identity_hex = peer_identity.hex() + self.identity_to_address[identity_hex[:16]] = device.address + match_method = "service UUID + manufacturer data (identity)" + RNS.log(f"{self} [v2.2] parsed identity from manufacturer data (0xFFFF): {identity_hex[:16]}...", + RNS.LOG_INFO) + except Exception as e: + RNS.log(f"{self} failed to parse manufacturer data: {e}", RNS.LOG_DEBUG) + # Fallback: Match by device name pattern # Protocol v2.1: Extract identity from device name (format: RNS-{16-char-hex-hash}) # This bypasses bluezero service_uuid bug where service_uuids=[] in Bleak scans @@ -1210,6 +1230,38 @@ class BLEInterface(Interface): if address in self.peers: continue + # Protocol v2.2: Skip if interface exists for this identity (any connection type) + # This prevents dual connections (central + peripheral to same peer) + peer_identity = self.address_to_identity.get(address) + if peer_identity: + identity_hash = self._compute_identity_hash(peer_identity) + if identity_hash in self.spawned_interfaces: + RNS.log(f"{self} [v2.2] skipping {peer.name} - interface exists for identity {identity_hash[:8]}", + RNS.LOG_DEBUG) + continue + + # Protocol v2.2: MAC address sorting - deterministic connection direction + # Lower MAC initiates (central), higher MAC only accepts (peripheral) + # This prevents simultaneous connection attempts from both sides + if self.local_address is not None: + try: + # Normalize addresses (remove colons) + my_mac = self.local_address.replace(":", "") + peer_mac = address.replace(":", "") + + my_mac_int = int(my_mac, 16) + peer_mac_int = int(peer_mac, 16) + + if my_mac_int > peer_mac_int: + # Our MAC is higher - let them connect to us (we stay peripheral only) + RNS.log(f"{self} [v2.2] skipping {peer.name} (MAC {address[:17]}) - " + f"connection direction: they initiate (lower MAC connects to higher)", + RNS.LOG_DEBUG) + continue + except (ValueError, AttributeError) as e: + # MAC parsing failed - fall through to normal connection logic + RNS.log(f"{self} MAC sorting failed for {peer.name}: {e}", RNS.LOG_DEBUG) + # Skip if blacklisted if self._is_blacklisted(address): continue @@ -1325,27 +1377,24 @@ class BLEInterface(Interface): Args: peer: DiscoveredPeer object to connect to """ - # Check if already connected (either as central or if they connected to us as peripheral) + # Check if already connected with self.peer_lock: if peer.address in self.peers: - RNS.log(f"{self} already connected to {peer.name} (central mode)", RNS.LOG_EXTREME) + RNS.log(f"{self} already connected to {peer.name}", RNS.LOG_EXTREME) return - # Dual-connection mode (BitChat model): Always attempt central connection - # Both devices connect to each other, creating TWO interfaces per peer: - # - "address-central" (we connect to their peripheral) - # - "address-peripheral" (they connect to our peripheral) - # Reticulum Transport handles deduplication if packets sent on both paths + # Skip if we're trying to connect to ourselves + if self.local_address and peer.address == self.local_address: + RNS.log(f"{self} skipping connection to self ({peer.address})", RNS.LOG_DEBUG) + return - # Skip if we're trying to connect to ourselves - if self.local_address and peer.address == self.local_address: - RNS.log(f"{self} skipping connection to self ({peer.address})", RNS.LOG_DEBUG) - return - - # Check if we already have a CENTRAL connection to this peer - conn_id = f"{peer.address}-central" - if conn_id in self.spawned_interfaces: - RNS.log(f"{self} already connected to {peer.name} as central", RNS.LOG_EXTREME) + # Additional check: if we have identity from discovery, verify no interface exists + # (MAC sorting should prevent this, but belt-and-suspenders) + peer_identity_preview = self.address_to_identity.get(peer.address) + if peer_identity_preview: + identity_hash = self._compute_identity_hash(peer_identity_preview) + if identity_hash in self.spawned_interfaces: + RNS.log(f"{self} interface already exists for {peer.name}", RNS.LOG_EXTREME) return # Record connection attempt @@ -1362,7 +1411,7 @@ class BLEInterface(Interface): """Called when BlueZ reports the device has disconnected""" RNS.log(f"{self} BLE client for {peer.name} ({peer.address}) disconnected unexpectedly", RNS.LOG_WARNING) - # Clean up all peer state atomically (CRITICAL #1: memory leak fix) + # Clean up all peer state atomically # This prevents fragmentation state from leaking when peers disconnect mid-transmission # 1. Clean up peer connection state @@ -1370,47 +1419,26 @@ class BLEInterface(Interface): if peer.address in self.peers: del self.peers[peer.address] - # 2. Remove central connection from unified interface + # 2. Detach interface peer_identity = self.address_to_identity.get(peer.address, None) if peer_identity: - # Protocol v2: Use identity-based lookup - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] + identity_hash = self._compute_identity_hash(peer_identity) if identity_hash in self.spawned_interfaces: peer_if = self.spawned_interfaces[identity_hash] - peer_if.remove_central_connection() + peer_if.detach() + del self.spawned_interfaces[identity_hash] + RNS.log(f"{self} detached interface for {peer.address}", RNS.LOG_DEBUG) - # If no connections remain, detach and remove - if not peer_if.has_central_connection and not peer_if.has_peripheral_connection: - peer_if.detach() - del self.spawned_interfaces[identity_hash] - RNS.log(f"{self} detached unified interface for {peer.address} (no connections remain)", RNS.LOG_DEBUG) - else: - # Protocol v1 fallback: Use address-based lookup - conn_id = f"{peer.address}-central" - if conn_id in self.spawned_interfaces: - self.spawned_interfaces[conn_id].detach() - del self.spawned_interfaces[conn_id] - RNS.log(f"{self} cleaned up legacy spawned interface for {peer.address}", RNS.LOG_DEBUG) - - # 3. Clean up fragmentation state only if no connections remain - should_cleanup_frag = True + # 3. Clean up fragmenter/reassembler if peer_identity: - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - if identity_hash in self.spawned_interfaces: - should_cleanup_frag = False # Interface still has peripheral connection - else: - # Check legacy peripheral connection - if f"{peer.address}-peripheral" in self.spawned_interfaces: - should_cleanup_frag = False - - if should_cleanup_frag: + frag_key = self._get_fragmenter_key(peer_identity, peer.address) with self.frag_lock: - if peer.address in self.fragmenters: - del self.fragmenters[peer.address] + if frag_key in self.fragmenters: + del self.fragmenters[frag_key] RNS.log(f"{self} cleaned up fragmenter for {peer.address}", RNS.LOG_DEBUG) - if peer.address in self.reassemblers: - del self.reassemblers[peer.address] + if frag_key in self.reassemblers: + del self.reassemblers[frag_key] RNS.log(f"{self} cleaned up reassembler for {peer.address}", RNS.LOG_DEBUG) # Try LE-specific connection if BlueZ >= 5.49 and we haven't confirmed ConnectDevice unavailable @@ -1506,7 +1534,7 @@ class BLEInterface(Interface): if identity_value and len(identity_value) == 16: # Store as bytes for identity-based interface tracking peer_identity = bytes(identity_value) - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] + identity_hash = self._compute_identity_hash(peer_identity) # Store identity mappings for unified interface architecture self.address_to_identity[peer.address] = peer_identity @@ -1516,22 +1544,9 @@ class BLEInterface(Interface): else: RNS.log(f"{self} invalid identity size from {peer.name}: {len(identity_value) if identity_value else 0} bytes", RNS.LOG_WARNING) else: - RNS.log(f"{self} Identity characteristic not found on {peer.name} (Protocol v1 device)", RNS.LOG_DEBUG) + RNS.log(f"{self} Identity characteristic not found on {peer.name}", RNS.LOG_WARNING) except Exception as e: - RNS.log(f"{self} failed to read identity from {peer.name}: {type(e).__name__}: {e}", RNS.LOG_DEBUG) - # Continue without identity - - # Send connection handshake WITH our identity to trigger peripheral callback - # This enables the peripheral to create a unified interface with our identity - # without needing to discover us via scanning (solves asymmetric discovery issue) - try: - # Get our own identity to send in handshake - our_identity = self.gatt_server.identity_hash if (self.gatt_server and self.gatt_server.identity_hash) else b'\x00' * 16 - await client.write_gatt_char(self.CHARACTERISTIC_RX_UUID, our_identity, response=True) - identity_preview = our_identity[:8].hex() if len(our_identity) >= 8 else "null" - RNS.log(f"{self} sent connection handshake WITH identity to {peer.name} ({len(our_identity)} bytes, {identity_preview}...)", RNS.LOG_DEBUG) - except Exception as e: - RNS.log(f"{self} handshake write failed (non-critical): {e}", RNS.LOG_WARNING) + RNS.log(f"{self} failed to read identity from {peer.name}: {type(e).__name__}: {e}", RNS.LOG_WARNING) # Get negotiated MTU try: @@ -1561,11 +1576,11 @@ class BLEInterface(Interface): self.reassemblers[frag_key] = BLEReassembler(timeout=self.connection_timeout) RNS.log(f"{self} created fragmenter/reassembler for peer (key: {frag_key[:16]})", RNS.LOG_DEBUG) - # Create or update unified peer interface with central connection - self._spawn_or_update_peer_interface( + # Create peer interface with central connection + self._spawn_peer_interface( address=peer.address, name=peer.name, - peer_identity=peer_identity, # May be None for Protocol v1 devices + peer_identity=peer_identity, client=client, mtu=mtu, connection_type="central" @@ -1615,16 +1630,21 @@ class BLEInterface(Interface): with self.peer_lock: if peer.address in self.peers: del self.peers[peer.address] - with self.frag_lock: - if peer.address in self.fragmenters: - del self.fragmenters[peer.address] - if peer.address in self.reassemblers: - del self.reassemblers[peer.address] - # Clean up central connection peer interface - conn_id = f"{peer.address}-central" - if conn_id in self.spawned_interfaces: - self.spawned_interfaces[conn_id].detach() - del self.spawned_interfaces[conn_id] + + # Clean up fragmenter/reassembler and interface + if peer_identity: + frag_key = self._get_fragmenter_key(peer_identity, peer.address) + with self.frag_lock: + if frag_key in self.fragmenters: + del self.fragmenters[frag_key] + if frag_key in self.reassemblers: + del self.reassemblers[frag_key] + + identity_hash = self._compute_identity_hash(peer_identity) + if identity_hash in self.spawned_interfaces: + self.spawned_interfaces[identity_hash].detach() + del self.spawned_interfaces[identity_hash] + await client.disconnect() # Record failure and return (don't raise exception) self._record_connection_failure(peer.address) @@ -1675,67 +1695,54 @@ class BLEInterface(Interface): def _get_fragmenter_key(self, peer_identity, peer_address): """ - Compute fragmenter/reassembler dictionary key. - - Uses identity_hash for Protocol v2 devices (survives MAC rotation), - falls back to normalized MAC for Protocol v1 legacy devices. + Compute fragmenter/reassembler dictionary key using identity hash. Args: - peer_identity: 16-byte peer identity (None for Protocol v1) - peer_address: BLE MAC address (may have "dev:" prefix) + peer_identity: 16-byte peer identity + peer_address: BLE MAC address (unused, kept for compatibility) Returns: - str: Identity hash (16 hex chars) or normalized MAC address + str: Identity hash (16 hex chars) """ - if peer_identity: - # Protocol v2: Use identity hash (immune to MAC rotation) - return RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - else: - # Protocol v1 fallback: Use normalized MAC address - return peer_address.replace("dev:", "") + return RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - def _spawn_or_update_peer_interface(self, address, name, peer_identity=None, client=None, mtu=None, connection_type="central"): + def _compute_identity_hash(self, peer_identity): """ - Create or update a unified peer interface that can handle both central and peripheral connections. + Compute 16-character hex identity hash for interface tracking. - This implements the unified interface architecture where one BLEPeerInterface manages - both connection types for a given peer identity, eliminating duplicate interfaces. + Args: + peer_identity: 16-byte peer identity + + Returns: + str: Identity hash (16 hex chars) + """ + return RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] + + def _spawn_peer_interface(self, address, name, peer_identity, client=None, mtu=None, connection_type="central"): + """ + Create a peer interface for a BLE connection. Args: address: BLE address of peer name: Name of peer device - peer_identity: 16-byte peer identity (None for Protocol v1 legacy devices) + peer_identity: 16-byte peer identity client: BleakClient instance (for central connections) mtu: Negotiated MTU (for central connections) connection_type: "central" (we connected to them) or "peripheral" (they connected to us) Returns: - BLEPeerInterface: The spawned or updated interface + BLEPeerInterface: The spawned interface """ - # Compute lookup key: identity_hash for v2, address-based for v1 legacy - if peer_identity: - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - else: - # Legacy Protocol v1 device - use address-based key - identity_hash = f"{address}-{connection_type}" - RNS.log(f"{self} no identity for {name}, using legacy address-based tracking", RNS.LOG_DEBUG) + # Compute lookup key using identity hash + identity_hash = self._compute_identity_hash(peer_identity) - # Check if unified interface already exists for this peer + # Check if interface already exists (MAC sorting should prevent this) if identity_hash in self.spawned_interfaces: - peer_if = self.spawned_interfaces[identity_hash] + RNS.log(f"{self} interface already exists for {name} ({identity_hash[:8]}), reusing", RNS.LOG_WARNING) + return self.spawned_interfaces[identity_hash] - # Add the new connection type to existing interface - if connection_type == "central": - peer_if.add_central_connection(client, mtu) - RNS.log(f"{self} added central connection to existing interface for {name} (now {peer_if._get_connection_state_str()})", RNS.LOG_INFO) - else: # peripheral - peer_if.add_peripheral_connection() - RNS.log(f"{self} added peripheral connection to existing interface for {name} (now {peer_if._get_connection_state_str()})", RNS.LOG_INFO) - - return peer_if - - # Create new unified interface - peer_if = BLEPeerInterface(self, address, name, peer_identity) + # Create new peer interface + peer_if = BLEPeerInterface(self, address, name, peer_identity, connection_type, client, mtu) peer_if.OUT = self.OUT peer_if.IN = self.IN peer_if.parent_interface = self @@ -1743,23 +1750,13 @@ class BLEInterface(Interface): peer_if.HW_MTU = self.HW_MTU peer_if.online = True - # Add the first connection - if connection_type == "central": - peer_if.add_central_connection(client, mtu) - else: # peripheral - peer_if.add_peripheral_connection() - # Register with transport RNS.Transport.interfaces.append(peer_if) - # Note: No tunnel registration needed - direct peer connections use - # RNS.Transport.interfaces[] only (same pattern as I2PInterface) - - # Store in unified tracking + # Store in tracking dict self.spawned_interfaces[identity_hash] = peer_if - identity_str = identity_hash[:8] if peer_identity else "legacy" - RNS.log(f"{self} created NEW unified interface for {name} ({identity_str}), state: {peer_if._get_connection_state_str()}", RNS.LOG_INFO) + RNS.log(f"{self} created peer interface for {name} ({identity_hash[:8]}), type={connection_type}", RNS.LOG_INFO) return peer_if @@ -1796,21 +1793,16 @@ class BLEInterface(Interface): # Log fragmentation statistics for this peer stats = reassembler.get_statistics() - # Get peer name from unified interface lookup + # Get peer name from interface lookup peer_identity = self.address_to_identity.get(peer_address, None) - peer_if = None + peer_name = peer_address[-8:] # Default to address if peer_identity: - # Protocol v2: identity-based lookup - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] + identity_hash = self._compute_identity_hash(peer_identity) peer_if = self.spawned_interfaces.get(identity_hash, None) - else: - # Protocol v1 fallback: try address-based lookup - peer_if = self.spawned_interfaces.get(f"{peer_address}-central", None) - if not peer_if: - peer_if = self.spawned_interfaces.get(f"{peer_address}-peripheral", None) + if peer_if: + peer_name = peer_if.peer_name - peer_name = peer_if.peer_name if peer_if else peer_address[-8:] RNS.log(f"{self} reassembled packet from {peer_name}: " f"total_packets={stats['packets_reassembled']}, " f"total_fragments={stats['fragments_received']}, " @@ -1821,18 +1813,16 @@ class BLEInterface(Interface): RNS.log(f"{self} error reassembling fragment from {peer_address}: {type(e).__name__}: {e}", RNS.LOG_ERROR) return - # If we have a complete packet, route to unified peer interface + # If we have a complete packet, route to peer interface if complete_packet: peer_identity = self.address_to_identity.get(peer_address, None) - peer_if = None - if peer_identity: - # Protocol v2: identity-based lookup - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - peer_if = self.spawned_interfaces.get(identity_hash, None) - else: - # Protocol v1 fallback: address-based lookup (try central first) - peer_if = self.spawned_interfaces.get(f"{peer_address}-central", None) + if not peer_identity: + RNS.log(f"{self} no identity for peer {peer_address}, packet dropped", RNS.LOG_WARNING) + return + + identity_hash = self._compute_identity_hash(peer_identity) + peer_if = self.spawned_interfaces.get(identity_hash, None) if peer_if: peer_if.process_incoming(complete_packet) @@ -1851,147 +1841,49 @@ class BLEInterface(Interface): """ RNS.log(f"{self} received {len(data)} bytes from central {sender_address}", RNS.LOG_EXTREME) - # Detect identity handshake (16 bytes, likely the first write from this peer) - # The central sends its own identity in the handshake to enable unified interface creation - # even when the peripheral hasn't discovered the central via scanning - if len(data) == 16: - central_identity = bytes(data) - central_identity_hash = RNS.Identity.full_hash(central_identity)[:16].hex()[:16] + # Get peer identity (should be set by handle_central_connected) + peer_identity = self.address_to_identity.get(sender_address) - # Store or verify identity mapping - if sender_address not in self.address_to_identity: - # First time seeing this identity for this address - self.address_to_identity[sender_address] = central_identity - self.identity_to_address[central_identity_hash] = sender_address - RNS.log(f"{self} received identity handshake from {sender_address}: {central_identity_hash}", RNS.LOG_INFO) - else: - # Already know identity - verify it matches - existing_identity = self.address_to_identity[sender_address] - if existing_identity == central_identity: - RNS.log(f"{self} received identity handshake confirmation from {sender_address}: {central_identity_hash}", RNS.LOG_DEBUG) - else: - RNS.log(f"{self} WARNING: identity mismatch for {sender_address}! Existing vs received", RNS.LOG_WARNING) + if not peer_identity: + RNS.log(f"{self} no identity for central {sender_address}, dropping data", RNS.LOG_WARNING) + return - # Check if we need to merge interfaces - legacy_conn_id = f"{sender_address}-peripheral" - if legacy_conn_id in self.spawned_interfaces: - # Legacy peripheral interface exists - need to migrate or merge - if central_identity_hash in self.spawned_interfaces: - # We already have an identity-based interface (from central connection) - # Add peripheral connection to it and remove legacy interface - identity_if = self.spawned_interfaces[central_identity_hash] - legacy_if = self.spawned_interfaces[legacy_conn_id] - - # Add peripheral connection to unified interface - identity_if.add_peripheral_connection() - - # Clean up legacy interface - legacy_if.detach() - del self.spawned_interfaces[legacy_conn_id] - - RNS.log(f"{self} merged legacy peripheral into identity-based interface {central_identity_hash} (now {identity_if._get_connection_state_str()})", RNS.LOG_INFO) - else: - # No identity-based interface yet - migrate the legacy one - legacy_if = self.spawned_interfaces[legacy_conn_id] - del self.spawned_interfaces[legacy_conn_id] - - legacy_if.peer_identity = central_identity - self.spawned_interfaces[central_identity_hash] = legacy_if - - RNS.log(f"{self} migrated interface from legacy ({legacy_conn_id}) to identity-based ({central_identity_hash})", RNS.LOG_INFO) - - # Create fragmenter/reassembler for peripheral connection to enable bidirectional data flow - # This is critical: fragmenters must exist for BOTH central and peripheral connections - frag_key = self._get_fragmenter_key(central_identity, sender_address) - with self.frag_lock: - if frag_key not in self.fragmenters: - # Get MTU from GATT server for this peripheral connection - mtu = self.gatt_server.get_central_mtu(sender_address) if self.gatt_server else 23 - self.fragmenters[frag_key] = BLEFragmenter(mtu=mtu) - self.reassemblers[frag_key] = BLEReassembler(timeout=self.connection_timeout) - RNS.log(f"{self} created fragmenter for peripheral connection (key: {frag_key[:16]}, MTU: {mtu})", RNS.LOG_DEBUG) - else: - RNS.log(f"{self} fragmenter already exists for peripheral connection (key: {frag_key[:16]})", RNS.LOG_EXTREME) - - return # Don't process handshake as data - - # Update fragmenter MTU if GATT server has learned a new MTU - # (MTU is provided by BlueZ in write callback options) - if self.gatt_server and hasattr(self.gatt_server, 'get_central_mtu'): - current_mtu = self.gatt_server.get_central_mtu(sender_address) - with self.frag_lock: - if sender_address in self.fragmenters: - existing_mtu = self.fragmenters[sender_address].mtu - if current_mtu != existing_mtu: - RNS.log(f"{self} updating fragmenter MTU for {sender_address}: {existing_mtu} -> {current_mtu}", RNS.LOG_INFO) - self.fragmenters[sender_address] = BLEFragmenter(mtu=current_mtu) + # Get fragmenter key + frag_key = self._get_fragmenter_key(peer_identity, sender_address) # Attempt reassembly complete_packet = None - with self.frag_lock: - if sender_address not in self.reassemblers: - # Create reassembler for this peer - self.reassemblers[sender_address] = BLEReassembler(timeout=self.connection_timeout) - - try: - # Ensure data is bytes (bluezero may pass different types) - data_bytes = bytes(data) if not isinstance(data, bytes) else data - complete_packet = self.reassemblers[sender_address].receive_fragment(data_bytes, sender_address) - - # Periodic cleanup - if complete_packet: - cleaned = self.reassemblers[sender_address].cleanup_stale_buffers() - if cleaned > 0: - RNS.log(f"{self} cleaned {cleaned} stale reassembly buffers for central {sender_address}", RNS.LOG_DEBUG) - - # Log fragmentation statistics for this central - stats = self.reassemblers[sender_address].get_statistics() - # Get peer name from unified interface lookup - peer_identity = self.address_to_identity.get(sender_address, None) - peer_if = None - - if peer_identity: - # Protocol v2: identity-based lookup - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - peer_if = self.spawned_interfaces.get(identity_hash, None) - else: - # Protocol v1 fallback: try address-based lookup - peer_if = self.spawned_interfaces.get(f"{sender_address}-peripheral", None) - if not peer_if: - peer_if = self.spawned_interfaces.get(f"{sender_address}-central", None) - - peer_name = peer_if.peer_name if peer_if else sender_address[-8:] - RNS.log(f"{self} reassembled packet from {peer_name}: " - f"total_packets={stats['packets_reassembled']}, " - f"total_fragments={stats['fragments_received']}, " - f"pending={stats['pending_packets']}, " - f"timeouts={stats['packets_timeout']}", RNS.LOG_DEBUG) - - except Exception as e: - RNS.log(f"{self} error reassembling fragment from central {sender_address}: {type(e).__name__}: {e}", RNS.LOG_ERROR) + if frag_key not in self.reassemblers: + RNS.log(f"{self} no reassembler for {sender_address}, dropping data", RNS.LOG_WARNING) return - # If we have a complete packet, route to unified peer interface - if complete_packet: - peer_identity = self.address_to_identity.get(sender_address, None) - peer_if = None + reassembler = self.reassemblers[frag_key] - if peer_identity: - # Protocol v2: identity-based lookup - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - peer_if = self.spawned_interfaces.get(identity_hash, None) - else: - # Protocol v1 fallback: address-based lookup (try peripheral first) - peer_if = self.spawned_interfaces.get(f"{sender_address}-peripheral", None) + try: + # Ensure data is bytes (bluezero may pass different types) + data_bytes = bytes(data) if not isinstance(data, bytes) else data + complete_packet = reassembler.receive_fragment(data_bytes, sender_address) + + # Periodic cleanup + if complete_packet: + cleaned = reassembler.cleanup_stale_buffers() + if cleaned > 0: + RNS.log(f"{self} cleaned {cleaned} stale reassembly buffers for {sender_address}", RNS.LOG_DEBUG) + + except Exception as e: + RNS.log(f"{self} error reassembling fragment from {sender_address}: {type(e).__name__}: {e}", RNS.LOG_ERROR) + return + + # Route complete packet to interface + if complete_packet: + identity_hash = self._compute_identity_hash(peer_identity) + peer_if = self.spawned_interfaces.get(identity_hash) if peer_if: - RNS.log(f"{self} DIAGNOSTIC: Routing packet to {peer_if}", RNS.LOG_DEBUG) peer_if.process_incoming(complete_packet) else: - RNS.log(f"{self} DIAGNOSTIC: No interface found for {sender_address}, packet dropped!", RNS.LOG_WARNING) - elif not complete_packet: - RNS.log(f"{self} DIAGNOSTIC: No complete packet yet from {sender_address} (waiting for more fragments)", RNS.LOG_DEBUG) + RNS.log(f"{self} no interface for {sender_address}, packet dropped", RNS.LOG_WARNING) def _create_peripheral_peer(self, address): """ @@ -2052,14 +1944,18 @@ class BLEInterface(Interface): """ RNS.log(f"{self} central {address} connected to our peripheral", RNS.LOG_INFO) - # Look up peer identity if we have it (from when we connected as central) + # Look up peer identity (should exist from discovery or handshake) peer_identity = self.address_to_identity.get(address, None) - # Create or update unified interface with peripheral connection - self._spawn_or_update_peer_interface( + if not peer_identity: + RNS.log(f"{self} cannot create interface for {address} - no identity available", RNS.LOG_ERROR) + return + + # Create peer interface with peripheral connection + self._spawn_peer_interface( address=address, - name=f"Central-{address[-8:]}", # Will be updated if we learn better name - peer_identity=peer_identity, # May be None if we haven't connected as central yet + name=f"Central-{address[-8:]}", + peer_identity=peer_identity, client=None, # No client for peripheral connections mtu=None, # MTU managed by GATT server connection_type="peripheral" @@ -2069,9 +1965,6 @@ class BLEInterface(Interface): """ Handle a central device disconnecting from our GATT server. - With unified interface architecture, this removes the peripheral connection - from the interface. The interface is only detached if no connections remain. - Args: address: BLE address of the central device """ @@ -2080,47 +1973,27 @@ class BLEInterface(Interface): # Look up peer identity peer_identity = self.address_to_identity.get(address, None) - if peer_identity: - # Protocol v2: Use identity-based lookup - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - if identity_hash in self.spawned_interfaces: - peer_if = self.spawned_interfaces[identity_hash] - peer_if.remove_peripheral_connection() + if not peer_identity: + RNS.log(f"{self} no identity for disconnected central {address}", RNS.LOG_WARNING) + return - # If no connections remain, detach and remove interface - if not peer_if.has_central_connection and not peer_if.has_peripheral_connection: - peer_if.detach() - del self.spawned_interfaces[identity_hash] - RNS.log(f"{self} detached unified interface for {address} (no connections remain)", RNS.LOG_DEBUG) - else: - # Protocol v1 fallback: Use address-based lookup - conn_id = f"{address}-peripheral" - if conn_id in self.spawned_interfaces: - peer_if = self.spawned_interfaces[conn_id] - peer_if.detach() - del self.spawned_interfaces[conn_id] - RNS.log(f"{self} cleaned up legacy peripheral peer interface for {address}", RNS.LOG_DEBUG) + # Find and detach interface + identity_hash = self._compute_identity_hash(peer_identity) + if identity_hash in self.spawned_interfaces: + peer_if = self.spawned_interfaces[identity_hash] + peer_if.detach() + del self.spawned_interfaces[identity_hash] + RNS.log(f"{self} detached interface for {address}", RNS.LOG_DEBUG) - # Clean up shared fragmenter/reassembler only if NO connections remain - # Check both v2 (identity-based) and v1 (address-based) tracking - should_cleanup = True - if peer_identity: - identity_hash = RNS.Identity.full_hash(peer_identity)[:16].hex()[:16] - if identity_hash in self.spawned_interfaces: - should_cleanup = False # Interface still exists - else: - # Check if any address-based interface exists - if f"{address}-central" in self.spawned_interfaces: - should_cleanup = False - - if should_cleanup: + # Clean up fragmenter/reassembler + frag_key = self._get_fragmenter_key(peer_identity, address) with self.frag_lock: - if address in self.reassemblers: - del self.reassemblers[address] - RNS.log(f"{self} cleaned up reassembler for {address} (no connections remain)", RNS.LOG_DEBUG) - if address in self.fragmenters: - del self.fragmenters[address] - RNS.log(f"{self} cleaned up fragmenter for {address} (no connections remain)", RNS.LOG_DEBUG) + if frag_key in self.reassemblers: + del self.reassemblers[frag_key] + RNS.log(f"{self} cleaned up reassembler for {address}", RNS.LOG_DEBUG) + if frag_key in self.fragmenters: + del self.fragmenters[frag_key] + RNS.log(f"{self} cleaned up fragmenter for {address}", RNS.LOG_DEBUG) def process_incoming(self, data): """ @@ -2243,19 +2116,18 @@ class BLEPeerInterface(Interface): interfaces for routing and statistics tracking. """ - def __init__(self, parent, peer_address, peer_name, peer_identity=None): + def __init__(self, parent, peer_address, peer_name, peer_identity=None, connection_type="central", client=None, mtu=None): """ Initialize peer interface. - This interface can now handle BOTH central and peripheral connections - to the same peer identity, eliminating duplicate interfaces and fixing - ACK routing issues. - Args: parent: Parent BLEInterface peer_address: BLE address of peer peer_name: Name of peer device peer_identity: 16-byte peer identity from GATT characteristic (optional, can be set later) + connection_type: "central" (we connected to them) or "peripheral" (they connected to us) + client: BleakClient reference (for central connections only) + mtu: Negotiated MTU (for central connections only) """ super().__init__() @@ -2263,13 +2135,12 @@ class BLEPeerInterface(Interface): self.peer_address = peer_address self.peer_name = peer_name self.peer_identity = peer_identity # 16-byte identity for stable tracking + self.connection_type = connection_type # "central" or "peripheral" self.online = True - # Dual connection state tracking - self.has_central_connection = False # True if we connected to them - self.has_peripheral_connection = False # True if they connected to us - self.central_client = None # BleakClient reference (if central connection exists) - self.central_mtu = None # MTU for central connection + # Connection references (central mode only) + self.central_client = client if connection_type == "central" else None + self.central_mtu = mtu if connection_type == "central" else None # Copy settings from parent self.HW_MTU = parent.HW_MTU @@ -2281,70 +2152,7 @@ class BLEPeerInterface(Interface): # Announce rate limiting (required by Transport.inbound announce processing) self.announce_rate_target = None # No announce rate limiting for BLE peer interfaces - RNS.log(f"BLEPeerInterface initialized for {peer_name} ({peer_address}), identity={'set' if peer_identity else 'pending'}", RNS.LOG_DEBUG) - - def add_central_connection(self, client, mtu): - """ - Add a central connection to this peer interface. - - Called when we successfully connect as a GATT client to this peer. - - Args: - client: BleakClient instance - mtu: Negotiated MTU for this connection - """ - self.has_central_connection = True - self.central_client = client - self.central_mtu = mtu - conn_state = self._get_connection_state_str() - RNS.log(f"{self} added central connection (MTU: {mtu}), state now: {conn_state}", RNS.LOG_DEBUG) - - def add_peripheral_connection(self): - """ - Add a peripheral connection to this peer interface. - - Called when this peer connects as a GATT client to our GATT server. - """ - self.has_peripheral_connection = True - conn_state = self._get_connection_state_str() - RNS.log(f"{self} added peripheral connection, state now: {conn_state}", RNS.LOG_DEBUG) - - def remove_central_connection(self): - """Remove the central connection from this peer interface.""" - if self.has_central_connection: - self.has_central_connection = False - self.central_client = None - self.central_mtu = None - conn_state = self._get_connection_state_str() - RNS.log(f"{self} removed central connection, state now: {conn_state}", RNS.LOG_DEBUG) - - # Mark offline if no connections remain - if not self.has_peripheral_connection: - self.online = False - RNS.log(f"{self} no connections remain, marking offline", RNS.LOG_DEBUG) - - def remove_peripheral_connection(self): - """Remove the peripheral connection from this peer interface.""" - if self.has_peripheral_connection: - self.has_peripheral_connection = False - conn_state = self._get_connection_state_str() - RNS.log(f"{self} removed peripheral connection, state now: {conn_state}", RNS.LOG_DEBUG) - - # Mark offline if no connections remain - if not self.has_central_connection: - self.online = False - RNS.log(f"{self} no connections remain, marking offline", RNS.LOG_DEBUG) - - def _get_connection_state_str(self): - """Get a string describing the current connection state.""" - if self.has_central_connection and self.has_peripheral_connection: - return "central+peripheral" - elif self.has_central_connection: - return "central only" - elif self.has_peripheral_connection: - return "peripheral only" - else: - return "no connections" + RNS.log(f"BLEPeerInterface initialized for {peer_name} ({peer_address}), type={connection_type}, identity={'set' if peer_identity else 'pending'}", RNS.LOG_DEBUG) def process_incoming(self, data): """ @@ -2360,25 +2168,13 @@ class BLEPeerInterface(Interface): # Log packet reception RNS.log(f"{self} RX: {len(data)} bytes from {self.peer_name}", RNS.LOG_DEBUG) - # DIAGNOSTIC: Log before calling Transport - RNS.log(f"DIAGNOSTIC: Calling owner.inbound() with {len(data)} bytes on interface {self}", RNS.LOG_DEBUG) - RNS.log(f"DIAGNOSTIC: Interface attributes - IN={self.IN}, OUT={self.OUT}, mode={getattr(self, 'mode', 'NOT_SET')}, online={self.online}", RNS.LOG_DEBUG) - RNS.log(f"DIAGNOSTIC: Packet first bytes (hex): {data[:10].hex()}", RNS.LOG_DEBUG) - # Pass to Reticulum transport self.parent_interface.owner.inbound(data, self) - RNS.log(f"DIAGNOSTIC: owner.inbound() returned for {self}", RNS.LOG_DEBUG) - def process_outgoing(self, data): """ Process outgoing data to send to this peer (with fragmentation). - This method intelligently selects the best available connection path: - - If both central and peripheral connections exist, prefer central (lower latency) - - If only one connection exists, use that path - - Falls back gracefully if one path fails - Args: data: Raw packet data to transmit """ @@ -2409,25 +2205,11 @@ class BLEPeerInterface(Interface): RNS.log(f"Failed to fragment data for {self.peer_name}: {e}", RNS.LOG_ERROR) return - # Intelligently route based on available connections - if self.has_central_connection and self.has_peripheral_connection: - # Both paths available - prefer central for lower latency - RNS.log(f"{self} using central path (both connections available)", RNS.LOG_EXTREME) - success = self._send_via_central(fragments) - if not success: - # Fallback to peripheral if central fails - RNS.log(f"{self} central send failed, falling back to peripheral", RNS.LOG_WARNING) - self._send_via_peripheral(fragments) - elif self.has_central_connection: - # Only central connection available - RNS.log(f"{self} using central path (only connection)", RNS.LOG_EXTREME) + # Route based on connection type + if self.connection_type == "central": self._send_via_central(fragments) - elif self.has_peripheral_connection: - # Only peripheral connection available - RNS.log(f"{self} using peripheral path (only connection)", RNS.LOG_EXTREME) + else: # peripheral self._send_via_peripheral(fragments) - else: - RNS.log(f"{self} no connections available for transmission!", RNS.LOG_ERROR) def _send_via_peripheral(self, fragments): """ @@ -2473,24 +2255,13 @@ class BLEPeerInterface(Interface): Returns: bool: True if all fragments sent successfully, False otherwise """ - # Use stored central_client if available (dual-connection architecture) - client = self.central_client if self.has_central_connection else None - - # Fallback to legacy peers dict lookup (for compatibility during transition) - if not client: - with self.parent_interface.peer_lock: - if self.peer_address not in self.parent_interface.peers: - RNS.log(f"{self} peer {self.peer_name} ({self.peer_address}) no longer connected", RNS.LOG_WARNING) - return False - - # Get reference to client and release lock immediately - client, _, _ = self.parent_interface.peers[self.peer_address] - - # Check if client is still connected before sending - if not client or not client.is_connected: - RNS.log(f"{self} peer {self.peer_name} ({self.peer_address}) disconnected before transmission", RNS.LOG_WARNING) + # Use stored central_client (set at initialization for central connections) + if not self.central_client or not self.central_client.is_connected: + RNS.log(f"{self} peer {self.peer_name} ({self.peer_address}) not connected or disconnected", RNS.LOG_WARNING) return False + client = self.central_client + # Send each fragment via BLE characteristic write for i, fragment in enumerate(fragments): try: @@ -2564,7 +2335,7 @@ class BLEPeerInterface(Interface): return f"{self.peer_address}" def __str__(self): - return f"BLEPeerInterface[{self.peer_name}/{self._get_connection_state_str()}]" + return f"BLEPeerInterface[{self.peer_name}/{self.connection_type}]" # Register interface for Reticulum