diff --git a/src/ble_reticulum/BLEInterface.py b/src/ble_reticulum/BLEInterface.py index ca9e576..781343d 100644 --- a/src/ble_reticulum/BLEInterface.py +++ b/src/ble_reticulum/BLEInterface.py @@ -1134,6 +1134,20 @@ class BLEInterface(Interface): central_identity = bytes(data) identity_hash = self._compute_identity_hash(central_identity) + # Check for duplicate identity (same identity already connected at different MAC) + # This prevents duplicate connections during MAC rotation overlap + if self._check_duplicate_identity(address, central_identity): + RNS.log( + f"{self} duplicate identity rejected for {address} in peripheral mode (MAC rotation)", + RNS.LOG_WARNING + ) + # Disconnect this connection - it's a duplicate + try: + self.driver.disconnect(address) + except Exception as e: + RNS.log(f"{self} failed to disconnect duplicate {address}: {e}", RNS.LOG_DEBUG) + return True # Consumed the handshake, rejected connection + self.address_to_identity[address] = central_identity self.identity_to_address[identity_hash] = address diff --git a/tests/test_mac_rotation_blacklist_bug.py b/tests/test_mac_rotation_blacklist_bug.py index e6376a1..7afee19 100644 --- a/tests/test_mac_rotation_blacklist_bug.py +++ b/tests/test_mac_rotation_blacklist_bug.py @@ -358,5 +358,98 @@ class TestErrorCallbackBlacklistBehavior: assert re.search(blacklist_regex, fixed_msg) is None, "Fixed message should not trigger blacklist" +class TestPeripheralModeDuplicateRejection: + """Test duplicate identity rejection in peripheral mode (_handle_identity_handshake).""" + + @pytest.fixture + def mock_ble_interface(self): + """Create a minimal mock of BLEInterface for peripheral mode testing.""" + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + interface = Mock(spec=BLEInterface) + interface.identity_to_address = {} + interface.address_to_identity = {} + + # Import the actual methods we want to test + from ble_reticulum.BLEInterface import BLEInterface as RealInterface + + interface._check_duplicate_identity = lambda addr, identity: RealInterface._check_duplicate_identity(interface, addr, identity) + interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + + return interface + + def test_peripheral_mode_rejects_duplicate_identity(self, mock_ble_interface): + """ + Test that _handle_identity_handshake rejects duplicate identity. + + In peripheral mode, when a central sends an identity handshake with an + identity that's already connected at a different MAC, the connection + should be rejected. + """ + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + + # Check: duplicate should be detected for MAC_NEW + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + + assert is_duplicate is True, ( + "Peripheral mode should detect duplicate identity when same identity " + "is already connected at different MAC" + ) + + def test_peripheral_mode_allows_new_identity(self, mock_ble_interface): + """ + Test that _handle_identity_handshake allows new identity. + + When a central sends an identity that's not already connected, + the connection should be allowed. + """ + interface = mock_ble_interface + + # Setup: no existing connections + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_new = "AA:BB:CC:DD:EE:02" + + # Check: should not be detected as duplicate + is_duplicate = interface._check_duplicate_identity(mac_new, identity) + + assert is_duplicate is False, ( + "Peripheral mode should allow new identity" + ) + + def test_peripheral_mode_allows_same_mac_identity_refresh(self, mock_ble_interface): + """ + Test that _handle_identity_handshake allows identity refresh from same MAC. + + When a central reconnects from the same MAC with the same identity, + it should be allowed (not considered duplicate). + """ + interface = mock_ble_interface + + # Setup: identity already connected at same MAC + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac = "AA:BB:CC:DD:EE:01" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac + + # Check: same MAC should not be duplicate + is_duplicate = interface._check_duplicate_identity(mac, identity) + + assert is_duplicate is False, ( + "Peripheral mode should allow identity refresh from same MAC" + ) + + if __name__ == "__main__": pytest.main([__file__, "-v"])