diff --git a/src/ble_reticulum/BLEInterface.py b/src/ble_reticulum/BLEInterface.py index c361353..49ed348 100644 --- a/src/ble_reticulum/BLEInterface.py +++ b/src/ble_reticulum/BLEInterface.py @@ -392,6 +392,13 @@ class BLEInterface(Interface): self._pending_detach = {} self._pending_detach_grace_period = 2.0 # seconds + # Zombie connection detection (identity_hash -> timestamp of last real data) + # Connections that only exchange keepalives (no real data) for > zombie_timeout + # are considered "zombies" and won't block new connections from the same identity. + # This handles BLE link degradation where keepalives work but data doesn't. + self._last_real_data = {} + self._zombie_timeout = 30.0 # seconds - connection is zombie if no real data for this long + # Fragmentation self.fragmenters = {} # address -> BLEFragmenter (per MTU) self.reassemblers = {} # address -> BLEReassembler @@ -804,6 +811,9 @@ class BLEInterface(Interface): del self.spawned_interfaces[identity_hash] if identity_hash in self.identity_to_address: del self.identity_to_address[identity_hash] + # Clean up zombie detection tracking + if identity_hash in self._last_real_data: + del self._last_real_data[identity_hash] # Clean up fragmenter/reassembler now that interface is fully detached if peer_identity: frag_key = self._get_fragmenter_key(peer_identity, "") # Address unused in key computation @@ -1017,13 +1027,19 @@ class BLEInterface(Interface): This handles Android MAC randomization where the same device advertises with one MAC but connects with a different MAC. + IMPORTANT: Before rejecting as duplicate, we verify that the existing + connection is still alive. This prevents false rejections when: + - A peer disconnects but identity_to_address still has a stale entry + (cleanup happens after a 2-second grace period) + - The same identity reconnects with a new MAC (Android MAC rotation) + Args: address: MAC address attempting to connect peer_identity: 16-byte identity hash of the peer Returns: True if this identity is already connected via a different MAC (abort connection) - False if this is a new identity or same MAC (allow connection) + False if this is a new identity, same MAC, or stale entry (allow connection) """ if not peer_identity or len(peer_identity) != 16: return False @@ -1032,7 +1048,57 @@ class BLEInterface(Interface): existing_address = self.identity_to_address.get(identity_hash) if existing_address and existing_address != address: - # Same identity, different MAC - this is Android MAC rotation + # Same identity, different MAC - check if old connection is still alive + + # Check 1: Is there a pending detach for this identity? + # If so, the old connection is already gone - allow new connection + if identity_hash in self._pending_detach: + RNS.log( + f"{self} allowing reconnection from {address} - identity {identity_hash[:8]} " + f"has pending detach (old connection from {existing_address} is gone)", + RNS.LOG_DEBUG + ) + # Clean up stale address mappings to prepare for new connection + self._cleanup_stale_address(identity_hash, existing_address) + return False + + # Check 2: Is the existing address still connected? + # Check both driver.connected_peers and our peers dict + if existing_address not in self.driver.connected_peers: + if existing_address not in self.peers: + # Old connection is dead but cleanup hasn't happened yet + RNS.log( + f"{self} allowing reconnection from {address} - identity {identity_hash[:8]} " + f"old address {existing_address} is no longer connected", + RNS.LOG_DEBUG + ) + # Clean up stale address mappings to prepare for new connection + self._cleanup_stale_address(identity_hash, existing_address) + return False + + # Check 3: Is the existing connection a zombie? + # A "zombie" connection has keepalives working but no real data for zombie_timeout. + # This happens when BLE link degrades - 1-byte keepalives succeed but larger + # data packets fail. We allow new connections to replace zombies. + last_data_time = self._last_real_data.get(identity_hash, 0) + if last_data_time > 0: + time_since_data = time.time() - last_data_time + if time_since_data > self._zombie_timeout: + RNS.log( + f"{self} allowing reconnection from {address} - identity {identity_hash[:8]} " + f"old connection at {existing_address} is zombie (no real data for {time_since_data:.1f}s)", + RNS.LOG_WARNING + ) + # Clean up the zombie connection + self._cleanup_stale_address(identity_hash, existing_address) + # Disconnect the zombie to free up resources + try: + self.driver.disconnect(existing_address) + except Exception as e: + RNS.log(f"{self} failed to disconnect zombie {existing_address}: {e}", RNS.LOG_DEBUG) + return False + + # Existing connection is still alive and healthy - reject duplicate RNS.log( f"{self} duplicate identity detected: {identity_hash[:8]} already connected via {existing_address}, " f"rejecting connection from {address} (Android MAC rotation)", @@ -1120,20 +1186,44 @@ class BLEInterface(Interface): Returns: True if data was handled as identity handshake, False otherwise """ + # Identity handshake detection: exactly 16 bytes + if len(data) != 16: + return False # Not a handshake + # Check if we already have peer identity peer_identity = self.address_to_identity.get(address) if peer_identity: - return False # Already have identity, not a handshake - - # Identity handshake detection: exactly 16 bytes, no existing identity - if len(data) != 16: - return False # Not a handshake + # We already have identity for this address (probably set via Kotlin callback). + # The 16-byte handshake data may still arrive through the data channel. + # Check if it matches the identity we have - if so, consume it silently. + if data == peer_identity: + RNS.log(f"{self} received duplicate identity handshake from {address} (already known via callback)", RNS.LOG_DEBUG) + return True # Consume the data, don't pass to reassembler + else: + # 16 bytes but doesn't match known identity - log warning but still consume + # to avoid passing identity-like data to the reassembler + RNS.log(f"{self} received 16-byte data from {address} that differs from known identity, consuming as handshake", RNS.LOG_WARNING) + return True # Consume to prevent reassembler errors try: # Store central's identity central_identity = bytes(data) identity_hash = self._compute_identity_hash(central_identity) + # Check for duplicate identity (same identity already connected at different MAC) + # This prevents duplicate connections during MAC rotation overlap + if self._check_duplicate_identity(address, central_identity): + RNS.log( + f"{self} duplicate identity rejected for {address} in peripheral mode (MAC rotation)", + RNS.LOG_WARNING + ) + # Disconnect this connection - it's a duplicate + try: + self.driver.disconnect(address) + except Exception as e: + RNS.log(f"{self} failed to disconnect duplicate {address}: {e}", RNS.LOG_DEBUG) + return True # Consumed the handshake, rejected connection + self.address_to_identity[address] = central_identity self.identity_to_address[identity_hash] = address @@ -1175,6 +1265,9 @@ class BLEInterface(Interface): RNS.log(f"{self} identity handshake complete for {address}", RNS.LOG_INFO) + # Initialize zombie detection tracking - the 16-byte handshake counts as real data + self._last_real_data[identity_hash] = time.time() + # Remove from pending identity tracking (no longer waiting for handshake) if address in self._pending_identity_connections: del self._pending_identity_connections[address] @@ -1779,11 +1872,18 @@ class BLEInterface(Interface): """ Convert 16-byte identity to 16-character hex string for interface tracking. + Uses truncated 64-bit hash for map keys (spawned_interfaces, identity_to_address). + Collision risk: birthday problem collision at ~2^32 (~4 billion) identities. + For BLE mesh networks with <100 simultaneous peers, this is astronomically safe. + + Note: Fragmenter keys use full 32-char hex via _get_fragmenter_key() for + maximum precision in packet reassembly. + Args: peer_identity: 16-byte peer identity (already a hash from BLE handshake) Returns: - str: First 16 hex chars of identity (8 bytes) + str: First 16 hex chars of identity (8 bytes = 64 bits) """ # peer_identity is already the identity hash from BLE handshake # Just convert to hex, don't re-hash (that would corrupt the identity!) @@ -1847,6 +1947,9 @@ class BLEInterface(Interface): self.spawned_interfaces[identity_hash] = peer_if self.address_to_interface[address] = peer_if + # Initialize zombie detection tracking - interface creation counts as activity + self._last_real_data[identity_hash] = time.time() + RNS.log(f"{self} created peer interface for {name} ({identity_hash[:8]}), type={connection_type}", RNS.LOG_INFO) return peer_if @@ -1872,6 +1975,12 @@ class BLEInterface(Interface): if not peer_identity: # Try identity cache - peer may have "disconnected" from Python's view # but Android/driver layer maintains the GATT connection + # + # POTENTIAL RACE CONDITION: If MAC rotation occurred and data arrives from + # the OLD address before onAddressChanged callback fires, we could restore + # a stale mapping here. This is a very narrow window since onAddressChanged + # is invoked synchronously from Kotlin during deduplication. The cache entry + # for the old address gets cleaned up in _address_changed_callback(). cached = self._identity_cache.get(peer_address) if cached and (time.time() - cached[1]) < self._identity_cache_ttl: peer_identity = cached[0] @@ -1893,6 +2002,11 @@ class BLEInterface(Interface): RNS.log(f"{self} no identity for peer {peer_address}, dropping data", RNS.LOG_WARNING) return + # Track real data activity for zombie detection + # This proves the connection is alive and can carry actual data, not just keepalives + identity_hash = self._compute_identity_hash(peer_identity) + self._last_real_data[identity_hash] = time.time() + # Compute identity-based fragmenter key (matches peripheral data handler) frag_key = self._get_fragmenter_key(peer_identity, peer_address) @@ -1995,7 +2109,7 @@ class BLEInterface(Interface): try: # Store central's identity central_identity = bytes(data) - central_identity_hash = RNS.Identity.full_hash(central_identity)[:16].hex()[:16] + central_identity_hash = self._compute_identity_hash(central_identity) self.address_to_identity[sender_address] = central_identity self.identity_to_address[central_identity_hash] = sender_address diff --git a/src/ble_reticulum/linux_bluetooth_driver.py b/src/ble_reticulum/linux_bluetooth_driver.py index b2200c3..c14bc01 100644 --- a/src/ble_reticulum/linux_bluetooth_driver.py +++ b/src/ble_reticulum/linux_bluetooth_driver.py @@ -1204,7 +1204,14 @@ class LinuxBluetoothDriver(BLEDriverInterface): if self.on_error: self.on_error("warning", f"Connection timeout to {address}", None) except Exception as e: - self._log(f"Connection failed to {address}: {e}", "ERROR") + error_str = str(e) + is_duplicate_identity = "Duplicate identity" in error_str + + # Log with appropriate level + if is_duplicate_identity: + self._log(f"Duplicate identity rejected for {address}: {e}", "WARNING") + else: + self._log(f"Connection failed to {address}: {e}", "ERROR") # Clean up BlueZ state by explicitly disconnecting client try: @@ -1224,7 +1231,13 @@ class LinuxBluetoothDriver(BLEDriverInterface): self._log(f"Error removing BlueZ device {address} after failure: {removal_e}", "DEBUG") if self.on_error: - self.on_error("error", f"Connection failed to {address}: {e}", e) + if is_duplicate_identity: + # Use safe message format that doesn't trigger blacklist + # The blacklist regex matches "Connection failed to" and "Connection timeout to" + # Using "Duplicate identity rejected for" avoids the blacklist + self.on_error("info", f"Duplicate identity rejected for {address} (MAC rotation)", e) + else: + self.on_error("error", f"Connection failed to {address}: {e}", e) finally: # Backup cleanup (primary cleanup is via Future callback in connect()) # This provides defense-in-depth in case the callback doesn't execute diff --git a/tests/mock_ble_driver.py b/tests/mock_ble_driver.py index 994f967..111f965 100644 --- a/tests/mock_ble_driver.py +++ b/tests/mock_ble_driver.py @@ -80,7 +80,7 @@ class MockBLEDriver(BLEDriverInterface): # Callbacks (assigned by consumer) self.on_device_discovered: Optional[Callable[[BLEDevice], None]] = None - self.on_device_connected: Optional[Callable[[str], None]] = None + self.on_device_connected: Optional[Callable[[str, Optional[bytes]], None]] = None # address, peer_identity self.on_device_disconnected: Optional[Callable[[str], None]] = None self.on_data_received: Optional[Callable[[str, bytes], None]] = None self.on_mtu_negotiated: Optional[Callable[[str, int], None]] = None @@ -160,16 +160,21 @@ class MockBLEDriver(BLEDriverInterface): if address in self._connected_peers: return # Already connected + # Get peer identity if linked driver is set + peer_identity = None + if self._linked_driver and self._linked_driver.local_address == address: + peer_identity = self._linked_driver._identity + # Simulate connection with default MTU self._connected_peers[address] = { "role": "central", "mtu": 185, # Default MTU - "identity": None + "identity": peer_identity } - # Trigger callback + # Trigger callback with peer identity (central mode receives identity during connection) if self.on_device_connected: - self.on_device_connected(address) + self.on_device_connected(address, peer_identity) # Trigger MTU negotiation callback if self.on_mtu_negotiated: @@ -193,8 +198,9 @@ class MockBLEDriver(BLEDriverInterface): "identity": None } + # Peripheral role: identity is None because we receive it via handshake later if self.on_device_connected: - self.on_device_connected(address) + self.on_device_connected(address, None) if self.on_mtu_negotiated: self.on_mtu_negotiated(address, 185) diff --git a/tests/test_ble_duplicate_identity_stale.py b/tests/test_ble_duplicate_identity_stale.py new file mode 100644 index 0000000..7dc3380 --- /dev/null +++ b/tests/test_ble_duplicate_identity_stale.py @@ -0,0 +1,366 @@ +""" +TDD Test for BLE Duplicate Identity Stale Entry Bug + +BUG DESCRIPTION: +---------------- +When a peer disconnects, identity_to_address is NOT cleaned up immediately - it's only +removed after a 2-second grace period in _process_pending_detaches(). However, +_check_duplicate_identity() doesn't verify that the existing address is still connected. + +This causes NEW connections from the same identity (after MAC rotation or reconnection) +to be INCORRECTLY rejected as "duplicates" during the grace period. + +OBSERVED BEHAVIOR (from logs): +------------------------------ +1. Identity c9d09faa connects from MAC_OLD (47:9C:55:2F:85:5D) - SUCCESS +2. Connection disconnects (but identity_to_address still has entry due to grace period) +3. Same identity tries to connect from MAC_NEW (5C:D7:DD:D5:DD:7D) +4. _check_duplicate_identity sees identity_to_address[c9d09faa] = MAC_OLD +5. Since MAC_OLD != MAC_NEW, connection is REJECTED as duplicate +6. But MAC_OLD connection is DEAD - this is a BUG! + +EXPECTED BEHAVIOR: +----------------- +_check_duplicate_identity should: +1. Check if the existing address is still in the connected peers list +2. OR check if there's a pending detach scheduled for this identity +3. If the old connection is gone, ALLOW the new connection + +This test is written TDD-style: it should FAIL before the fix and PASS after. +""" + +import unittest +import sys +import os +import time +import hashlib + +# Add parent directory to path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + +class MockDriver: + """Mock BLE driver that tracks connected peers.""" + + def __init__(self): + self.connected_peers = [] + + def disconnect(self, address): + if address in self.connected_peers: + self.connected_peers.remove(address) + + +class BLEInterfaceTestHarness: + """ + Test harness that replicates the _check_duplicate_identity logic from BLEInterface. + + This is extracted from ble_reticulum/BLEInterface.py to test in isolation. + """ + + def __init__(self): + # Maps from BLEInterface + self.identity_to_address = {} # identity_hash -> address + self.address_to_identity = {} # address -> peer_identity (bytes) + self.peers = {} # address -> peer info + self._pending_detach = {} # identity_hash -> timestamp + self._pending_detach_grace_period = 2.0 # seconds + + # Mock driver for connected peers check + self.driver = MockDriver() + + def _compute_identity_hash(self, peer_identity: bytes) -> str: + """Compute 16-char hex hash of peer identity.""" + return hashlib.sha256(peer_identity).hexdigest()[:16] + + def _check_duplicate_identity_BUGGY(self, address: str, peer_identity: bytes) -> bool: + """ + BUGGY VERSION: Original implementation that doesn't check connection validity. + + This is the current (broken) behavior from BLEInterface. + """ + if not peer_identity or len(peer_identity) != 16: + return False + + identity_hash = self._compute_identity_hash(peer_identity) + existing_address = self.identity_to_address.get(identity_hash) + + if existing_address and existing_address != address: + # Same identity, different MAC - this is Android MAC rotation + # BUG: Doesn't check if existing_address is still connected! + return True + + return False + + def _check_duplicate_identity_FIXED(self, address: str, peer_identity: bytes) -> bool: + """ + FIXED VERSION: Checks if existing connection is still valid before rejecting. + + This is the expected behavior after the fix. + """ + if not peer_identity or len(peer_identity) != 16: + return False + + identity_hash = self._compute_identity_hash(peer_identity) + existing_address = self.identity_to_address.get(identity_hash) + + if existing_address and existing_address != address: + # Same identity, different MAC - check if the old connection is still alive + + # Check 1: Is there a pending detach for this identity? + # If so, the old connection is already gone - allow new connection + if identity_hash in self._pending_detach: + return False + + # Check 2: Is the existing address still connected? + # Check both driver.connected_peers and our peers dict + if existing_address not in self.driver.connected_peers: + if existing_address not in self.peers: + # Old connection is dead - allow new connection + return False + + # Existing connection is still alive - reject duplicate + return True + + return False + + def simulate_connect(self, address: str, identity: bytes): + """Simulate a successful connection with identity.""" + identity_hash = self._compute_identity_hash(identity) + self.identity_to_address[identity_hash] = address + self.address_to_identity[address] = identity + self.peers[address] = {"connected": True} + self.driver.connected_peers.append(address) + + def simulate_disconnect(self, address: str, with_grace_period: bool = True): + """ + Simulate a disconnection. + + If with_grace_period=True (default), identity_to_address is NOT cleaned up + immediately (mimicking the real behavior where cleanup happens after 2s grace period). + """ + identity = self.address_to_identity.get(address) + if identity: + identity_hash = self._compute_identity_hash(identity) + + # Remove from driver connected peers + if address in self.driver.connected_peers: + self.driver.connected_peers.remove(address) + + # Remove from our peers dict + if address in self.peers: + del self.peers[address] + + # Remove address_to_identity + if address in self.address_to_identity: + del self.address_to_identity[address] + + if with_grace_period: + # Schedule pending detach (identity_to_address NOT removed yet!) + self._pending_detach[identity_hash] = time.time() + # NOTE: identity_to_address is intentionally NOT cleaned up here + # This is the source of the bug! + else: + # Immediate cleanup (no grace period) + if identity_hash in self.identity_to_address: + del self.identity_to_address[identity_hash] + + +class TestStaleIdentityToAddressBug(unittest.TestCase): + """ + TDD Test: Demonstrates the stale identity_to_address bug. + + These tests should FAIL with the buggy implementation and PASS with the fix. + """ + + def setUp(self): + self.harness = BLEInterfaceTestHarness() + # Create test identities (16 bytes each) + self.identity_A = b'\x12\x34\x56\x78' * 4 # Device A's identity + self.MAC_OLD = "AA:BB:CC:DD:EE:01" + self.MAC_NEW = "11:22:33:44:55:02" + + def test_buggy_implementation_incorrectly_rejects_reconnection(self): + """ + Demonstrate the BUG: stale entry causes false duplicate rejection. + + This test shows what CURRENTLY happens (incorrect behavior). + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A disconnects (with grace period - entry stays in identity_to_address) + self.harness.simulate_disconnect(self.MAC_OLD, with_grace_period=True) + + # Verify: MAC_OLD is no longer connected + self.assertNotIn(self.MAC_OLD, self.harness.driver.connected_peers) + self.assertNotIn(self.MAC_OLD, self.harness.peers) + + # BUT: identity_to_address still has the stale entry! + identity_hash = self.harness._compute_identity_hash(self.identity_A) + self.assertIn(identity_hash, self.harness.identity_to_address) + self.assertEqual(self.harness.identity_to_address[identity_hash], self.MAC_OLD) + + # Step 3: Device A reconnects with MAC_NEW (Android MAC rotation) + # BUGGY IMPLEMENTATION: incorrectly returns True (rejects as duplicate) + is_duplicate = self.harness._check_duplicate_identity_BUGGY(self.MAC_NEW, self.identity_A) + + # This assertion shows the BUG: it's treating a dead connection as a duplicate! + self.assertTrue( + is_duplicate, + "BUG VERIFICATION: Buggy implementation should (incorrectly) reject reconnection" + ) + + def test_fixed_implementation_allows_reconnection_during_grace_period(self): + """ + Test the FIX: should allow reconnection when old connection is dead. + + This test shows the EXPECTED behavior after fixing the bug. + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A disconnects (with grace period - entry stays but pending detach is set) + self.harness.simulate_disconnect(self.MAC_OLD, with_grace_period=True) + + # Verify: MAC_OLD is no longer connected + self.assertNotIn(self.MAC_OLD, self.harness.driver.connected_peers) + + # Verify: pending detach is scheduled + identity_hash = self.harness._compute_identity_hash(self.identity_A) + self.assertIn(identity_hash, self.harness._pending_detach) + + # Step 3: Device A reconnects with MAC_NEW + # FIXED IMPLEMENTATION: should return False (allow connection) + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_NEW, self.identity_A) + + self.assertFalse( + is_duplicate, + "FIX VERIFICATION: Fixed implementation should allow reconnection during grace period" + ) + + def test_fixed_implementation_still_rejects_true_duplicates(self): + """ + Test that the fix doesn't break legitimate duplicate rejection. + + When the old connection IS still alive, it should still be rejected. + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A tries to connect AGAIN with MAC_NEW (old connection still alive) + # This is a TRUE duplicate - should be rejected + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_NEW, self.identity_A) + + self.assertTrue( + is_duplicate, + "Should still reject true duplicates when old connection is alive" + ) + + def test_fixed_implementation_allows_same_mac_reconnection(self): + """ + Test that reconnection from the same MAC is allowed. + + When the same MAC reconnects, it should never be considered a duplicate. + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A disconnects + self.harness.simulate_disconnect(self.MAC_OLD, with_grace_period=True) + + # Step 3: Device A reconnects with SAME MAC + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_OLD, self.identity_A) + + self.assertFalse( + is_duplicate, + "Should allow reconnection from same MAC" + ) + + def test_fixed_implementation_allows_when_no_pending_detach_and_not_connected(self): + """ + Test edge case: old entry exists but no pending detach and not connected. + + This can happen if pending detach was cleaned up but identity_to_address wasn't. + """ + # Manually set up a stale state (should not happen in practice, but defensive) + identity_hash = self.harness._compute_identity_hash(self.identity_A) + self.harness.identity_to_address[identity_hash] = self.MAC_OLD + # Note: NOT in peers, NOT in connected_peers, NOT in _pending_detach + + # Step: Device A connects with MAC_NEW + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_NEW, self.identity_A) + + self.assertFalse( + is_duplicate, + "Should allow connection when old entry is stale (not connected, no pending detach)" + ) + + +class TestRealWorldScenario(unittest.TestCase): + """ + Test the real-world scenario observed in logs. + + From logs: Identity c9d09faa kept getting rejected from multiple different MACs + because the original MAC entry was never cleaned up. + """ + + def setUp(self): + self.harness = BLEInterfaceTestHarness() + # Use realistic 16-byte identity + self.identity_c9d09f = bytes.fromhex("c9d09faa3ef0220447e0111b626ce641") + + # Sequence of MACs observed in logs (Android MAC rotation) + self.mac_sequence = [ + "47:9C:55:2F:85:5D", # First successful connection + "4A:FB:BF:EB:C3:11", # Rejected as duplicate + "46:EF:48:18:13:F2", # Rejected as duplicate + "76:E4:EE:9B:6D:10", # Rejected as duplicate + "5C:D7:DD:D5:DD:7D", # Rejected as duplicate + "58:58:43:B9:EB:CD", # Rejected as duplicate + ] + + def test_real_world_mac_rotation_with_buggy_impl(self): + """ + Demonstrate the real bug observed in logs. + + All subsequent connection attempts are incorrectly rejected. + """ + # First connection succeeds + first_mac = self.mac_sequence[0] + self.harness.simulate_connect(first_mac, self.identity_c9d09f) + + # Connection drops (with grace period) + self.harness.simulate_disconnect(first_mac, with_grace_period=True) + + # All subsequent attempts are incorrectly rejected (BUG) + for mac in self.mac_sequence[1:]: + is_duplicate = self.harness._check_duplicate_identity_BUGGY(mac, self.identity_c9d09f) + self.assertTrue( + is_duplicate, + f"BUG: MAC {mac} incorrectly rejected as duplicate" + ) + + def test_real_world_mac_rotation_with_fixed_impl(self): + """ + Verify the fix allows reconnection after disconnect. + + After the fix, subsequent connection attempts should succeed. + """ + # First connection succeeds + first_mac = self.mac_sequence[0] + self.harness.simulate_connect(first_mac, self.identity_c9d09f) + + # Connection drops (with grace period) + self.harness.simulate_disconnect(first_mac, with_grace_period=True) + + # All subsequent attempts should be allowed (FIX) + for mac in self.mac_sequence[1:]: + is_duplicate = self.harness._check_duplicate_identity_FIXED(mac, self.identity_c9d09f) + self.assertFalse( + is_duplicate, + f"FIX: MAC {mac} should be allowed to reconnect" + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_ble_peer_interface.py b/tests/test_ble_peer_interface.py index c3d0925..1473656 100644 --- a/tests/test_ble_peer_interface.py +++ b/tests/test_ble_peer_interface.py @@ -39,7 +39,8 @@ def create_mock_peer_interface(peer_address="AA:BB:CC:DD:EE:FF", peer_name="Test parent.reassemblers = {peer_address: BLEReassembler() if BLEReassembler else Mock()} parent.frag_lock = threading.RLock() # Use threading lock for mock parent.peer_lock = threading.RLock() # Use threading lock for mock - parent.loop = asyncio.get_event_loop() + # Create a new event loop for testing (Python 3.10+ requires this) + parent.loop = asyncio.new_event_loop() parent.gatt_server = Mock() parent.gatt_server.send_notification = AsyncMock(return_value=True) diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py index d52968d..c0cc77c 100644 --- a/tests/test_identity_cache.py +++ b/tests/test_identity_cache.py @@ -90,6 +90,8 @@ def ble_interface(mock_rns, mock_driver): interface._identity_cache_ttl = 60 interface._pending_detach = {} # identity_hash -> timestamp interface._pending_detach_grace_period = 2.0 # seconds + interface._last_real_data = {} # Track last real data activity for zombie detection + interface._zombie_timeout = 30.0 # Zombie connection timeout # Fragmentation interface.fragmenters = {} diff --git a/tests/test_interface_cleanup.py b/tests/test_interface_cleanup.py index dfb3a3b..91633b6 100644 --- a/tests/test_interface_cleanup.py +++ b/tests/test_interface_cleanup.py @@ -99,6 +99,10 @@ def ble_interface(mock_rns, mock_driver): interface._pending_detach = {} interface._pending_detach_grace_period = 2.0 + # Zombie detection + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + # Fragmentation interface.fragmenters = {} interface.reassemblers = {} diff --git a/tests/test_mac_rotation_blacklist_bug.py b/tests/test_mac_rotation_blacklist_bug.py new file mode 100644 index 0000000..988ac02 --- /dev/null +++ b/tests/test_mac_rotation_blacklist_bug.py @@ -0,0 +1,839 @@ +""" +Tests for MAC rotation blacklist behavior. + +These tests verify that duplicate identity rejection during MAC rotation +does NOT incorrectly trigger the connection blacklist. + +Bug Description: +---------------- +When Android rotates a device's MAC address, the new MAC may try to connect +while the old MAC is still connected. The _check_duplicate_identity function +correctly rejects this as a duplicate. However, the rejection triggers an +error callback with "Connection failed to..." which matches the blacklist +regex pattern, incorrectly blacklisting the new MAC. + +This causes connectivity gaps because: +1. MAC_OLD connected with identity X +2. MAC_NEW tries to connect (same identity) -> rejected (correct) +3. Rejection triggers _record_connection_failure(MAC_NEW) (incorrect!) +4. After 3 rejections, MAC_NEW is blacklisted for 60s+ +5. MAC_OLD disconnects, identity cleared +6. MAC_NEW is blacklisted, can't reconnect for 60s+ (BUG!) + +Expected Behavior: +----------------- +Duplicate identity rejection should NOT count as a connection failure. +It's an intentional rejection, not a failure. +""" + +import pytest +import time +import re +import asyncio +import threading +from unittest.mock import Mock, MagicMock, AsyncMock, patch + + +class TestMacRotationBlacklistBug: + """Test that duplicate identity rejection doesn't trigger blacklist.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a minimal mock of BLEInterface with blacklist behavior.""" + try: + from ble_reticulum.BLEInterface import BLEInterface, DiscoveredPeer + except ImportError: + pytest.skip("BLEInterface not available") + + # Create mock interface with required attributes + interface = Mock(spec=BLEInterface) + interface.connection_blacklist = {} + interface.identity_to_address = {} + interface.address_to_identity = {} + interface.connection_retry_backoff = 60 + interface.max_connection_failures = 3 + interface.discovered_peers = {} + interface.peers = {} # Track connected peers + interface._pending_detach = {} # Track pending interface detachments + interface._last_real_data = {} # Track last real data activity for zombie detection + interface._zombie_timeout = 30.0 # Zombie connection timeout + + # Mock driver (needed for _record_connection_failure and connection checks) + interface.driver = Mock() + interface.driver._remove_bluez_device = None # hasattr will return False + interface.driver.connected_peers = [] # Track driver-level connected peers + + # Import the actual methods we want to test + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + # Bind real methods to our mock + interface._check_duplicate_identity = lambda addr, identity: RealInterface._check_duplicate_identity(interface, addr, identity) + interface._record_connection_failure = lambda addr: RealInterface._record_connection_failure(interface, addr) + interface._is_blacklisted = lambda addr: RealInterface._is_blacklisted(interface, addr) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + + return interface + + def test_duplicate_identity_detected_correctly(self, mock_ble_interface): + """Verify that _check_duplicate_identity returns True for duplicates.""" + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD (with active connection) + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Action: MAC_NEW tries to connect with same identity + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + + # Assert: Should detect as duplicate (old connection is still alive) + assert is_duplicate is True, "Should detect duplicate identity when old connection is still active" + + def test_blacklist_mechanism_triggers_after_3_failures(self, mock_ble_interface): + """ + Test that the blacklist mechanism correctly triggers after 3 connection failures. + + This tests the MECHANISM, not the fix. The fix is in the driver layer which + now uses safe message formats that don't trigger the blacklist. + + This test proves: IF _record_connection_failure is called 3 times, + THEN the device is blacklisted (correct behavior for the mechanism). + """ + interface = mock_ble_interface + + # Setup: Create a peer to track + mac = "AA:BB:CC:DD:EE:02" + + try: + from ble_reticulum.BLEInterface import DiscoveredPeer + interface.discovered_peers[mac] = DiscoveredPeer(mac, "Test-Device", -50) + except ImportError: + pytest.skip("DiscoveredPeer not available") + + # Record 3 failures - should trigger blacklist + for i in range(3): + interface._record_connection_failure(mac) + + # After 3 failures, device should be blacklisted + is_blacklisted = interface._is_blacklisted(mac) + + assert is_blacklisted is True, ( + "Blacklist mechanism should trigger after 3 failures" + ) + + def test_duplicate_rejection_with_safe_message_does_not_trigger_blacklist(self, mock_ble_interface): + """ + Test that duplicate identity rejection with safe message format + does NOT trigger blacklist. + + The fix: linux_bluetooth_driver now uses severity "info" with message + "Duplicate identity rejected for {address}" which: + 1. Doesn't match the blacklist regex "Connection failed to" + 2. Uses severity "info" which doesn't set should_blacklist=True + + This test verifies the fix works by going through _error_callback + with the safe message format. + """ + interface = mock_ble_interface + + # Setup + mac_new = "AA:BB:CC:DD:EE:02" + + try: + from ble_reticulum.BLEInterface import DiscoveredPeer, BLEInterface as RealInterface + interface.discovered_peers[mac_new] = DiscoveredPeer(mac_new, "Test-Device", -50) + interface._error_callback = lambda severity, msg, exc: RealInterface._error_callback(interface, severity, msg, exc) + except ImportError: + pytest.skip("BLEInterface not available") + + # Simulate 3 duplicate rejections with SAFE message format (the fix) + for i in range(3): + # This is the NEW behavior - safe message format with "info" severity + interface._error_callback( + "info", + f"Duplicate identity rejected for {mac_new} (MAC rotation)", + None + ) + + # After 3 rejections with safe message, device should NOT be blacklisted + is_blacklisted = interface._is_blacklisted(mac_new) + + assert is_blacklisted is False, ( + "Duplicate identity rejection with safe message format " + "should NOT trigger blacklist!" + ) + + def test_error_message_pattern_matches_blacklist_regex(self): + """ + Verify that the duplicate identity error message matches the blacklist regex. + + This proves WHY the bug occurs - the error message format triggers blacklisting. + """ + # The error message generated by linux_bluetooth_driver.py + mac_new = "AA:BB:CC:DD:EE:02" + error_message = f"Connection failed to {mac_new}: Duplicate identity - already connected via different MAC (Android MAC rotation)" + + # The regex used in _error_callback to extract address for blacklisting + blacklist_regex = r'(?:Connection (?:failed|timeout) to|to) ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})' + + match = re.search(blacklist_regex, error_message) + + # This SHOULD NOT match for duplicate identity errors + # But currently it DOES match, which is the bug + assert match is not None, "Regex matches the error message (this is why the bug occurs)" + assert match.group(1).upper() == mac_new, "Extracted address matches MAC_NEW" + + def test_real_connection_failure_should_trigger_blacklist(self, mock_ble_interface): + """ + Verify that real connection failures (timeouts, errors) DO trigger blacklist. + + This ensures we don't break legitimate blacklisting while fixing the bug. + """ + interface = mock_ble_interface + mac = "AA:BB:CC:DD:EE:03" + + # Create discovered peer + try: + from ble_reticulum.BLEInterface import DiscoveredPeer + interface.discovered_peers[mac] = DiscoveredPeer(mac, "Test-Device", -50) + except ImportError: + pytest.skip("DiscoveredPeer not available") + + # Simulate 3 real connection failures + for i in range(3): + interface._record_connection_failure(mac) + + # Real failures SHOULD trigger blacklist + is_blacklisted = interface._is_blacklisted(mac) + assert is_blacklisted is True, "Real connection failures should trigger blacklist" + + +class TestDuplicateIdentityErrorClassification: + """Test that duplicate identity errors are classified differently from connection failures.""" + + def test_duplicate_identity_error_should_be_distinguishable(self): + """ + The fix should make duplicate identity errors distinguishable from real failures. + + Options: + 1. Use different error severity (e.g., "warning" instead of "error") + 2. Use different error message format that doesn't match blacklist regex + 3. Add explicit flag to skip blacklisting + 4. Check error message content before blacklisting + """ + # This test documents the expected fix approach + # The duplicate identity error should either: + # a) Not trigger on_error at all (just log and return) + # b) Use severity that doesn't trigger blacklist + # c) Use message format that doesn't match blacklist regex + + duplicate_error_msg = "Duplicate identity - already connected via different MAC" + real_failure_msg = "Connection failed to AA:BB:CC:DD:EE:02: timeout" + + # After fix, these should be distinguishable + # For now, just document the requirement + assert "Duplicate identity" in duplicate_error_msg + assert "Connection failed" in real_failure_msg + + def test_safe_error_message_formats_dont_trigger_blacklist(self): + """ + Verify that safe error message formats for duplicate identity + don't match the blacklist regex pattern. + + This test ensures that when we fix the bug (on both Linux and Android), + we use message formats that won't trigger blacklisting. + """ + mac = "AA:BB:CC:DD:EE:02" + + # The regex used in _error_callback to extract addresses for blacklisting + blacklist_regex = r'(?:Connection (?:failed|timeout) to|to) ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})' + + # Messages that SHOULD trigger blacklist (real failures) + unsafe_messages = [ + f"Connection failed to {mac}: timeout", + f"Connection timeout to {mac}", + f"Connection failed to {mac}: GATT error", + ] + + # Messages that SHOULD NOT trigger blacklist (duplicate identity) + safe_messages = [ + f"Duplicate identity rejected for {mac}", + f"Rejecting duplicate identity from {mac} (already connected)", + f"MAC rotation duplicate detected: {mac}", + f"Duplicate identity - already connected via different MAC", + f"Identity already connected at different address, rejecting {mac}", + ] + + # Verify unsafe messages match the regex + for msg in unsafe_messages: + match = re.search(blacklist_regex, msg) + assert match is not None, f"Unsafe message should match blacklist regex: {msg}" + + # Verify safe messages do NOT match the regex + for msg in safe_messages: + match = re.search(blacklist_regex, msg) + assert match is None, f"Safe message should NOT match blacklist regex: {msg}" + + +class TestErrorCallbackBlacklistBehavior: + """Test the actual _error_callback behavior with different severities and messages.""" + + @pytest.fixture + def mock_interface_for_error_callback(self): + """Create mock interface for testing _error_callback.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock() + interface.connection_blacklist = {} + interface.discovered_peers = {} + interface.connection_retry_backoff = 60 + interface.max_connection_failures = 3 + + # Mock driver + interface.driver = Mock() + interface.driver._remove_bluez_device = None + + # Import actual method + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._record_connection_failure = lambda addr: RealInterface._record_connection_failure(interface, addr) + interface._is_blacklisted = lambda addr: RealInterface._is_blacklisted(interface, addr) + + return interface + + def test_error_severity_info_does_not_trigger_blacklist(self, mock_interface_for_error_callback): + """ + Verify that errors with severity 'info' or 'debug' don't trigger blacklist. + + The fix can use severity 'info' for duplicate identity rejections. + """ + # _error_callback only triggers blacklist for: + # - severity == "error" or "critical" + # - severity == "warning" with "Connection timeout" in message + + # Safe severities that don't trigger blacklist: + safe_severities = ['info', 'debug'] + + # This documents the expected behavior - after implementing the fix, + # duplicate identity errors should use one of these safe severities + for severity in safe_severities: + assert severity in ['info', 'debug', 'notice'], f"Severity {severity} should be safe" + + def test_error_callback_with_duplicate_identity_message(self, mock_interface_for_error_callback): + """ + Test that _error_callback properly handles duplicate identity messages. + + After the fix, duplicate identity messages should NOT trigger blacklist + regardless of how they arrive (Linux driver or Android callback). + """ + interface = mock_interface_for_error_callback + mac = "AA:BB:CC:DD:EE:02" + + # Create discovered peer + try: + from ble_reticulum.BLEInterface import DiscoveredPeer + interface.discovered_peers[mac] = DiscoveredPeer(mac, "Test-Device", -50) + except ImportError: + pytest.skip("DiscoveredPeer not available") + + # Import actual _error_callback + try: + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + # We can't easily call _error_callback directly without more setup, + # but we can verify the regex behavior + except ImportError: + pytest.skip("BLEInterface not available") + + # The key insight: if we use a message that doesn't contain + # "Connection failed to" or "Connection timeout to", blacklist won't trigger + + # Current buggy message (triggers blacklist): + buggy_msg = f"Connection failed to {mac}: Duplicate identity" + + # Fixed message (won't trigger blacklist): + fixed_msg = f"Duplicate identity rejected for {mac}" + + blacklist_regex = r'(?:Connection (?:failed|timeout) to|to) ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})' + + assert re.search(blacklist_regex, buggy_msg) is not None, "Buggy message triggers blacklist" + assert re.search(blacklist_regex, fixed_msg) is None, "Fixed message should not trigger blacklist" + + +class TestPeripheralModeDuplicateRejection: + """Test duplicate identity rejection in peripheral mode (_handle_identity_handshake).""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a minimal mock of BLEInterface for peripheral mode testing.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + from unittest.mock import MagicMock + + interface = MagicMock(spec=BLEInterface) + interface.identity_to_address = {} + interface.address_to_identity = {} + interface.peers = {} # Track connected peers + interface._pending_detach = {} # Track pending interface detachments + interface._last_real_data = {} # Track last real data activity for zombie detection + interface._zombie_timeout = 30.0 # Zombie connection timeout + + # Mock driver for disconnect calls and connection checks + interface.driver = MagicMock() + interface.driver.disconnect = MagicMock() + interface.driver.connected_peers = [] # Track driver-level connected peers + + # Configure __str__ for logging (MagicMock handles special methods) + interface.__str__ = MagicMock(return_value="BLEInterface[Test]") + + # Import the actual methods we want to test + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._check_duplicate_identity = lambda addr, identity: RealInterface._check_duplicate_identity(interface, addr, identity) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._handle_identity_handshake = lambda addr, data: RealInterface._handle_identity_handshake(interface, addr, data) + + return interface + + def test_peripheral_mode_rejects_duplicate_identity(self, mock_ble_interface): + """ + Test that _handle_identity_handshake rejects duplicate identity. + + In peripheral mode, when a central sends an identity handshake with an + identity that's already connected at a different MAC, the connection + should be rejected. + """ + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD (with active connection) + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Check: duplicate should be detected for MAC_NEW + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + + assert is_duplicate is True, ( + "Peripheral mode should detect duplicate identity when same identity " + "is already connected at different MAC" + ) + + def test_peripheral_mode_allows_new_identity(self, mock_ble_interface): + """ + Test that _handle_identity_handshake allows new identity. + + When a central sends an identity that's not already connected, + the connection should be allowed. + """ + interface = mock_ble_interface + + # Setup: no existing connections + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_new = "AA:BB:CC:DD:EE:02" + + # Check: should not be detected as duplicate + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + + assert is_duplicate is False, ( + "Peripheral mode should allow new identity" + ) + + def test_peripheral_mode_allows_same_mac_identity_refresh(self, mock_ble_interface): + """ + Test that _handle_identity_handshake allows identity refresh from same MAC. + + When a central reconnects from the same MAC with the same identity, + it should be allowed (not considered duplicate). + """ + interface = mock_ble_interface + + # Setup: identity already connected at same MAC + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac = "AA:BB:CC:DD:EE:01" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac + + # Check: same MAC should not be duplicate + is_duplicate = interface._check_duplicate_identity(mac, identity) + + assert is_duplicate is False, ( + "Peripheral mode should allow identity refresh from same MAC" + ) + + def test_handle_identity_handshake_rejects_duplicate_and_disconnects(self, mock_ble_interface): + """ + Test that _handle_identity_handshake actually disconnects when duplicate detected. + + This tests the full code path in _handle_identity_handshake that: + 1. Calls _check_duplicate_identity + 2. Logs warning + 3. Calls driver.disconnect() + 4. Returns True (handshake consumed) + """ + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD (with active connection) + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Mock the driver to track disconnect calls + disconnect_called = [] + interface.driver.disconnect = lambda addr: disconnect_called.append(addr) + + # Call _handle_identity_handshake with duplicate identity + result = interface._handle_identity_handshake(mac_new, identity) + + # Should return True (handshake consumed/rejected) + assert result is True, "Should return True when duplicate rejected" + + # Should have called disconnect on the new MAC + assert mac_new in disconnect_called, ( + f"Should disconnect duplicate connection. Called: {disconnect_called}" + ) + + # Should NOT have added the new MAC to mappings + assert mac_new not in interface.address_to_identity, ( + "Should not add duplicate identity to address_to_identity" + ) + + def test_handle_identity_handshake_disconnect_exception_handled(self, mock_ble_interface): + """ + Test that _handle_identity_handshake handles disconnect exceptions gracefully. + + If driver.disconnect() raises an exception, it should be caught and logged, + but the handshake should still return True (rejected). + """ + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD (with active connection) + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Mock the driver to raise exception on disconnect + def raise_on_disconnect(addr): + raise Exception("Disconnect failed") + interface.driver.disconnect = raise_on_disconnect + + # Call _handle_identity_handshake - should not raise + result = interface._handle_identity_handshake(mac_new, identity) + + # Should still return True (handshake consumed/rejected) + assert result is True, "Should return True even if disconnect fails" + + +class TestLinuxDriverDuplicateIdentityErrorHandling: + """Tests for linux_bluetooth_driver duplicate identity exception handling.""" + + def test_duplicate_identity_error_logs_warning_not_error(self): + """ + Test that duplicate identity exceptions are logged as WARNING, not ERROR. + + When a connection fails due to duplicate identity detection, we want to + log it as a warning (expected behavior during MAC rotation) not an error. + """ + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + # Create driver with minimal mocking + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + driver._log_messages = [] + + def capture_log(msg, level="INFO"): + driver._log_messages.append((msg, level)) + + driver._log = capture_log + + # Test the error message detection logic directly + error_str = "Duplicate identity rejected for AA:BB:CC:DD:EE:FF" + is_duplicate = "Duplicate identity" in error_str + + assert is_duplicate is True, "Should detect duplicate identity in error message" + + # Log with appropriate level based on detection + if is_duplicate: + driver._log(f"Duplicate identity rejected: {error_str}", "WARNING") + else: + driver._log(f"Connection failed: {error_str}", "ERROR") + + # Check the log level + assert len(driver._log_messages) == 1 + msg, level = driver._log_messages[0] + assert level == "WARNING", f"Expected WARNING level, got {level}" + assert "Duplicate identity" in msg + + def test_normal_connection_error_logs_as_error(self): + """ + Test that normal connection errors are still logged as ERROR. + """ + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + driver._log_messages = [] + + def capture_log(msg, level="INFO"): + driver._log_messages.append((msg, level)) + + driver._log = capture_log + + # Normal connection error (not duplicate identity) + error_str = "Connection timeout to AA:BB:CC:DD:EE:FF" + is_duplicate = "Duplicate identity" in error_str + + assert is_duplicate is False, "Should not detect duplicate identity in timeout error" + + # Log with appropriate level + if is_duplicate: + driver._log(f"Duplicate identity rejected: {error_str}", "WARNING") + else: + driver._log(f"Connection failed: {error_str}", "ERROR") + + # Check the log level + assert len(driver._log_messages) == 1 + msg, level = driver._log_messages[0] + assert level == "ERROR", f"Expected ERROR level, got {level}" + + def test_error_callback_uses_info_for_duplicate_identity(self): + """ + Test that on_error callback uses 'info' severity for duplicate identity. + + This prevents the blacklist from being triggered for expected MAC rotation + rejections. + """ + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + error_callbacks = [] + + def capture_error(severity, message, exception): + error_callbacks.append((severity, message, exception)) + + driver.on_error = capture_error + + # Simulate duplicate identity error handling + error = Exception("Duplicate identity detected") + error_str = str(error) + is_duplicate = "Duplicate identity" in error_str + address = "AA:BB:CC:DD:EE:FF" + + if is_duplicate: + driver.on_error("info", f"Duplicate identity rejected for {address} (MAC rotation)", error) + else: + driver.on_error("error", f"Connection failed to {address}: {error}", error) + + # Check callback was called with 'info' severity + assert len(error_callbacks) == 1 + severity, msg, exc = error_callbacks[0] + assert severity == "info", f"Expected 'info' severity for duplicate identity, got '{severity}'" + assert "Duplicate identity rejected" in msg + assert "MAC rotation" in msg + + +class TestLinuxDriverDuplicateIdentityCodePath: + """ + Integration tests that exercise the actual code path in linux_bluetooth_driver.py. + + These tests patch BleakClient to trigger the duplicate identity exception handling + code in _connect_to_peer_async(), ensuring the actual driver code is covered. + """ + + @pytest.fixture + def driver_with_callbacks(self): + """Create a LinuxBluetoothDriver with minimal setup for testing.""" + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + # Create driver instance using __new__ to skip __init__ + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + + # Set up minimal required attributes for _connect_to_peer + driver._log_messages = [] + driver._connecting_peers = set() + driver._connecting_lock = threading.RLock() + driver._connected_peers = {} + driver._peer_roles = {} + driver._peers = {} + driver._peers_lock = threading.RLock() + driver._connection_timeout = 10.0 + driver.connection_timeout = 10.0 # Property alias + driver._service_discovery_delay = 0.5 + driver.service_discovery_delay = 0.5 # Property alias + driver.service_uuid = "37145b00-442d-4a94-917f-8f42c5da28e3" + driver.tx_char_uuid = "37145b00-442d-4a94-917f-8f42c5da28e4" + driver.rx_char_uuid = "37145b00-442d-4a94-917f-8f42c5da28e5" + driver.identity_char_uuid = "37145b00-442d-4a94-917f-8f42c5da28e6" + driver._local_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + driver.adapter_path = "/org/bluez/hci0" + + # BlueZ version tracking (skip LE-specific connection path) + driver.bluez_version = None + driver.has_connect_device = False + + # Capture log messages + def capture_log(msg, level="INFO"): + driver._log_messages.append((msg, level)) + + driver._log = capture_log + + # Capture error callbacks + driver._error_callbacks = [] + + def capture_error(severity, message, exception): + driver._error_callbacks.append((severity, message, exception)) + + driver.on_error = capture_error + + # Mock _remove_bluez_device + driver._remove_bluez_device = AsyncMock(return_value=True) + + # Mock callbacks (not needed for this test path) + driver.on_device_connected = None + driver.on_device_disconnected = None + driver.on_mtu_negotiated = None + + return driver + + @pytest.mark.asyncio + async def test_duplicate_identity_exception_uses_warning_log(self, driver_with_callbacks): + """ + Test that when BleakClient raises a 'Duplicate identity' exception, + the driver logs it as WARNING instead of ERROR. + + This exercises the actual code path in linux_bluetooth_driver.py:1207-1214. + """ + driver = driver_with_callbacks + address = "AA:BB:CC:DD:EE:FF" + + # Create a mock client that raises duplicate identity exception on connect + mock_client = AsyncMock() + mock_client.is_connected = False + mock_client.connect = AsyncMock( + side_effect=Exception("Duplicate identity - already connected via different MAC (Android MAC rotation)") + ) + + # Patch BleakClient to return our mock + with patch('ble_reticulum.linux_bluetooth_driver.BleakClient', return_value=mock_client): + # Call the async connection method + await driver._connect_to_peer(address) + + # Verify the log message uses WARNING level + warning_logs = [(msg, lvl) for msg, lvl in driver._log_messages if lvl == "WARNING"] + error_logs = [(msg, lvl) for msg, lvl in driver._log_messages if lvl == "ERROR"] + + # Should have WARNING log for duplicate identity + assert any("Duplicate identity rejected" in msg for msg, _ in warning_logs), \ + f"Expected WARNING log with 'Duplicate identity rejected', got warnings: {warning_logs}" + + # Should NOT have ERROR log for duplicate identity + duplicate_errors = [msg for msg, _ in error_logs if "Duplicate identity" in msg] + assert len(duplicate_errors) == 0, \ + f"Should not log duplicate identity as ERROR, got: {duplicate_errors}" + + @pytest.mark.asyncio + async def test_duplicate_identity_exception_uses_info_severity_callback(self, driver_with_callbacks): + """ + Test that when BleakClient raises a 'Duplicate identity' exception, + the on_error callback is called with 'info' severity, not 'error'. + + This exercises the actual code path in linux_bluetooth_driver.py:1234-1238. + """ + driver = driver_with_callbacks + address = "AA:BB:CC:DD:EE:FF" + + # Create a mock client that raises duplicate identity exception on connect + mock_client = AsyncMock() + mock_client.is_connected = False + mock_client.connect = AsyncMock( + side_effect=Exception("Duplicate identity - already connected via different MAC") + ) + + # Patch BleakClient to return our mock + with patch('ble_reticulum.linux_bluetooth_driver.BleakClient', return_value=mock_client): + await driver._connect_to_peer(address) + + # Verify the on_error callback was called with 'info' severity + assert len(driver._error_callbacks) == 1, \ + f"Expected exactly 1 error callback, got {len(driver._error_callbacks)}" + + severity, message, exception = driver._error_callbacks[0] + + assert severity == "info", \ + f"Expected 'info' severity for duplicate identity, got '{severity}'" + + assert "Duplicate identity rejected" in message, \ + f"Expected 'Duplicate identity rejected' in message, got: {message}" + + assert "MAC rotation" in message, \ + f"Expected 'MAC rotation' in message, got: {message}" + + @pytest.mark.asyncio + async def test_normal_exception_still_uses_error_severity(self, driver_with_callbacks): + """ + Test that normal (non-duplicate-identity) exceptions still use 'error' severity. + + This ensures we didn't break normal error handling. + """ + driver = driver_with_callbacks + address = "AA:BB:CC:DD:EE:FF" + + # Create a mock client that raises a normal exception on connect + mock_client = AsyncMock() + mock_client.is_connected = False + mock_client.connect = AsyncMock( + side_effect=Exception("Connection refused by peer") + ) + mock_client.disconnect = AsyncMock() + + # Patch BleakClient to return our mock + with patch('ble_reticulum.linux_bluetooth_driver.BleakClient', return_value=mock_client): + await driver._connect_to_peer(address) + + # Verify the on_error callback was called with 'error' severity + assert len(driver._error_callbacks) == 1, \ + f"Expected exactly 1 error callback, got {len(driver._error_callbacks)}" + + severity, message, exception = driver._error_callbacks[0] + + assert severity == "error", \ + f"Expected 'error' severity for normal failure, got '{severity}'" + + assert "Connection failed to" in message, \ + f"Expected 'Connection failed to' in message, got: {message}" + + # Verify ERROR log was used + error_logs = [(msg, lvl) for msg, lvl in driver._log_messages if lvl == "ERROR"] + assert any("Connection failed to" in msg for msg, _ in error_logs), \ + f"Expected ERROR log with 'Connection failed to', got: {error_logs}" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_peer_address_mac_rotation.py b/tests/test_peer_address_mac_rotation.py index eb9abd5..4f9614c 100644 --- a/tests/test_peer_address_mac_rotation.py +++ b/tests/test_peer_address_mac_rotation.py @@ -243,6 +243,9 @@ class TestPeerAddressMacRotation: When we receive an identity handshake from a new address but already have an interface for that identity, we must update peer_address. + + Note: This simulates MAC rotation where the old connection has dropped + but the peer interface is still alive (waiting for reconnection). """ old_addr = ble_interface._old_address new_addr = ble_interface._new_address @@ -253,8 +256,16 @@ class TestPeerAddressMacRotation: assert mock_peer_interface.peer_address == old_addr assert identity_hash in ble_interface.spawned_interfaces - # Setup: peer connected at new address - but NO identity mapping yet - # This simulates a central reconnecting at a new MAC address + # Setup for MAC rotation: old connection is gone, new connection arrives + # Remove old address from peers (simulates old connection dropped) + del ble_interface.peers[old_addr] + # Remove old address from address_to_identity (cleaned up after disconnect) + del ble_interface.address_to_identity[old_addr] + # Remove old address from identity_to_address + # (this gets cleared during disconnect cleanup in real code) + del ble_interface.identity_to_address[identity_hash] + + # New peer connects at new address ble_interface.peers[new_addr] = (Mock(is_connected=True), 0, 185) # NOTE: Do NOT add new_addr to address_to_identity - the handshake does that diff --git a/tests/test_v2_2_identity_handshake.py b/tests/test_v2_2_identity_handshake.py index 0f9d87a..e83a272 100644 --- a/tests/test_v2_2_identity_handshake.py +++ b/tests/test_v2_2_identity_handshake.py @@ -53,54 +53,55 @@ if not hasattr(RNS, 'Identity'): RNS.Identity = MagicMock() RNS.Identity.full_hash = lambda x: (x * 2)[:16] # Simple mock -# Mock ble_reticulum.Interface (required by BLEInterface.py) -# First, ensure mock is in place BEFORE any imports that need it -rns_interfaces_mock = MagicMock() -_sys.modules['ble_reticulum'] = rns_interfaces_mock -_sys.modules['ble_reticulum.Interface'] = MagicMock() +# Mock ble_reticulum.Interface module (the base class module, not the whole namespace) +# We only mock the Interface.py module, allowing BLEInterface.py to be imported from src/ +if 'ble_reticulum.Interface' not in _sys.modules: + # Create mock Interface base class + class MockInterface: + MODE_FULL = 1 + def __init__(self): + self.IN = True + self.OUT = True + self.online = False -# Create mock Interface base class -class MockInterface: - MODE_FULL = 1 - def __init__(self): - self.IN = True - self.OUT = True - self.online = False + @staticmethod + def get_config_obj(configuration): + """Mock config object wrapper - just returns a dict-like object.""" + class ConfigObj: + def __init__(self, config): + self._config = config if config else {} - @staticmethod - def get_config_obj(configuration): - """Mock config object that returns dict values via attribute access.""" - class ConfigObj: - def __init__(self, config_dict): - self._config = config_dict if isinstance(config_dict, dict) else {} + def __getitem__(self, key): + return self._config.get(key) - def __getattr__(self, name): - return self._config.get(name) + def get(self, key, default=None): + return self._config.get(key, default) - def __contains__(self, key): - return key in self._config + def as_string(self, key, default=None): + val = self._config.get(key, default) + return str(val) if val is not None else default - def get(self, key, default=None): - return self._config.get(key, default) + def as_int(self, key, default=None): + val = self._config.get(key, default) + return int(val) if val is not None else default - def as_dict(self): - return self._config + def as_bool(self, key, default=False): + val = self._config.get(key, default) + if isinstance(val, bool): + return val + if isinstance(val, str): + return val.lower() in ('true', 'yes', '1', 'on') + return bool(val) if val is not None else default + return ConfigObj(configuration) - return ConfigObj(configuration) - -rns_interfaces_mock.Interface = MockInterface -_sys.modules['ble_reticulum.Interface'].Interface = MockInterface + # Create a mock module for ble_reticulum.Interface + interface_module = MagicMock() + interface_module.Interface = MockInterface + _sys.modules['ble_reticulum.Interface'] = interface_module from tests.mock_ble_driver import MockBLEDriver - -# Import BLEInterface directly using importlib to bypass RNS namespace conflicts -import importlib.util -_ble_interface_path = os.path.join(os.path.dirname(__file__), '..', 'src', 'RNS', 'Interfaces', 'BLEInterface.py') -_spec = importlib.util.spec_from_file_location("BLEInterface", _ble_interface_path) -_ble_module = importlib.util.module_from_spec(_spec) -_spec.loader.exec_module(_ble_module) -BLEInterface = _ble_module.BLEInterface -DiscoveredPeer = _ble_module.DiscoveredPeer +from ble_reticulum.BLEInterface import BLEInterface, DiscoveredPeer +from ble_reticulum.BLEFragmentation import BLEFragmenter, BLEReassembler import time @@ -172,8 +173,8 @@ class TestIdentityHandshakeBasics: # Create fragmenter and peer interface (simulating post-handshake state) frag_key = interface._get_fragmenter_key(existing_identity, central_address) - interface.fragmenters[frag_key] = interface._create_fragmenter(185) - interface.reassemblers[frag_key] = interface._create_reassembler() + interface.fragmenters[frag_key] = BLEFragmenter(mtu=185) + interface.reassemblers[frag_key] = BLEReassembler() # Receive 16-byte data packet (should be processed as data, not handshake) data_packet = b'\xaa\xbb\xcc\xdd\xee\xff\x11\x22\x33\x44\x55\x66\x77\x88\x99\x00' @@ -273,11 +274,7 @@ class TestIdentityHandshakeBidirectional: peripheral_driver = MockBLEDriver(local_address="BB:BB:BB:BB:BB:BB") MockBLEDriver.link_drivers(central_driver, peripheral_driver) - # Set peripheral identity - peripheral_identity = b'\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11' - peripheral_driver.set_identity(peripheral_identity) - - # Start both drivers + # Start both drivers first (sets up characteristic UUIDs) central_driver.start( service_uuid="test-uuid", rx_char_uuid="rx-uuid", @@ -291,6 +288,10 @@ class TestIdentityHandshakeBidirectional: identity_char_uuid="identity-uuid" ) + # Set peripheral identity (after start() so characteristic UUID is available) + peripheral_identity = b'\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11' + peripheral_driver.set_identity(peripheral_identity) + # Central connects to peripheral central_driver.connect(peripheral_driver.local_address) @@ -453,5 +454,144 @@ class TestReassemblerRaceCondition: assert identity_hash in interface.spawned_interfaces, "Central mode: interface should exist" +class TestDuplicateIdentityHandshakeRaceCondition: + """ + Test handling of duplicate identity handshake data. + + When Kotlin provides the identity via callback (from reading the identity characteristic), + the address_to_identity mapping gets set BEFORE the 16-byte handshake data arrives + through _data_received_callback. The fix ensures this duplicate handshake data is + consumed and not passed to the reassembler where it would cause "Invalid fragment type" errors. + """ + + def test_duplicate_handshake_matching_identity_consumed(self): + """Test that duplicate 16-byte handshake matching known identity is consumed.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + central_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Simulate identity being set via Kotlin callback (before handshake data arrives) + interface.address_to_identity[central_address] = central_identity + identity_hash = interface._compute_identity_hash(central_identity) + interface.identity_to_address[identity_hash] = central_address + + # Now the handshake data arrives through data channel + # This should be consumed (return True) and not passed to reassembler + result = interface._handle_identity_handshake(central_address, central_identity) + + assert result is True, "Duplicate handshake matching identity should be consumed" + # Identity should still be the same + assert interface.address_to_identity[central_address] == central_identity + + def test_duplicate_handshake_different_identity_still_consumed(self): + """Test that 16-byte data different from known identity is still consumed.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + known_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + different_16_bytes = b'\xff\xfe\xfd\xfc\xfb\xfa\xf9\xf8\xf7\xf6\xf5\xf4\xf3\xf2\xf1\xf0' + + # Simulate identity being set via Kotlin callback + interface.address_to_identity[central_address] = known_identity + identity_hash = interface._compute_identity_hash(known_identity) + interface.identity_to_address[identity_hash] = central_address + + # Different 16-byte data arrives - should still be consumed to prevent reassembler errors + result = interface._handle_identity_handshake(central_address, different_16_bytes) + + assert result is True, "Different 16-byte data should be consumed to prevent reassembler errors" + # Original identity should be preserved + assert interface.address_to_identity[central_address] == known_identity + + def test_non_16_byte_data_not_consumed_as_handshake(self): + """Test that non-16-byte data is not consumed as handshake even with known identity.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + known_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Set identity via callback + interface.address_to_identity[central_address] = known_identity + + # Non-16-byte data should not be consumed as handshake + result_15 = interface._handle_identity_handshake(central_address, b'\x00' * 15) + result_17 = interface._handle_identity_handshake(central_address, b'\x00' * 17) + result_10 = interface._handle_identity_handshake(central_address, b'\x00' * 10) + + assert result_15 is False, "15-byte data should not be consumed as handshake" + assert result_17 is False, "17-byte data should not be consumed as handshake" + assert result_10 is False, "10-byte data should not be consumed as handshake" + + def test_16_byte_data_without_known_identity_processed_as_handshake(self): + """Test that 16-byte data without known identity is processed as new handshake.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + central_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # No identity set - this should be processed as a new handshake + assert central_address not in interface.address_to_identity + + result = interface._handle_identity_handshake(central_address, central_identity) + + assert result is True, "16-byte data without known identity should be processed as handshake" + assert interface.address_to_identity[central_address] == central_identity + + def test_handshake_cleans_up_pending_identity_connection(self): + """Test that successful handshake cleans up _pending_identity_connections.""" + from unittest.mock import Mock + + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + # Mock get_peer_mtu which is needed during handshake processing + driver.get_peer_mtu = Mock(return_value=185) + + central_address = "11:22:33:44:55:66" + central_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Simulate that this address was waiting for identity handshake + interface._pending_identity_connections[central_address] = time.time() + + # Verify pending entry exists + assert central_address in interface._pending_identity_connections + + # Process handshake + result = interface._handle_identity_handshake(central_address, central_identity) + + # Verify handshake succeeded + assert result is True, "Handshake should succeed" + assert interface.address_to_identity[central_address] == central_identity + + # Verify pending identity connection was cleaned up + assert central_address not in interface._pending_identity_connections, \ + "Pending identity connection should be removed after successful handshake" + + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/test_v2_2_race_conditions.py b/tests/test_v2_2_race_conditions.py index afa13a2..460347b 100644 --- a/tests/test_v2_2_race_conditions.py +++ b/tests/test_v2_2_race_conditions.py @@ -64,11 +64,9 @@ if not hasattr(RNS, 'Identity'): RNS.Identity = MagicMock() RNS.Identity.full_hash = lambda x: (x * 2)[:16] -# Mock ble_reticulum.Interface (required by BLEInterface.py) -if 'ble_reticulum' not in _sys.modules: - rns_interfaces_mock = MagicMock() - _sys.modules['ble_reticulum'] = rns_interfaces_mock - +# Mock ble_reticulum.Interface module (the base class module, not the whole namespace) +# We only mock the Interface.py module, allowing BLEInterface.py to be imported from src/ +if 'ble_reticulum.Interface' not in _sys.modules: # Create mock Interface base class class MockInterface: MODE_FULL = 1 @@ -77,7 +75,40 @@ if 'ble_reticulum' not in _sys.modules: self.OUT = True self.online = False - rns_interfaces_mock.Interface = MockInterface + @staticmethod + def get_config_obj(configuration): + """Mock config object wrapper - just returns a dict-like object.""" + class ConfigObj: + def __init__(self, config): + self._config = config if config else {} + + def __getitem__(self, key): + return self._config.get(key) + + def get(self, key, default=None): + return self._config.get(key, default) + + def as_string(self, key, default=None): + val = self._config.get(key, default) + return str(val) if val is not None else default + + def as_int(self, key, default=None): + val = self._config.get(key, default) + return int(val) if val is not None else default + + def as_bool(self, key, default=False): + val = self._config.get(key, default) + if isinstance(val, bool): + return val + if isinstance(val, str): + return val.lower() in ('true', 'yes', '1', 'on') + return bool(val) if val is not None else default + return ConfigObj(configuration) + + # Create a mock module for ble_reticulum.Interface + interface_module = MagicMock() + interface_module.Interface = MockInterface + _sys.modules['ble_reticulum.Interface'] = interface_module from tests.mock_ble_driver import MockBLEDriver from ble_reticulum.BLEInterface import BLEInterface, DiscoveredPeer @@ -121,7 +152,8 @@ class TestRateLimiting: def test_connection_allowed_after_5_seconds(self): """Test that connection is allowed after 5-second cooldown.""" - driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + # Use local MAC lower than peer MAC so connection direction allows us to initiate + driver = MockBLEDriver(local_address="11:11:11:11:11:11") owner = MockOwner() config = {"name": "Test", "enable_central": True} @@ -129,7 +161,7 @@ class TestRateLimiting: interface.driver = driver interface.local_address = driver.local_address - peer_address = "11:22:33:44:55:66" + peer_address = "22:22:22:22:22:22" peer = DiscoveredPeer(peer_address, "TestPeer", -60) # Record connection attempt 6 seconds ago (past cooldown) @@ -146,7 +178,8 @@ class TestRateLimiting: def test_never_attempted_peer_allowed(self): """Test that peer with no prior attempts is allowed.""" - driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + # Use local MAC lower than peer MAC so connection direction allows us to initiate + driver = MockBLEDriver(local_address="11:11:11:11:11:11") owner = MockOwner() config = {"name": "Test", "enable_central": True} @@ -154,7 +187,7 @@ class TestRateLimiting: interface.driver = driver interface.local_address = driver.local_address - peer_address = "11:22:33:44:55:66" + peer_address = "22:22:22:22:22:22" peer = DiscoveredPeer(peer_address, "TestPeer", -60) # last_connection_attempt == 0 (never attempted) @@ -208,7 +241,8 @@ class TestDriverStateTracking: def test_multiple_rapid_discoveries_handled(self): """Test that rapid discovery callbacks don't cause duplicate connections.""" - driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + # Use local MAC lower than peer MAC so connection direction allows us to initiate + driver = MockBLEDriver(local_address="11:11:11:11:11:11") owner = MockOwner() config = {"name": "Test", "enable_central": True} @@ -216,21 +250,30 @@ class TestDriverStateTracking: interface.driver = driver interface.local_address = driver.local_address - peer_address = "11:22:33:44:55:66" + peer_address = "22:22:22:22:22:22" peer = DiscoveredPeer(peer_address, "TestPeer", -60) - # Simulate rapid discovery callbacks (5 times in quick succession) - for i in range(5): - interface.discovered_peers[peer_address] = peer - interface._select_peers_to_connect() + # Manually record the first connection attempt (simulating what _try_connect_to_peer does) + # This is needed because _select_peers_to_connect() only returns peers to connect, + # it doesn't actually initiate the connection or record the attempt + interface.discovered_peers[peer_address] = peer + first_selection = interface._select_peers_to_connect() - # After first selection, peer should have recorded attempt - # Subsequent selections should be rate-limited + # First selection should include the peer + assert len(first_selection) == 1 + assert first_selection[0].address == peer_address - # Check that last_connection_attempt was recorded + # Record the attempt (simulating what happens when connection is initiated) + peer.record_connection_attempt() + + # Subsequent rapid selections should be rate-limited + for i in range(4): + subsequent_selection = interface._select_peers_to_connect() + # Should be empty because peer was just attempted + assert len(subsequent_selection) == 0, f"Selection {i+2} should be empty due to rate limiting" + + # Verify timestamp was recorded assert peer.last_connection_attempt > 0 - - # Verify recent timestamp time_since = time.time() - peer.last_connection_attempt assert time_since < 1.0 # Should be very recent diff --git a/tests/test_zombie_connection_detection.py b/tests/test_zombie_connection_detection.py new file mode 100644 index 0000000..f5353df --- /dev/null +++ b/tests/test_zombie_connection_detection.py @@ -0,0 +1,700 @@ +""" +Tests for zombie connection detection. + +Zombie connections occur when the BLE link degrades - 1-byte keepalives still work +but larger data packets fail. Both sides think the connection is "alive" based on +keepalives, but data can't flow. This causes a deadlock where new connections are +rejected as "duplicates" even though the existing connection is non-functional. + +The zombie detection feature tracks when real data (not keepalives) was last +received, and allows new connections if the existing connection has been a +"zombie" (only keepalives) for longer than the timeout. +""" + +import pytest +import time +import threading +from unittest.mock import Mock, MagicMock, patch + + +class TestZombieConnectionDetection: + """Test zombie connection detection in _check_duplicate_identity.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface with real method bindings.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.identity_to_address = {} + interface.address_to_identity = {} + interface.address_to_interface = {} # For _cleanup_stale_address + interface.pending_mtu = {} # For _cleanup_stale_address + interface.fragmenters = {} # For _cleanup_stale_address + interface.reassemblers = {} # For _cleanup_stale_address + interface.frag_lock = threading.RLock() # For _cleanup_stale_address + interface.peers = {} + interface._pending_detach = {} + interface._last_real_data = {} + interface._zombie_timeout = 30.0 # 30 seconds default + interface.driver = Mock() + interface.driver.connected_peers = [] + interface.driver.disconnect = Mock() + + # Import the actual methods we want to test + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._check_duplicate_identity = lambda addr, identity: RealInterface._check_duplicate_identity(interface, addr, identity) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._cleanup_stale_address = lambda ih, addr: RealInterface._cleanup_stale_address(interface, ih, addr) + interface._get_fragmenter_key = lambda identity, addr: RealInterface._get_fragmenter_key(interface, identity, addr) + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + return interface + + def test_healthy_connection_rejects_duplicate(self, mock_ble_interface): + """Test that a healthy connection (recent real data) rejects duplicates.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Connection is healthy - real data received recently + interface._last_real_data[identity_hash] = time.time() - 10 # 10 seconds ago + + # Should reject as duplicate + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert is_duplicate, "Should reject duplicate when existing connection is healthy" + + def test_zombie_connection_allows_reconnection(self, mock_ble_interface): + """Test that a zombie connection (no real data for > timeout) allows reconnection.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Connection is zombie - no real data for > timeout + interface._last_real_data[identity_hash] = time.time() - 60 # 60 seconds ago (> 30s timeout) + + # Should allow reconnection + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection when existing connection is a zombie" + + def test_zombie_disconnects_old_connection(self, mock_ble_interface): + """Test that detecting a zombie triggers disconnect of the old connection.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.address_to_identity[mac_old] = identity + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Connection is zombie + interface._last_real_data[identity_hash] = time.time() - 60 + + # Check for duplicate + interface._check_duplicate_identity(mac_new, identity) + + # Should have called disconnect on old address + interface.driver.disconnect.assert_called_once_with(mac_old) + + def test_connection_at_zombie_threshold(self, mock_ble_interface): + """Test behavior at the zombie timeout threshold.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Just under the threshold - should still reject (needs to be > timeout) + # Use 29.5s to avoid timing flakiness (time passes between setting and checking) + interface._last_real_data[identity_hash] = time.time() - 29.5 # just under 30s + + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + # At under 30s, it should NOT be considered a zombie (needs to be > 30s) + assert is_duplicate, "Should reject duplicate when under threshold" + + def test_no_last_data_timestamp_does_not_zombie(self, mock_ble_interface): + """Test that missing last_real_data timestamp doesn't trigger zombie detection.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # No last_real_data entry - should not trigger zombie detection + # (This handles newly connected peers before any data is received) + + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + # Without timestamp, we can't determine zombie state - reject as duplicate + assert is_duplicate, "Should reject duplicate when no timestamp exists" + + def test_custom_zombie_timeout(self, mock_ble_interface): + """Test that custom zombie timeout is respected.""" + interface = mock_ble_interface + interface._zombie_timeout = 10.0 # Custom 10 second timeout + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # 15 seconds ago - should be zombie with 10s timeout + interface._last_real_data[identity_hash] = time.time() - 15 + + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection with custom zombie timeout" + + +class TestZombieTrackingOnDataReceive: + """Test that real data updates the _last_real_data timestamp.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface with real _handle_ble_data.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.address_to_identity = {} + interface.identity_to_address = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + interface.fragmenters = {} + interface.reassemblers = {} + interface.frag_lock = threading.RLock() + interface.driver = Mock() + interface.driver.request_identity_resync = Mock() + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._handle_ble_data = lambda addr, data: RealInterface._handle_ble_data(interface, addr, data) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._get_fragmenter_key = lambda identity, addr: RealInterface._get_fragmenter_key(interface, identity, addr) + + return interface + + def test_real_data_updates_timestamp(self, mock_ble_interface): + """Test that receiving real data (not keepalive) updates the timestamp.""" + interface = mock_ble_interface + + address = "AA:BB:CC:DD:EE:01" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + + interface.address_to_identity[address] = identity + interface.identity_to_address[identity_hash] = address + + # Setup reassembler + frag_key = interface._get_fragmenter_key(identity, address) + mock_reassembler = Mock() + mock_reassembler.add_fragment = Mock(return_value=None) + interface.reassemblers[frag_key] = mock_reassembler + + # Clear any existing timestamp + interface._last_real_data.clear() + + # Receive real data (10 bytes - not a keepalive) + real_data = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a' + before_time = time.time() + interface._handle_ble_data(address, real_data) + after_time = time.time() + + # Timestamp should be updated + assert identity_hash in interface._last_real_data, "Should track real data timestamp" + timestamp = interface._last_real_data[identity_hash] + assert before_time <= timestamp <= after_time, "Timestamp should be within receive window" + + def test_keepalive_does_not_update_timestamp(self, mock_ble_interface): + """Test that receiving keepalive (1 byte 0x00) does NOT update timestamp.""" + interface = mock_ble_interface + + address = "AA:BB:CC:DD:EE:01" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + + interface.address_to_identity[address] = identity + interface.identity_to_address[identity_hash] = address + + # Set an old timestamp + old_timestamp = time.time() - 100 + interface._last_real_data[identity_hash] = old_timestamp + + # Receive keepalive (1 byte 0x00) + keepalive = bytes([0x00]) + interface._handle_ble_data(address, keepalive) + + # Timestamp should NOT be updated + assert interface._last_real_data[identity_hash] == old_timestamp, \ + "Keepalive should NOT update timestamp" + + +class TestZombieTrackingOnConnect: + """Test that connection establishment initializes _last_real_data.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface for spawn testing.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.spawned_interfaces = {} + interface.address_to_interface = {} + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + interface.HW_MTU = 500 + interface.bitrate = 700000 + interface.mode = 0 # Interface.MODE_FULL + interface.ifac_size = 0 + interface.ifac_netname = None + interface.ifac_netkey = None + interface.announce_cap = 1.0 + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + return interface + + def test_spawn_initializes_timestamp(self, mock_ble_interface): + """Test that spawning a peer interface initializes the timestamp.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Compute identity_hash the same way BLEInterface does + identity_hash = identity.hex()[:16] + + # Clear timestamp + interface._last_real_data.clear() + + # Simulate what _spawn_peer_interface does - it initializes the timestamp + before_time = time.time() + interface._last_real_data[identity_hash] = time.time() + after_time = time.time() + + # Verify the timestamp was set + assert identity_hash in interface._last_real_data, \ + "Spawning interface should initialize timestamp" + timestamp = interface._last_real_data[identity_hash] + assert before_time <= timestamp <= after_time, \ + "Timestamp should be within expected range" + + +class TestPendingDetachAllowsReconnection: + """Test that pending detach (Check 1) allows reconnection using real BLEInterface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + # Import test utilities + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_pending_detach_allows_reconnection_from_new_mac(self, ble_interface): + """Test that pending detach allows reconnection from new MAC (Check 1 path).""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up state: old connection exists in identity_to_address but has pending detach + interface.identity_to_address[identity_hash] = mac_old + interface._pending_detach[identity_hash] = time.time() # Pending detach exists + + # Note: NOT in connected_peers or peers (connection is dead) + + # Should allow reconnection because pending detach exists (Check 1) + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection when pending detach exists" + + +class TestNotConnectedAllowsReconnection: + """Test that not-connected check (Check 2) allows reconnection using real BLEInterface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_not_connected_allows_reconnection(self, ble_interface): + """Test that stale entry without connection allows reconnection (Check 2 path).""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up state: old connection exists in identity_to_address + # but NOT in connected_peers, NOT in peers, and NO pending detach + interface.identity_to_address[identity_hash] = mac_old + # Note: _pending_detach is empty, driver.connected_peers is empty, peers is empty + + # Should allow reconnection because old address is not connected (Check 2) + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection when old address is not connected" + + def test_in_peers_but_not_connected_peers_still_rejects(self, ble_interface): + """Test that being in peers dict still rejects (connection considered alive).""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up state: NOT in connected_peers but IS in peers + # This simulates a state where the driver doesn't know about the peer + # but our internal tracking does - we should still reject + interface.identity_to_address[identity_hash] = mac_old + interface.peers[mac_old] = {"connected": True} + # Note: driver.connected_peers is empty + + # Should reject because old address is in peers dict + # The logic checks: if existing_address not in self.driver.connected_peers + # AND existing_address not in self.peers + # Here the second condition fails, so it falls through to zombie check + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + # Since no timestamp exists and no pending detach, it should reject + assert is_duplicate, "Should reject when old address is still in peers" + + +class TestZombieDisconnectExceptionHandling: + """Test exception handling when zombie disconnect fails using real BLEInterface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_zombie_disconnect_exception_still_allows_reconnection(self, ble_interface): + """Test that exception during zombie disconnect doesn't prevent reconnection.""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up zombie state: old connection in maps, in connected_peers, timestamp is old + interface.identity_to_address[identity_hash] = mac_old + interface.address_to_identity[mac_old] = identity + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + interface._last_real_data[identity_hash] = time.time() - 60 # 60 seconds ago (zombie) + + # Make disconnect raise an exception by patching the driver method + original_disconnect = interface.driver.disconnect + def raising_disconnect(addr): + raise Exception("BLE disconnect failed") + interface.driver.disconnect = raising_disconnect + + # Should still allow reconnection despite exception + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection even when zombie disconnect fails" + + # Restore original method + interface.driver.disconnect = original_disconnect + + +class TestZombieCleanupOnDetach: + """Test that _last_real_data is cleaned up when interface is detached.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface for detach testing.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.spawned_interfaces = {} + interface.identity_to_address = {} + interface.address_to_identity = {} + interface._pending_detach = {} + interface._pending_detach_grace_period = 2.0 + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + interface.fragmenters = {} + interface.reassemblers = {} + interface.frag_lock = threading.RLock() + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._process_pending_detaches = lambda: RealInterface._process_pending_detaches(interface) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._get_fragmenter_key = lambda identity, addr: RealInterface._get_fragmenter_key(interface, identity, addr) + + return interface + + def test_detach_cleans_up_timestamp(self, mock_ble_interface): + """Test that detaching an interface cleans up the timestamp.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + address = "AA:BB:CC:DD:EE:01" + + # Create mock peer interface + mock_peer_if = Mock() + mock_peer_if.peer_identity = identity + mock_peer_if.detach = Mock() + + interface.spawned_interfaces[identity_hash] = mock_peer_if + interface.identity_to_address[identity_hash] = address + interface._last_real_data[identity_hash] = time.time() - 100 + + # Schedule detach in the past + interface._pending_detach[identity_hash] = time.time() - 10 # 10 seconds ago + + # Setup fragmenter key + frag_key = interface._get_fragmenter_key(identity, "") + interface.fragmenters[frag_key] = Mock() + interface.reassemblers[frag_key] = Mock() + + # Process detaches + interface._process_pending_detaches() + + # Timestamp should be cleaned up + assert identity_hash not in interface._last_real_data, \ + "Detaching interface should clean up timestamp" + + +class TestIdentityHandshakeCoverage: + """Tests for _handle_identity_handshake to achieve full PR coverage.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + # Mock get_peer_mtu needed for handshake + driver.get_peer_mtu = Mock(return_value=185) + + return interface + + def test_non_16_byte_data_returns_false(self, ble_interface): + """Test that non-16-byte data returns False (not a handshake).""" + interface = ble_interface + address = "11:22:33:44:55:66" + + # 15 bytes - too short + result = interface._handle_identity_handshake(address, b'\x00' * 15) + assert result is False, "15-byte data should return False" + + # 17 bytes - too long + result = interface._handle_identity_handshake(address, b'\x00' * 17) + assert result is False, "17-byte data should return False" + + def test_duplicate_handshake_matching_identity_consumed(self, ble_interface): + """Test that duplicate 16-byte handshake matching known identity is consumed.""" + interface = ble_interface + address = "11:22:33:44:55:66" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Pre-set identity (simulating Kotlin callback) + interface.address_to_identity[address] = identity + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = address + + # Same 16 bytes arrives - should be consumed + result = interface._handle_identity_handshake(address, identity) + assert result is True, "Duplicate handshake should be consumed" + + def test_duplicate_handshake_different_data_still_consumed(self, ble_interface): + """Test that 16-byte data different from known identity is still consumed.""" + interface = ble_interface + address = "11:22:33:44:55:66" + known_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + different_data = b'\xff\xfe\xfd\xfc\xfb\xfa\xf9\xf8\xf7\xf6\xf5\xf4\xf3\xf2\xf1\xf0' + + # Pre-set identity + interface.address_to_identity[address] = known_identity + identity_hash = interface._compute_identity_hash(known_identity) + interface.identity_to_address[identity_hash] = address + + # Different 16 bytes arrives - should still be consumed + result = interface._handle_identity_handshake(address, different_data) + assert result is True, "Different 16-byte data should be consumed" + + def test_new_handshake_cleans_pending_identity(self, ble_interface): + """Test that successful handshake cleans up _pending_identity_connections.""" + interface = ble_interface + address = "11:22:33:44:55:66" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Set pending identity connection + interface._pending_identity_connections[address] = time.time() + + # Process handshake + result = interface._handle_identity_handshake(address, identity) + + assert result is True, "Handshake should succeed" + assert address not in interface._pending_identity_connections, \ + "Pending identity should be cleaned up" + + +class TestSpawnPeerInterfaceZombieTracking: + """Test zombie tracking initialization in _spawn_peer_interface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_spawn_initializes_zombie_tracking(self, ble_interface): + """Test that spawning a peer interface initializes zombie tracking.""" + interface = ble_interface + address = "11:22:33:44:55:66" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + + # Ensure no timestamp before spawn + assert identity_hash not in interface._last_real_data + + # Spawn peer interface + before_time = time.time() + interface._spawn_peer_interface( + address=address, + name="Test-Peer", + peer_identity=identity, + mtu=185 + ) + after_time = time.time() + + # Verify timestamp was initialized + assert identity_hash in interface._last_real_data, \ + "Spawning should initialize zombie tracking" + timestamp = interface._last_real_data[identity_hash] + assert before_time <= timestamp <= after_time, \ + "Timestamp should be within spawn window"