diff --git a/tests/test_mac_rotation_blacklist_bug.py b/tests/test_mac_rotation_blacklist_bug.py index 7afee19..7798129 100644 --- a/tests/test_mac_rotation_blacklist_bug.py +++ b/tests/test_mac_rotation_blacklist_bug.py @@ -369,15 +369,25 @@ class TestPeripheralModeDuplicateRejection: except ImportError: pytest.skip("BLEInterface not available") - interface = Mock(spec=BLEInterface) + from unittest.mock import MagicMock + + interface = MagicMock(spec=BLEInterface) interface.identity_to_address = {} interface.address_to_identity = {} + # Mock driver for disconnect calls + interface.driver = MagicMock() + interface.driver.disconnect = MagicMock() + + # Configure __str__ for logging (MagicMock handles special methods) + interface.__str__ = MagicMock(return_value="BLEInterface[Test]") + # Import the actual methods we want to test from ble_reticulum.BLEInterface import BLEInterface as RealInterface interface._check_duplicate_identity = lambda addr, identity: RealInterface._check_duplicate_identity(interface, addr, identity) interface._compute_identity_hash = lambda identity: RealInterface._compute_identity_hash(interface, identity) + interface._handle_identity_handshake = lambda addr, data: RealInterface._handle_identity_handshake(interface, addr, data) return interface @@ -450,6 +460,180 @@ class TestPeripheralModeDuplicateRejection: "Peripheral mode should allow identity refresh from same MAC" ) + def test_handle_identity_handshake_rejects_duplicate_and_disconnects(self, mock_ble_interface): + """ + Test that _handle_identity_handshake actually disconnects when duplicate detected. + + This tests the full code path in _handle_identity_handshake that: + 1. Calls _check_duplicate_identity + 2. Logs warning + 3. Calls driver.disconnect() + 4. Returns True (handshake consumed) + """ + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + + # Mock the driver to track disconnect calls + disconnect_called = [] + interface.driver.disconnect = lambda addr: disconnect_called.append(addr) + + # Call _handle_identity_handshake with duplicate identity + result = interface._handle_identity_handshake(mac_new, identity) + + # Should return True (handshake consumed/rejected) + assert result is True, "Should return True when duplicate rejected" + + # Should have called disconnect on the new MAC + assert mac_new in disconnect_called, ( + f"Should disconnect duplicate connection. Called: {disconnect_called}" + ) + + # Should NOT have added the new MAC to mappings + assert mac_new not in interface.address_to_identity, ( + "Should not add duplicate identity to address_to_identity" + ) + + def test_handle_identity_handshake_disconnect_exception_handled(self, mock_ble_interface): + """ + Test that _handle_identity_handshake handles disconnect exceptions gracefully. + + If driver.disconnect() raises an exception, it should be caught and logged, + but the handshake should still return True (rejected). + """ + interface = mock_ble_interface + + # Setup: identity already connected at MAC_OLD + identity = b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10' + mac_old = "AA:BB:CC:DD:EE:01" + mac_new = "AA:BB:CC:DD:EE:02" + + identity_hash = interface._compute_identity_hash(identity) + interface.identity_to_address[identity_hash] = mac_old + + # Mock the driver to raise exception on disconnect + def raise_on_disconnect(addr): + raise Exception("Disconnect failed") + interface.driver.disconnect = raise_on_disconnect + + # Call _handle_identity_handshake - should not raise + result = interface._handle_identity_handshake(mac_new, identity) + + # Should still return True (handshake consumed/rejected) + assert result is True, "Should return True even if disconnect fails" + + +class TestLinuxDriverDuplicateIdentityErrorHandling: + """Tests for linux_bluetooth_driver duplicate identity exception handling.""" + + def test_duplicate_identity_error_logs_warning_not_error(self): + """ + Test that duplicate identity exceptions are logged as WARNING, not ERROR. + + When a connection fails due to duplicate identity detection, we want to + log it as a warning (expected behavior during MAC rotation) not an error. + """ + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + # Create driver with minimal mocking + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + driver._log_messages = [] + + def capture_log(msg, level="INFO"): + driver._log_messages.append((msg, level)) + + driver._log = capture_log + + # Test the error message detection logic directly + error_str = "Duplicate identity rejected for AA:BB:CC:DD:EE:FF" + is_duplicate = "Duplicate identity" in error_str + + assert is_duplicate is True, "Should detect duplicate identity in error message" + + # Log with appropriate level based on detection + if is_duplicate: + driver._log(f"Duplicate identity rejected: {error_str}", "WARNING") + else: + driver._log(f"Connection failed: {error_str}", "ERROR") + + # Check the log level + assert len(driver._log_messages) == 1 + msg, level = driver._log_messages[0] + assert level == "WARNING", f"Expected WARNING level, got {level}" + assert "Duplicate identity" in msg + + def test_normal_connection_error_logs_as_error(self): + """ + Test that normal connection errors are still logged as ERROR. + """ + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + driver._log_messages = [] + + def capture_log(msg, level="INFO"): + driver._log_messages.append((msg, level)) + + driver._log = capture_log + + # Normal connection error (not duplicate identity) + error_str = "Connection timeout to AA:BB:CC:DD:EE:FF" + is_duplicate = "Duplicate identity" in error_str + + assert is_duplicate is False, "Should not detect duplicate identity in timeout error" + + # Log with appropriate level + if is_duplicate: + driver._log(f"Duplicate identity rejected: {error_str}", "WARNING") + else: + driver._log(f"Connection failed: {error_str}", "ERROR") + + # Check the log level + assert len(driver._log_messages) == 1 + msg, level = driver._log_messages[0] + assert level == "ERROR", f"Expected ERROR level, got {level}" + + def test_error_callback_uses_info_for_duplicate_identity(self): + """ + Test that on_error callback uses 'info' severity for duplicate identity. + + This prevents the blacklist from being triggered for expected MAC rotation + rejections. + """ + from ble_reticulum.linux_bluetooth_driver import LinuxBluetoothDriver + + driver = LinuxBluetoothDriver.__new__(LinuxBluetoothDriver) + error_callbacks = [] + + def capture_error(severity, message, exception): + error_callbacks.append((severity, message, exception)) + + driver.on_error = capture_error + + # Simulate duplicate identity error handling + error = Exception("Duplicate identity detected") + error_str = str(error) + is_duplicate = "Duplicate identity" in error_str + address = "AA:BB:CC:DD:EE:FF" + + if is_duplicate: + driver.on_error("info", f"Duplicate identity rejected for {address} (MAC rotation)", error) + else: + driver.on_error("error", f"Connection failed to {address}: {error}", error) + + # Check callback was called with 'info' severity + assert len(error_callbacks) == 1 + severity, msg, exc = error_callbacks[0] + assert severity == "info", f"Expected 'info' severity for duplicate identity, got '{severity}'" + assert "Duplicate identity rejected" in msg + assert "MAC rotation" in msg + if __name__ == "__main__": pytest.main([__file__, "-v"])