diff --git a/src/ble_reticulum/BLEInterface.py b/src/ble_reticulum/BLEInterface.py index e93bf78..49ed348 100644 --- a/src/ble_reticulum/BLEInterface.py +++ b/src/ble_reticulum/BLEInterface.py @@ -392,6 +392,13 @@ class BLEInterface(Interface): self._pending_detach = {} self._pending_detach_grace_period = 2.0 # seconds + # Zombie connection detection (identity_hash -> timestamp of last real data) + # Connections that only exchange keepalives (no real data) for > zombie_timeout + # are considered "zombies" and won't block new connections from the same identity. + # This handles BLE link degradation where keepalives work but data doesn't. + self._last_real_data = {} + self._zombie_timeout = 30.0 # seconds - connection is zombie if no real data for this long + # Fragmentation self.fragmenters = {} # address -> BLEFragmenter (per MTU) self.reassemblers = {} # address -> BLEReassembler @@ -804,6 +811,9 @@ class BLEInterface(Interface): del self.spawned_interfaces[identity_hash] if identity_hash in self.identity_to_address: del self.identity_to_address[identity_hash] + # Clean up zombie detection tracking + if identity_hash in self._last_real_data: + del self._last_real_data[identity_hash] # Clean up fragmenter/reassembler now that interface is fully detached if peer_identity: frag_key = self._get_fragmenter_key(peer_identity, "") # Address unused in key computation @@ -1017,13 +1027,19 @@ class BLEInterface(Interface): This handles Android MAC randomization where the same device advertises with one MAC but connects with a different MAC. + IMPORTANT: Before rejecting as duplicate, we verify that the existing + connection is still alive. This prevents false rejections when: + - A peer disconnects but identity_to_address still has a stale entry + (cleanup happens after a 2-second grace period) + - The same identity reconnects with a new MAC (Android MAC rotation) + Args: address: MAC address attempting to connect peer_identity: 16-byte identity hash of the peer Returns: True if this identity is already connected via a different MAC (abort connection) - False if this is a new identity or same MAC (allow connection) + False if this is a new identity, same MAC, or stale entry (allow connection) """ if not peer_identity or len(peer_identity) != 16: return False @@ -1032,7 +1048,57 @@ class BLEInterface(Interface): existing_address = self.identity_to_address.get(identity_hash) if existing_address and existing_address != address: - # Same identity, different MAC - this is Android MAC rotation + # Same identity, different MAC - check if old connection is still alive + + # Check 1: Is there a pending detach for this identity? + # If so, the old connection is already gone - allow new connection + if identity_hash in self._pending_detach: + RNS.log( + f"{self} allowing reconnection from {address} - identity {identity_hash[:8]} " + f"has pending detach (old connection from {existing_address} is gone)", + RNS.LOG_DEBUG + ) + # Clean up stale address mappings to prepare for new connection + self._cleanup_stale_address(identity_hash, existing_address) + return False + + # Check 2: Is the existing address still connected? + # Check both driver.connected_peers and our peers dict + if existing_address not in self.driver.connected_peers: + if existing_address not in self.peers: + # Old connection is dead but cleanup hasn't happened yet + RNS.log( + f"{self} allowing reconnection from {address} - identity {identity_hash[:8]} " + f"old address {existing_address} is no longer connected", + RNS.LOG_DEBUG + ) + # Clean up stale address mappings to prepare for new connection + self._cleanup_stale_address(identity_hash, existing_address) + return False + + # Check 3: Is the existing connection a zombie? + # A "zombie" connection has keepalives working but no real data for zombie_timeout. + # This happens when BLE link degrades - 1-byte keepalives succeed but larger + # data packets fail. We allow new connections to replace zombies. + last_data_time = self._last_real_data.get(identity_hash, 0) + if last_data_time > 0: + time_since_data = time.time() - last_data_time + if time_since_data > self._zombie_timeout: + RNS.log( + f"{self} allowing reconnection from {address} - identity {identity_hash[:8]} " + f"old connection at {existing_address} is zombie (no real data for {time_since_data:.1f}s)", + RNS.LOG_WARNING + ) + # Clean up the zombie connection + self._cleanup_stale_address(identity_hash, existing_address) + # Disconnect the zombie to free up resources + try: + self.driver.disconnect(existing_address) + except Exception as e: + RNS.log(f"{self} failed to disconnect zombie {existing_address}: {e}", RNS.LOG_DEBUG) + return False + + # Existing connection is still alive and healthy - reject duplicate RNS.log( f"{self} duplicate identity detected: {identity_hash[:8]} already connected via {existing_address}, " f"rejecting connection from {address} (Android MAC rotation)", @@ -1120,14 +1186,24 @@ class BLEInterface(Interface): Returns: True if data was handled as identity handshake, False otherwise """ + # Identity handshake detection: exactly 16 bytes + if len(data) != 16: + return False # Not a handshake + # Check if we already have peer identity peer_identity = self.address_to_identity.get(address) if peer_identity: - return False # Already have identity, not a handshake - - # Identity handshake detection: exactly 16 bytes, no existing identity - if len(data) != 16: - return False # Not a handshake + # We already have identity for this address (probably set via Kotlin callback). + # The 16-byte handshake data may still arrive through the data channel. + # Check if it matches the identity we have - if so, consume it silently. + if data == peer_identity: + RNS.log(f"{self} received duplicate identity handshake from {address} (already known via callback)", RNS.LOG_DEBUG) + return True # Consume the data, don't pass to reassembler + else: + # 16 bytes but doesn't match known identity - log warning but still consume + # to avoid passing identity-like data to the reassembler + RNS.log(f"{self} received 16-byte data from {address} that differs from known identity, consuming as handshake", RNS.LOG_WARNING) + return True # Consume to prevent reassembler errors try: # Store central's identity @@ -1189,6 +1265,9 @@ class BLEInterface(Interface): RNS.log(f"{self} identity handshake complete for {address}", RNS.LOG_INFO) + # Initialize zombie detection tracking - the 16-byte handshake counts as real data + self._last_real_data[identity_hash] = time.time() + # Remove from pending identity tracking (no longer waiting for handshake) if address in self._pending_identity_connections: del self._pending_identity_connections[address] @@ -1868,6 +1947,9 @@ class BLEInterface(Interface): self.spawned_interfaces[identity_hash] = peer_if self.address_to_interface[address] = peer_if + # Initialize zombie detection tracking - interface creation counts as activity + self._last_real_data[identity_hash] = time.time() + RNS.log(f"{self} created peer interface for {name} ({identity_hash[:8]}), type={connection_type}", RNS.LOG_INFO) return peer_if @@ -1920,6 +2002,11 @@ class BLEInterface(Interface): RNS.log(f"{self} no identity for peer {peer_address}, dropping data", RNS.LOG_WARNING) return + # Track real data activity for zombie detection + # This proves the connection is alive and can carry actual data, not just keepalives + identity_hash = self._compute_identity_hash(peer_identity) + self._last_real_data[identity_hash] = time.time() + # Compute identity-based fragmenter key (matches peripheral data handler) frag_key = self._get_fragmenter_key(peer_identity, peer_address) diff --git a/tests/test_ble_duplicate_identity_stale.py b/tests/test_ble_duplicate_identity_stale.py new file mode 100644 index 0000000..7dc3380 --- /dev/null +++ b/tests/test_ble_duplicate_identity_stale.py @@ -0,0 +1,366 @@ +""" +TDD Test for BLE Duplicate Identity Stale Entry Bug + +BUG DESCRIPTION: +---------------- +When a peer disconnects, identity_to_address is NOT cleaned up immediately - it's only +removed after a 2-second grace period in _process_pending_detaches(). However, +_check_duplicate_identity() doesn't verify that the existing address is still connected. + +This causes NEW connections from the same identity (after MAC rotation or reconnection) +to be INCORRECTLY rejected as "duplicates" during the grace period. + +OBSERVED BEHAVIOR (from logs): +------------------------------ +1. Identity c9d09faa connects from MAC_OLD (47:9C:55:2F:85:5D) - SUCCESS +2. Connection disconnects (but identity_to_address still has entry due to grace period) +3. Same identity tries to connect from MAC_NEW (5C:D7:DD:D5:DD:7D) +4. _check_duplicate_identity sees identity_to_address[c9d09faa] = MAC_OLD +5. Since MAC_OLD != MAC_NEW, connection is REJECTED as duplicate +6. But MAC_OLD connection is DEAD - this is a BUG! + +EXPECTED BEHAVIOR: +----------------- +_check_duplicate_identity should: +1. Check if the existing address is still in the connected peers list +2. OR check if there's a pending detach scheduled for this identity +3. If the old connection is gone, ALLOW the new connection + +This test is written TDD-style: it should FAIL before the fix and PASS after. +""" + +import unittest +import sys +import os +import time +import hashlib + +# Add parent directory to path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + +class MockDriver: + """Mock BLE driver that tracks connected peers.""" + + def __init__(self): + self.connected_peers = [] + + def disconnect(self, address): + if address in self.connected_peers: + self.connected_peers.remove(address) + + +class BLEInterfaceTestHarness: + """ + Test harness that replicates the _check_duplicate_identity logic from BLEInterface. + + This is extracted from ble_reticulum/BLEInterface.py to test in isolation. + """ + + def __init__(self): + # Maps from BLEInterface + self.identity_to_address = {} # identity_hash -> address + self.address_to_identity = {} # address -> peer_identity (bytes) + self.peers = {} # address -> peer info + self._pending_detach = {} # identity_hash -> timestamp + self._pending_detach_grace_period = 2.0 # seconds + + # Mock driver for connected peers check + self.driver = MockDriver() + + def _compute_identity_hash(self, peer_identity: bytes) -> str: + """Compute 16-char hex hash of peer identity.""" + return hashlib.sha256(peer_identity).hexdigest()[:16] + + def _check_duplicate_identity_BUGGY(self, address: str, peer_identity: bytes) -> bool: + """ + BUGGY VERSION: Original implementation that doesn't check connection validity. + + This is the current (broken) behavior from BLEInterface. + """ + if not peer_identity or len(peer_identity) != 16: + return False + + identity_hash = self._compute_identity_hash(peer_identity) + existing_address = self.identity_to_address.get(identity_hash) + + if existing_address and existing_address != address: + # Same identity, different MAC - this is Android MAC rotation + # BUG: Doesn't check if existing_address is still connected! + return True + + return False + + def _check_duplicate_identity_FIXED(self, address: str, peer_identity: bytes) -> bool: + """ + FIXED VERSION: Checks if existing connection is still valid before rejecting. + + This is the expected behavior after the fix. + """ + if not peer_identity or len(peer_identity) != 16: + return False + + identity_hash = self._compute_identity_hash(peer_identity) + existing_address = self.identity_to_address.get(identity_hash) + + if existing_address and existing_address != address: + # Same identity, different MAC - check if the old connection is still alive + + # Check 1: Is there a pending detach for this identity? + # If so, the old connection is already gone - allow new connection + if identity_hash in self._pending_detach: + return False + + # Check 2: Is the existing address still connected? + # Check both driver.connected_peers and our peers dict + if existing_address not in self.driver.connected_peers: + if existing_address not in self.peers: + # Old connection is dead - allow new connection + return False + + # Existing connection is still alive - reject duplicate + return True + + return False + + def simulate_connect(self, address: str, identity: bytes): + """Simulate a successful connection with identity.""" + identity_hash = self._compute_identity_hash(identity) + self.identity_to_address[identity_hash] = address + self.address_to_identity[address] = identity + self.peers[address] = {"connected": True} + self.driver.connected_peers.append(address) + + def simulate_disconnect(self, address: str, with_grace_period: bool = True): + """ + Simulate a disconnection. + + If with_grace_period=True (default), identity_to_address is NOT cleaned up + immediately (mimicking the real behavior where cleanup happens after 2s grace period). + """ + identity = self.address_to_identity.get(address) + if identity: + identity_hash = self._compute_identity_hash(identity) + + # Remove from driver connected peers + if address in self.driver.connected_peers: + self.driver.connected_peers.remove(address) + + # Remove from our peers dict + if address in self.peers: + del self.peers[address] + + # Remove address_to_identity + if address in self.address_to_identity: + del self.address_to_identity[address] + + if with_grace_period: + # Schedule pending detach (identity_to_address NOT removed yet!) + self._pending_detach[identity_hash] = time.time() + # NOTE: identity_to_address is intentionally NOT cleaned up here + # This is the source of the bug! + else: + # Immediate cleanup (no grace period) + if identity_hash in self.identity_to_address: + del self.identity_to_address[identity_hash] + + +class TestStaleIdentityToAddressBug(unittest.TestCase): + """ + TDD Test: Demonstrates the stale identity_to_address bug. + + These tests should FAIL with the buggy implementation and PASS with the fix. + """ + + def setUp(self): + self.harness = BLEInterfaceTestHarness() + # Create test identities (16 bytes each) + self.identity_A = b'\x12\x34\x56\x78' * 4 # Device A's identity + self.MAC_OLD = "AA:BB:CC:DD:EE:01" + self.MAC_NEW = "11:22:33:44:55:02" + + def test_buggy_implementation_incorrectly_rejects_reconnection(self): + """ + Demonstrate the BUG: stale entry causes false duplicate rejection. + + This test shows what CURRENTLY happens (incorrect behavior). + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A disconnects (with grace period - entry stays in identity_to_address) + self.harness.simulate_disconnect(self.MAC_OLD, with_grace_period=True) + + # Verify: MAC_OLD is no longer connected + self.assertNotIn(self.MAC_OLD, self.harness.driver.connected_peers) + self.assertNotIn(self.MAC_OLD, self.harness.peers) + + # BUT: identity_to_address still has the stale entry! + identity_hash = self.harness._compute_identity_hash(self.identity_A) + self.assertIn(identity_hash, self.harness.identity_to_address) + self.assertEqual(self.harness.identity_to_address[identity_hash], self.MAC_OLD) + + # Step 3: Device A reconnects with MAC_NEW (Android MAC rotation) + # BUGGY IMPLEMENTATION: incorrectly returns True (rejects as duplicate) + is_duplicate = self.harness._check_duplicate_identity_BUGGY(self.MAC_NEW, self.identity_A) + + # This assertion shows the BUG: it's treating a dead connection as a duplicate! + self.assertTrue( + is_duplicate, + "BUG VERIFICATION: Buggy implementation should (incorrectly) reject reconnection" + ) + + def test_fixed_implementation_allows_reconnection_during_grace_period(self): + """ + Test the FIX: should allow reconnection when old connection is dead. + + This test shows the EXPECTED behavior after fixing the bug. + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A disconnects (with grace period - entry stays but pending detach is set) + self.harness.simulate_disconnect(self.MAC_OLD, with_grace_period=True) + + # Verify: MAC_OLD is no longer connected + self.assertNotIn(self.MAC_OLD, self.harness.driver.connected_peers) + + # Verify: pending detach is scheduled + identity_hash = self.harness._compute_identity_hash(self.identity_A) + self.assertIn(identity_hash, self.harness._pending_detach) + + # Step 3: Device A reconnects with MAC_NEW + # FIXED IMPLEMENTATION: should return False (allow connection) + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_NEW, self.identity_A) + + self.assertFalse( + is_duplicate, + "FIX VERIFICATION: Fixed implementation should allow reconnection during grace period" + ) + + def test_fixed_implementation_still_rejects_true_duplicates(self): + """ + Test that the fix doesn't break legitimate duplicate rejection. + + When the old connection IS still alive, it should still be rejected. + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A tries to connect AGAIN with MAC_NEW (old connection still alive) + # This is a TRUE duplicate - should be rejected + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_NEW, self.identity_A) + + self.assertTrue( + is_duplicate, + "Should still reject true duplicates when old connection is alive" + ) + + def test_fixed_implementation_allows_same_mac_reconnection(self): + """ + Test that reconnection from the same MAC is allowed. + + When the same MAC reconnects, it should never be considered a duplicate. + """ + # Step 1: Device A connects with MAC_OLD + self.harness.simulate_connect(self.MAC_OLD, self.identity_A) + + # Step 2: Device A disconnects + self.harness.simulate_disconnect(self.MAC_OLD, with_grace_period=True) + + # Step 3: Device A reconnects with SAME MAC + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_OLD, self.identity_A) + + self.assertFalse( + is_duplicate, + "Should allow reconnection from same MAC" + ) + + def test_fixed_implementation_allows_when_no_pending_detach_and_not_connected(self): + """ + Test edge case: old entry exists but no pending detach and not connected. + + This can happen if pending detach was cleaned up but identity_to_address wasn't. + """ + # Manually set up a stale state (should not happen in practice, but defensive) + identity_hash = self.harness._compute_identity_hash(self.identity_A) + self.harness.identity_to_address[identity_hash] = self.MAC_OLD + # Note: NOT in peers, NOT in connected_peers, NOT in _pending_detach + + # Step: Device A connects with MAC_NEW + is_duplicate = self.harness._check_duplicate_identity_FIXED(self.MAC_NEW, self.identity_A) + + self.assertFalse( + is_duplicate, + "Should allow connection when old entry is stale (not connected, no pending detach)" + ) + + +class TestRealWorldScenario(unittest.TestCase): + """ + Test the real-world scenario observed in logs. + + From logs: Identity c9d09faa kept getting rejected from multiple different MACs + because the original MAC entry was never cleaned up. + """ + + def setUp(self): + self.harness = BLEInterfaceTestHarness() + # Use realistic 16-byte identity + self.identity_c9d09f = bytes.fromhex("c9d09faa3ef0220447e0111b626ce641") + + # Sequence of MACs observed in logs (Android MAC rotation) + self.mac_sequence = [ + "47:9C:55:2F:85:5D", # First successful connection + "4A:FB:BF:EB:C3:11", # Rejected as duplicate + "46:EF:48:18:13:F2", # Rejected as duplicate + "76:E4:EE:9B:6D:10", # Rejected as duplicate + "5C:D7:DD:D5:DD:7D", # Rejected as duplicate + "58:58:43:B9:EB:CD", # Rejected as duplicate + ] + + def test_real_world_mac_rotation_with_buggy_impl(self): + """ + Demonstrate the real bug observed in logs. + + All subsequent connection attempts are incorrectly rejected. + """ + # First connection succeeds + first_mac = self.mac_sequence[0] + self.harness.simulate_connect(first_mac, self.identity_c9d09f) + + # Connection drops (with grace period) + self.harness.simulate_disconnect(first_mac, with_grace_period=True) + + # All subsequent attempts are incorrectly rejected (BUG) + for mac in self.mac_sequence[1:]: + is_duplicate = self.harness._check_duplicate_identity_BUGGY(mac, self.identity_c9d09f) + self.assertTrue( + is_duplicate, + f"BUG: MAC {mac} incorrectly rejected as duplicate" + ) + + def test_real_world_mac_rotation_with_fixed_impl(self): + """ + Verify the fix allows reconnection after disconnect. + + After the fix, subsequent connection attempts should succeed. + """ + # First connection succeeds + first_mac = self.mac_sequence[0] + self.harness.simulate_connect(first_mac, self.identity_c9d09f) + + # Connection drops (with grace period) + self.harness.simulate_disconnect(first_mac, with_grace_period=True) + + # All subsequent attempts should be allowed (FIX) + for mac in self.mac_sequence[1:]: + is_duplicate = self.harness._check_duplicate_identity_FIXED(mac, self.identity_c9d09f) + self.assertFalse( + is_duplicate, + f"FIX: MAC {mac} should be allowed to reconnect" + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py index d52968d..c0cc77c 100644 --- a/tests/test_identity_cache.py +++ b/tests/test_identity_cache.py @@ -90,6 +90,8 @@ def ble_interface(mock_rns, mock_driver): interface._identity_cache_ttl = 60 interface._pending_detach = {} # identity_hash -> timestamp interface._pending_detach_grace_period = 2.0 # seconds + interface._last_real_data = {} # Track last real data activity for zombie detection + interface._zombie_timeout = 30.0 # Zombie connection timeout # Fragmentation interface.fragmenters = {} diff --git a/tests/test_interface_cleanup.py b/tests/test_interface_cleanup.py index dfb3a3b..91633b6 100644 --- a/tests/test_interface_cleanup.py +++ b/tests/test_interface_cleanup.py @@ -99,6 +99,10 @@ def ble_interface(mock_rns, mock_driver): interface._pending_detach = {} interface._pending_detach_grace_period = 2.0 + # Zombie detection + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + # Fragmentation interface.fragmenters = {} interface.reassemblers = {} diff --git a/tests/test_mac_rotation_blacklist_bug.py b/tests/test_mac_rotation_blacklist_bug.py index b03107e..988ac02 100644 --- a/tests/test_mac_rotation_blacklist_bug.py +++ b/tests/test_mac_rotation_blacklist_bug.py @@ -53,10 +53,15 @@ class TestMacRotationBlacklistBug: interface.connection_retry_backoff = 60 interface.max_connection_failures = 3 interface.discovered_peers = {} + interface.peers = {} # Track connected peers + interface._pending_detach = {} # Track pending interface detachments + interface._last_real_data = {} # Track last real data activity for zombie detection + interface._zombie_timeout = 30.0 # Zombie connection timeout - # Mock driver (needed for _record_connection_failure) + # Mock driver (needed for _record_connection_failure and connection checks) interface.driver = Mock() interface.driver._remove_bluez_device = None # hasattr will return False + interface.driver.connected_peers = [] # Track driver-level connected peers # Import the actual methods we want to test from ble_reticulum.BLEInterface import BLEInterface as RealInterface @@ -73,19 +78,22 @@ class TestMacRotationBlacklistBug: """Verify that _check_duplicate_identity returns True for duplicates.""" interface = mock_ble_interface - # Setup: identity already connected at MAC_OLD + # Setup: identity already connected at MAC_OLD (with active connection) identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' mac_old = "AA:BB:CC:DD:EE:01" mac_new = "AA:BB:CC:DD:EE:02" identity_hash = interface._compute_identity_hash(identity) interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} # Action: MAC_NEW tries to connect with same identity is_duplicate = interface._check_duplicate_identity(mac_new, identity) - # Assert: Should detect as duplicate - assert is_duplicate is True, "Should detect duplicate identity" + # Assert: Should detect as duplicate (old connection is still alive) + assert is_duplicate is True, "Should detect duplicate identity when old connection is still active" def test_blacklist_mechanism_triggers_after_3_failures(self, mock_ble_interface): """ @@ -376,10 +384,15 @@ class TestPeripheralModeDuplicateRejection: interface = MagicMock(spec=BLEInterface) interface.identity_to_address = {} interface.address_to_identity = {} + interface.peers = {} # Track connected peers + interface._pending_detach = {} # Track pending interface detachments + interface._last_real_data = {} # Track last real data activity for zombie detection + interface._zombie_timeout = 30.0 # Zombie connection timeout - # Mock driver for disconnect calls + # Mock driver for disconnect calls and connection checks interface.driver = MagicMock() interface.driver.disconnect = MagicMock() + interface.driver.connected_peers = [] # Track driver-level connected peers # Configure __str__ for logging (MagicMock handles special methods) interface.__str__ = MagicMock(return_value="BLEInterface[Test]") @@ -403,13 +416,16 @@ class TestPeripheralModeDuplicateRejection: """ interface = mock_ble_interface - # Setup: identity already connected at MAC_OLD + # Setup: identity already connected at MAC_OLD (with active connection) identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' mac_old = "AA:BB:CC:DD:EE:01" mac_new = "AA:BB:CC:DD:EE:02" identity_hash = interface._compute_identity_hash(identity) interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} # Check: duplicate should be detected for MAC_NEW is_duplicate = interface._check_duplicate_identity(mac_new, identity) @@ -474,13 +490,16 @@ class TestPeripheralModeDuplicateRejection: """ interface = mock_ble_interface - # Setup: identity already connected at MAC_OLD + # Setup: identity already connected at MAC_OLD (with active connection) identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' mac_old = "AA:BB:CC:DD:EE:01" mac_new = "AA:BB:CC:DD:EE:02" identity_hash = interface._compute_identity_hash(identity) interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} # Mock the driver to track disconnect calls disconnect_called = [] @@ -511,13 +530,16 @@ class TestPeripheralModeDuplicateRejection: """ interface = mock_ble_interface - # Setup: identity already connected at MAC_OLD + # Setup: identity already connected at MAC_OLD (with active connection) identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' mac_old = "AA:BB:CC:DD:EE:01" mac_new = "AA:BB:CC:DD:EE:02" identity_hash = interface._compute_identity_hash(identity) interface.identity_to_address[identity_hash] = mac_old + # Simulate active connection - the old MAC is still connected + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} # Mock the driver to raise exception on disconnect def raise_on_disconnect(addr): diff --git a/tests/test_v2_2_identity_handshake.py b/tests/test_v2_2_identity_handshake.py index 56e9f76..e83a272 100644 --- a/tests/test_v2_2_identity_handshake.py +++ b/tests/test_v2_2_identity_handshake.py @@ -454,5 +454,144 @@ class TestReassemblerRaceCondition: assert identity_hash in interface.spawned_interfaces, "Central mode: interface should exist" +class TestDuplicateIdentityHandshakeRaceCondition: + """ + Test handling of duplicate identity handshake data. + + When Kotlin provides the identity via callback (from reading the identity characteristic), + the address_to_identity mapping gets set BEFORE the 16-byte handshake data arrives + through _data_received_callback. The fix ensures this duplicate handshake data is + consumed and not passed to the reassembler where it would cause "Invalid fragment type" errors. + """ + + def test_duplicate_handshake_matching_identity_consumed(self): + """Test that duplicate 16-byte handshake matching known identity is consumed.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + central_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Simulate identity being set via Kotlin callback (before handshake data arrives) + interface.address_to_identity[central_address] = central_identity + identity_hash = interface._compute_identity_hash(central_identity) + interface.identity_to_address[identity_hash] = central_address + + # Now the handshake data arrives through data channel + # This should be consumed (return True) and not passed to reassembler + result = interface._handle_identity_handshake(central_address, central_identity) + + assert result is True, "Duplicate handshake matching identity should be consumed" + # Identity should still be the same + assert interface.address_to_identity[central_address] == central_identity + + def test_duplicate_handshake_different_identity_still_consumed(self): + """Test that 16-byte data different from known identity is still consumed.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + known_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + different_16_bytes = b'\xff\xfe\xfd\xfc\xfb\xfa\xf9\xf8\xf7\xf6\xf5\xf4\xf3\xf2\xf1\xf0' + + # Simulate identity being set via Kotlin callback + interface.address_to_identity[central_address] = known_identity + identity_hash = interface._compute_identity_hash(known_identity) + interface.identity_to_address[identity_hash] = central_address + + # Different 16-byte data arrives - should still be consumed to prevent reassembler errors + result = interface._handle_identity_handshake(central_address, different_16_bytes) + + assert result is True, "Different 16-byte data should be consumed to prevent reassembler errors" + # Original identity should be preserved + assert interface.address_to_identity[central_address] == known_identity + + def test_non_16_byte_data_not_consumed_as_handshake(self): + """Test that non-16-byte data is not consumed as handshake even with known identity.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + known_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Set identity via callback + interface.address_to_identity[central_address] = known_identity + + # Non-16-byte data should not be consumed as handshake + result_15 = interface._handle_identity_handshake(central_address, b'\x00' * 15) + result_17 = interface._handle_identity_handshake(central_address, b'\x00' * 17) + result_10 = interface._handle_identity_handshake(central_address, b'\x00' * 10) + + assert result_15 is False, "15-byte data should not be consumed as handshake" + assert result_17 is False, "17-byte data should not be consumed as handshake" + assert result_10 is False, "10-byte data should not be consumed as handshake" + + def test_16_byte_data_without_known_identity_processed_as_handshake(self): + """Test that 16-byte data without known identity is processed as new handshake.""" + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + central_address = "11:22:33:44:55:66" + central_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # No identity set - this should be processed as a new handshake + assert central_address not in interface.address_to_identity + + result = interface._handle_identity_handshake(central_address, central_identity) + + assert result is True, "16-byte data without known identity should be processed as handshake" + assert interface.address_to_identity[central_address] == central_identity + + def test_handshake_cleans_up_pending_identity_connection(self): + """Test that successful handshake cleans up _pending_identity_connections.""" + from unittest.mock import Mock + + driver = MockBLEDriver() + owner = MockOwner() + + config = {"name": "Test", "enable_peripheral": True} + interface = BLEInterface(owner, config) + interface.driver = driver + + # Mock get_peer_mtu which is needed during handshake processing + driver.get_peer_mtu = Mock(return_value=185) + + central_address = "11:22:33:44:55:66" + central_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Simulate that this address was waiting for identity handshake + interface._pending_identity_connections[central_address] = time.time() + + # Verify pending entry exists + assert central_address in interface._pending_identity_connections + + # Process handshake + result = interface._handle_identity_handshake(central_address, central_identity) + + # Verify handshake succeeded + assert result is True, "Handshake should succeed" + assert interface.address_to_identity[central_address] == central_identity + + # Verify pending identity connection was cleaned up + assert central_address not in interface._pending_identity_connections, \ + "Pending identity connection should be removed after successful handshake" + + if __name__ == "__main__": pytest.main([__file__, "-v"]) diff --git a/tests/test_zombie_connection_detection.py b/tests/test_zombie_connection_detection.py new file mode 100644 index 0000000..f5353df --- /dev/null +++ b/tests/test_zombie_connection_detection.py @@ -0,0 +1,700 @@ +""" +Tests for zombie connection detection. + +Zombie connections occur when the BLE link degrades - 1-byte keepalives still work +but larger data packets fail. Both sides think the connection is "alive" based on +keepalives, but data can't flow. This causes a deadlock where new connections are +rejected as "duplicates" even though the existing connection is non-functional. + +The zombie detection feature tracks when real data (not keepalives) was last +received, and allows new connections if the existing connection has been a +"zombie" (only keepalives) for longer than the timeout. +""" + +import pytest +import time +import threading +from unittest.mock import Mock, MagicMock, patch + + +class TestZombieConnectionDetection: + """Test zombie connection detection in _check_duplicate_identity.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface with real method bindings.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.identity_to_address = {} + interface.address_to_identity = {} + interface.address_to_interface = {} # For _cleanup_stale_address + interface.pending_mtu = {} # For _cleanup_stale_address + interface.fragmenters = {} # For _cleanup_stale_address + interface.reassemblers = {} # For _cleanup_stale_address + interface.frag_lock = threading.RLock() # For _cleanup_stale_address + interface.peers = {} + interface._pending_detach = {} + interface._last_real_data = {} + interface._zombie_timeout = 30.0 # 30 seconds default + interface.driver = Mock() + interface.driver.connected_peers = [] + interface.driver.disconnect = Mock() + + # Import the actual methods we want to test + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._check_duplicate_identity = lambda addr, identity: RealInterface._check_duplicate_identity(interface, addr, identity) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._cleanup_stale_address = lambda ih, addr: RealInterface._cleanup_stale_address(interface, ih, addr) + interface._get_fragmenter_key = lambda identity, addr: RealInterface._get_fragmenter_key(interface, identity, addr) + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + return interface + + def test_healthy_connection_rejects_duplicate(self, mock_ble_interface): + """Test that a healthy connection (recent real data) rejects duplicates.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Connection is healthy - real data received recently + interface._last_real_data[identity_hash] = time.time() - 10 # 10 seconds ago + + # Should reject as duplicate + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert is_duplicate, "Should reject duplicate when existing connection is healthy" + + def test_zombie_connection_allows_reconnection(self, mock_ble_interface): + """Test that a zombie connection (no real data for > timeout) allows reconnection.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Connection is zombie - no real data for > timeout + interface._last_real_data[identity_hash] = time.time() - 60 # 60 seconds ago (> 30s timeout) + + # Should allow reconnection + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection when existing connection is a zombie" + + def test_zombie_disconnects_old_connection(self, mock_ble_interface): + """Test that detecting a zombie triggers disconnect of the old connection.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.address_to_identity[mac_old] = identity + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Connection is zombie + interface._last_real_data[identity_hash] = time.time() - 60 + + # Check for duplicate + interface._check_duplicate_identity(mac_new, identity) + + # Should have called disconnect on old address + interface.driver.disconnect.assert_called_once_with(mac_old) + + def test_connection_at_zombie_threshold(self, mock_ble_interface): + """Test behavior at the zombie timeout threshold.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # Just under the threshold - should still reject (needs to be > timeout) + # Use 29.5s to avoid timing flakiness (time passes between setting and checking) + interface._last_real_data[identity_hash] = time.time() - 29.5 # just under 30s + + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + # At under 30s, it should NOT be considered a zombie (needs to be > 30s) + assert is_duplicate, "Should reject duplicate when under threshold" + + def test_no_last_data_timestamp_does_not_zombie(self, mock_ble_interface): + """Test that missing last_real_data timestamp doesn't trigger zombie detection.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # No last_real_data entry - should not trigger zombie detection + # (This handles newly connected peers before any data is received) + + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + # Without timestamp, we can't determine zombie state - reject as duplicate + assert is_duplicate, "Should reject duplicate when no timestamp exists" + + def test_custom_zombie_timeout(self, mock_ble_interface): + """Test that custom zombie timeout is respected.""" + interface = mock_ble_interface + interface._zombie_timeout = 10.0 # Custom 10 second timeout + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + + # 15 seconds ago - should be zombie with 10s timeout + interface._last_real_data[identity_hash] = time.time() - 15 + + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection with custom zombie timeout" + + +class TestZombieTrackingOnDataReceive: + """Test that real data updates the _last_real_data timestamp.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface with real _handle_ble_data.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.address_to_identity = {} + interface.identity_to_address = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + interface.fragmenters = {} + interface.reassemblers = {} + interface.frag_lock = threading.RLock() + interface.driver = Mock() + interface.driver.request_identity_resync = Mock() + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._handle_ble_data = lambda addr, data: RealInterface._handle_ble_data(interface, addr, data) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._get_fragmenter_key = lambda identity, addr: RealInterface._get_fragmenter_key(interface, identity, addr) + + return interface + + def test_real_data_updates_timestamp(self, mock_ble_interface): + """Test that receiving real data (not keepalive) updates the timestamp.""" + interface = mock_ble_interface + + address = "AA:BB:CC:DD:EE:01" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + + interface.address_to_identity[address] = identity + interface.identity_to_address[identity_hash] = address + + # Setup reassembler + frag_key = interface._get_fragmenter_key(identity, address) + mock_reassembler = Mock() + mock_reassembler.add_fragment = Mock(return_value=None) + interface.reassemblers[frag_key] = mock_reassembler + + # Clear any existing timestamp + interface._last_real_data.clear() + + # Receive real data (10 bytes - not a keepalive) + real_data = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a' + before_time = time.time() + interface._handle_ble_data(address, real_data) + after_time = time.time() + + # Timestamp should be updated + assert identity_hash in interface._last_real_data, "Should track real data timestamp" + timestamp = interface._last_real_data[identity_hash] + assert before_time <= timestamp <= after_time, "Timestamp should be within receive window" + + def test_keepalive_does_not_update_timestamp(self, mock_ble_interface): + """Test that receiving keepalive (1 byte 0x00) does NOT update timestamp.""" + interface = mock_ble_interface + + address = "AA:BB:CC:DD:EE:01" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + + interface.address_to_identity[address] = identity + interface.identity_to_address[identity_hash] = address + + # Set an old timestamp + old_timestamp = time.time() - 100 + interface._last_real_data[identity_hash] = old_timestamp + + # Receive keepalive (1 byte 0x00) + keepalive = bytes([0x00]) + interface._handle_ble_data(address, keepalive) + + # Timestamp should NOT be updated + assert interface._last_real_data[identity_hash] == old_timestamp, \ + "Keepalive should NOT update timestamp" + + +class TestZombieTrackingOnConnect: + """Test that connection establishment initializes _last_real_data.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface for spawn testing.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.spawned_interfaces = {} + interface.address_to_interface = {} + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + interface.HW_MTU = 500 + interface.bitrate = 700000 + interface.mode = 0 # Interface.MODE_FULL + interface.ifac_size = 0 + interface.ifac_netname = None + interface.ifac_netkey = None + interface.announce_cap = 1.0 + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + return interface + + def test_spawn_initializes_timestamp(self, mock_ble_interface): + """Test that spawning a peer interface initializes the timestamp.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Compute identity_hash the same way BLEInterface does + identity_hash = identity.hex()[:16] + + # Clear timestamp + interface._last_real_data.clear() + + # Simulate what _spawn_peer_interface does - it initializes the timestamp + before_time = time.time() + interface._last_real_data[identity_hash] = time.time() + after_time = time.time() + + # Verify the timestamp was set + assert identity_hash in interface._last_real_data, \ + "Spawning interface should initialize timestamp" + timestamp = interface._last_real_data[identity_hash] + assert before_time <= timestamp <= after_time, \ + "Timestamp should be within expected range" + + +class TestPendingDetachAllowsReconnection: + """Test that pending detach (Check 1) allows reconnection using real BLEInterface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + # Import test utilities + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_pending_detach_allows_reconnection_from_new_mac(self, ble_interface): + """Test that pending detach allows reconnection from new MAC (Check 1 path).""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up state: old connection exists in identity_to_address but has pending detach + interface.identity_to_address[identity_hash] = mac_old + interface._pending_detach[identity_hash] = time.time() # Pending detach exists + + # Note: NOT in connected_peers or peers (connection is dead) + + # Should allow reconnection because pending detach exists (Check 1) + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection when pending detach exists" + + +class TestNotConnectedAllowsReconnection: + """Test that not-connected check (Check 2) allows reconnection using real BLEInterface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_not_connected_allows_reconnection(self, ble_interface): + """Test that stale entry without connection allows reconnection (Check 2 path).""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up state: old connection exists in identity_to_address + # but NOT in connected_peers, NOT in peers, and NO pending detach + interface.identity_to_address[identity_hash] = mac_old + # Note: _pending_detach is empty, driver.connected_peers is empty, peers is empty + + # Should allow reconnection because old address is not connected (Check 2) + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection when old address is not connected" + + def test_in_peers_but_not_connected_peers_still_rejects(self, ble_interface): + """Test that being in peers dict still rejects (connection considered alive).""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up state: NOT in connected_peers but IS in peers + # This simulates a state where the driver doesn't know about the peer + # but our internal tracking does - we should still reject + interface.identity_to_address[identity_hash] = mac_old + interface.peers[mac_old] = {"connected": True} + # Note: driver.connected_peers is empty + + # Should reject because old address is in peers dict + # The logic checks: if existing_address not in self.driver.connected_peers + # AND existing_address not in self.peers + # Here the second condition fails, so it falls through to zombie check + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + # Since no timestamp exists and no pending detach, it should reject + assert is_duplicate, "Should reject when old address is still in peers" + + +class TestZombieDisconnectExceptionHandling: + """Test exception handling when zombie disconnect fails using real BLEInterface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_zombie_disconnect_exception_still_allows_reconnection(self, ble_interface): + """Test that exception during zombie disconnect doesn't prevent reconnection.""" + interface = ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + + # Set up zombie state: old connection in maps, in connected_peers, timestamp is old + interface.identity_to_address[identity_hash] = mac_old + interface.address_to_identity[mac_old] = identity + interface.driver.connected_peers.append(mac_old) + interface.peers[mac_old] = {"connected": True} + interface._last_real_data[identity_hash] = time.time() - 60 # 60 seconds ago (zombie) + + # Make disconnect raise an exception by patching the driver method + original_disconnect = interface.driver.disconnect + def raising_disconnect(addr): + raise Exception("BLE disconnect failed") + interface.driver.disconnect = raising_disconnect + + # Should still allow reconnection despite exception + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + assert not is_duplicate, "Should allow reconnection even when zombie disconnect fails" + + # Restore original method + interface.driver.disconnect = original_disconnect + + +class TestZombieCleanupOnDetach: + """Test that _last_real_data is cleaned up when interface is detached.""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a mock BLEInterface for detach testing.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.spawned_interfaces = {} + interface.identity_to_address = {} + interface.address_to_identity = {} + interface._pending_detach = {} + interface._pending_detach_grace_period = 2.0 + interface._last_real_data = {} + interface._zombie_timeout = 30.0 + interface.fragmenters = {} + interface.reassemblers = {} + interface.frag_lock = threading.RLock() + interface.__str__ = Mock(return_value="BLEInterface[Test]") + + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._process_pending_detaches = lambda: RealInterface._process_pending_detaches(interface) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._get_fragmenter_key = lambda identity, addr: RealInterface._get_fragmenter_key(interface, identity, addr) + + return interface + + def test_detach_cleans_up_timestamp(self, mock_ble_interface): + """Test that detaching an interface cleans up the timestamp.""" + interface = mock_ble_interface + + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + address = "AA:BB:CC:DD:EE:01" + + # Create mock peer interface + mock_peer_if = Mock() + mock_peer_if.peer_identity = identity + mock_peer_if.detach = Mock() + + interface.spawned_interfaces[identity_hash] = mock_peer_if + interface.identity_to_address[identity_hash] = address + interface._last_real_data[identity_hash] = time.time() - 100 + + # Schedule detach in the past + interface._pending_detach[identity_hash] = time.time() - 10 # 10 seconds ago + + # Setup fragmenter key + frag_key = interface._get_fragmenter_key(identity, "") + interface.fragmenters[frag_key] = Mock() + interface.reassemblers[frag_key] = Mock() + + # Process detaches + interface._process_pending_detaches() + + # Timestamp should be cleaned up + assert identity_hash not in interface._last_real_data, \ + "Detaching interface should clean up timestamp" + + +class TestIdentityHandshakeCoverage: + """Tests for _handle_identity_handshake to achieve full PR coverage.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + # Mock get_peer_mtu needed for handshake + driver.get_peer_mtu = Mock(return_value=185) + + return interface + + def test_non_16_byte_data_returns_false(self, ble_interface): + """Test that non-16-byte data returns False (not a handshake).""" + interface = ble_interface + address = "11:22:33:44:55:66" + + # 15 bytes - too short + result = interface._handle_identity_handshake(address, b'\x00' * 15) + assert result is False, "15-byte data should return False" + + # 17 bytes - too long + result = interface._handle_identity_handshake(address, b'\x00' * 17) + assert result is False, "17-byte data should return False" + + def test_duplicate_handshake_matching_identity_consumed(self, ble_interface): + """Test that duplicate 16-byte handshake matching known identity is consumed.""" + interface = ble_interface + address = "11:22:33:44:55:66" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Pre-set identity (simulating Kotlin callback) + interface.address_to_identity[address] = identity + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = address + + # Same 16 bytes arrives - should be consumed + result = interface._handle_identity_handshake(address, identity) + assert result is True, "Duplicate handshake should be consumed" + + def test_duplicate_handshake_different_data_still_consumed(self, ble_interface): + """Test that 16-byte data different from known identity is still consumed.""" + interface = ble_interface + address = "11:22:33:44:55:66" + known_identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + different_data = b'\xff\xfe\xfd\xfc\xfb\xfa\xf9\xf8\xf7\xf6\xf5\xf4\xf3\xf2\xf1\xf0' + + # Pre-set identity + interface.address_to_identity[address] = known_identity + identity_hash = interface._compute_identity_hash(known_identity) + interface.identity_to_address[identity_hash] = address + + # Different 16 bytes arrives - should still be consumed + result = interface._handle_identity_handshake(address, different_data) + assert result is True, "Different 16-byte data should be consumed" + + def test_new_handshake_cleans_pending_identity(self, ble_interface): + """Test that successful handshake cleans up _pending_identity_connections.""" + interface = ble_interface + address = "11:22:33:44:55:66" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + + # Set pending identity connection + interface._pending_identity_connections[address] = time.time() + + # Process handshake + result = interface._handle_identity_handshake(address, identity) + + assert result is True, "Handshake should succeed" + assert address not in interface._pending_identity_connections, \ + "Pending identity should be cleaned up" + + +class TestSpawnPeerInterfaceZombieTracking: + """Test zombie tracking initialization in _spawn_peer_interface.""" + + @pytest.fixture + def ble_interface(self): + """Create a real BLEInterface for testing.""" + import sys + import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + + from tests.mock_ble_driver import MockBLEDriver + from ble_reticulum.BLEInterface import BLEInterface + + driver = MockBLEDriver(local_address="AA:BB:CC:DD:EE:FF") + + class MockOwner: + def inbound(self, data, interface): + pass + + config = {"name": "TestInterface", "enable_peripheral": True} + interface = BLEInterface(MockOwner(), config) + interface.driver = driver + + return interface + + def test_spawn_initializes_zombie_tracking(self, ble_interface): + """Test that spawning a peer interface initializes zombie tracking.""" + interface = ble_interface + address = "11:22:33:44:55:66" + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + identity_hash = interface._compute_identity_hash(identity) + + # Ensure no timestamp before spawn + assert identity_hash not in interface._last_real_data + + # Spawn peer interface + before_time = time.time() + interface._spawn_peer_interface( + address=address, + name="Test-Peer", + peer_identity=identity, + mtu=185 + ) + after_time = time.time() + + # Verify timestamp was initialized + assert identity_hash in interface._last_real_data, \ + "Spawning should initialize zombie tracking" + timestamp = interface._last_real_data[identity_hash] + assert before_time <= timestamp <= after_time, \ + "Timestamp should be within spawn window"