import os import sys import pytest REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) CPP_BUILD_DIR = os.path.join(REPO_ROOT, "migration", "protocol_core") sys.path.insert(0, CPP_BUILD_DIR) cpp = pytest.importorskip( "ble_protocol_core_cpp", reason=( "compiled pybind11 module missing; build with " "`python3 migration/protocol_core/setup.py build_ext --inplace`" ), ) def identity(base): return bytes((base + i) & 0xFF for i in range(16)) def snapshot(address, *, mtu=None): snap = cpp.ConnectionSnapshot() snap.current = cpp.ConnectionId(address) snap.local_role = cpp.LocalRole.Peripheral snap.negotiated_mtu = mtu return snap def action_types(result): return {action.type for action in result.actions} def test_import_module_succeeds(): assert cpp.BLEPeerSessionManager is not None assert cpp.ConnectionSnapshot is not None def test_non_16_byte_payload_passes_to_reassembler(): manager = cpp.BLEPeerSessionManager() result = manager.handle_identity_handshake(snapshot("AA:00"), b"abc", 1.0) assert result.decision == cpp.InputDecision.PassToReassembler assert result.consumed is False def test_new_16_byte_identity_sets_keys_and_consumes(): manager = cpp.BLEPeerSessionManager() peer_identity = identity(0x10) result = manager.handle_identity_handshake( snapshot("AA:01"), peer_identity, 2.0 ) assert result.decision == cpp.InputDecision.AcceptedNewIdentity assert result.consumed is True assert result.accepted is True assert result.identity_key == "1011121314151617" assert result.fragmenter_key == "101112131415161718191a1b1c1d1e1f" assert result.peer_identity == peer_identity def test_static_key_helpers(): peer_identity = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") assert cpp.BLEPeerSessionManager.compute_identity_key(peer_identity) == "0102030405060708" assert ( cpp.BLEPeerSessionManager.compute_fragmenter_key(peer_identity) == "0102030405060708090a0b0c0d0e0f10" ) assert cpp.BLEPeerSessionManager.identity_from_payload(peer_identity) == peer_identity def test_identity_from_payload_rejects_non_16_byte_input(): with pytest.raises(Exception): cpp.BLEPeerSessionManager.identity_from_payload(b"abc") def test_known_identity_duplicate_same(): manager = cpp.BLEPeerSessionManager() peer_identity = identity(0x20) snap = snapshot("AA:02") snap.set_known_identity(peer_identity) result = manager.handle_identity_handshake(snap, peer_identity, 3.0) assert result.decision == cpp.InputDecision.ConsumedDuplicateSameIdentity assert result.consumed is True def test_known_identity_duplicate_mismatch(): manager = cpp.BLEPeerSessionManager() snap = snapshot("AA:03") snap.set_known_identity(identity(0x30)) result = manager.handle_identity_handshake(snap, identity(0x40), 4.0) assert result.decision == cpp.InputDecision.ConsumedDuplicateMismatchedIdentity assert result.consumed is True assert cpp.SessionActionType.Warn in action_types(result) def test_mtu_provided_is_preserved(): manager = cpp.BLEPeerSessionManager() peer_identity = identity(0x50) result = manager.handle_identity_handshake( snapshot("AA:04", mtu=185), peer_identity, 5.0 ) assert result.mtu == 185 view = manager.session_by_identity(peer_identity) assert view is not None assert view.mtu == 185 def test_mtu_missing_falls_back_to_23(): manager = cpp.BLEPeerSessionManager() result = manager.handle_identity_handshake( snapshot("AA:05"), identity(0x60), 6.0 ) assert result.mtu == 23 def test_duplicate_identity_active_elsewhere_requests_disconnect_current(): manager = cpp.BLEPeerSessionManager() peer_identity = identity(0x70) manager.handle_identity_handshake(snapshot("AA:06"), peer_identity, 7.0) duplicate = snapshot("AA:07") duplicate.existing_address_for_identity = "AA:06" duplicate.existing_address_connected = True duplicate.existing_address_in_peer_table = True result = manager.handle_identity_handshake(duplicate, peer_identity, 8.0) assert result.decision == cpp.InputDecision.RejectedDuplicateIdentity assert result.should_disconnect_current is True assert cpp.SessionActionType.DisconnectCurrentPeer in action_types(result) def test_pending_identity_timeout_can_be_marked_and_expired(): manager = cpp.BLEPeerSessionManager(30.0, 45.0) first = cpp.ConnectionId("AA:08", 8) second = cpp.ConnectionId("AA:09", 9) manager.mark_pending_identity(first, 100.0) manager.mark_pending_identity(second, 120.0) expired = manager.expired_pending_identities(131.0) assert len(expired) == 1 assert expired[0].address == "AA:08" assert expired[0].handle == 8 def test_peer_address_update_preserves_identity_and_fragmenter_key(): manager = cpp.BLEPeerSessionManager() peer_identity = identity(0x80) first_result = manager.handle_identity_handshake( snapshot("AA:0A"), peer_identity, 9.0 ) rotated = snapshot("AA:0B") rotated.existing_address_for_identity = "AA:0A" rotated.identity_has_pending_detach = True second_result = manager.handle_identity_handshake(rotated, peer_identity, 10.0) assert second_result.decision == cpp.InputDecision.AcceptedNewIdentity assert first_result.fragmenter_key == second_result.fragmenter_key assert manager.session_by_address("AA:0A") is None view = manager.session_by_address("AA:0B") assert view is not None assert view.identity == peer_identity assert view.identity_key == first_result.identity_key