test: comprehensive integration tests for identity cache
Add 14 tests that exercise the REAL BLEInterface code: - TestIdentityCacheOnDisconnect: verify caching on disconnect - TestIdentityCacheOnDataReceive: verify cache restoration on data - TestAddressChangedCallback: verify address migration - TestCacheTTL: verify 60-second TTL behavior - TestReassemblyCodePath: verify cache in reassembly path - TestEdgeCases: concurrent access, multiple disconnects Tests skip locally if RNS not installed but run in CI to provide actual line coverage on the identity cache changes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
d332f9a9bb
commit
b5518e3799
1 changed files with 387 additions and 269 deletions
|
|
@ -9,56 +9,86 @@ This prevents data loss when:
|
|||
1. App force-stop/restart causes temporary disconnect
|
||||
2. Driver maintains connection while Python clears identity mapping
|
||||
3. Data arrives from "unknown" peer that was actually still connected
|
||||
|
||||
These tests exercise the REAL BLEInterface code with mocked external dependencies.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import time
|
||||
import threading
|
||||
from unittest.mock import Mock, MagicMock, patch, AsyncMock
|
||||
from unittest.mock import Mock, MagicMock, patch, PropertyMock
|
||||
import sys
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
# Add src to path - conftest.py handles RNS mocking
|
||||
# Ensure src is in path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
|
||||
|
||||
|
||||
def create_minimal_ble_interface():
|
||||
@pytest.fixture
|
||||
def mock_rns():
|
||||
"""Mock RNS module for testing."""
|
||||
with patch.dict('sys.modules', {'RNS': MagicMock()}):
|
||||
import sys
|
||||
rns = sys.modules['RNS']
|
||||
rns.LOG_CRITICAL = 0
|
||||
rns.LOG_ERROR = 1
|
||||
rns.LOG_WARNING = 2
|
||||
rns.LOG_NOTICE = 3
|
||||
rns.LOG_INFO = 4
|
||||
rns.LOG_VERBOSE = 5
|
||||
rns.LOG_DEBUG = 6
|
||||
rns.LOG_EXTREME = 7
|
||||
rns.log = Mock()
|
||||
rns.prettyhexrep = lambda x: x.hex() if isinstance(x, bytes) else str(x)
|
||||
yield rns
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_driver():
|
||||
"""Create a mock BLE driver."""
|
||||
driver = Mock()
|
||||
driver.on_device_connected = None
|
||||
driver.on_device_disconnected = None
|
||||
driver.on_data_received = None
|
||||
driver.on_identity_received = None
|
||||
driver.on_error = None
|
||||
driver.on_duplicate_identity_detected = None
|
||||
driver.on_address_changed = None
|
||||
driver.request_identity_resync = Mock(return_value=False)
|
||||
driver.get_cached_identity = Mock(return_value=None)
|
||||
return driver
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ble_interface(mock_rns, mock_driver):
|
||||
"""
|
||||
Create a minimal BLEInterface instance for testing identity cache.
|
||||
Create a BLEInterface instance with mocked dependencies.
|
||||
|
||||
Mocks external dependencies (driver, RNS) while keeping the real
|
||||
identity cache logic intact.
|
||||
This uses the REAL BLEInterface class but mocks external dependencies
|
||||
to allow testing the identity cache logic.
|
||||
"""
|
||||
# Try to import BLEInterface - this will work in CI where RNS is installed
|
||||
try:
|
||||
from ble_reticulum.BLEInterface import BLEInterface
|
||||
except ImportError:
|
||||
pytest.skip("BLEInterface not available")
|
||||
|
||||
with patch('ble_reticulum.BLEInterface.Interface.__init__'):
|
||||
# Create interface with mocked owner
|
||||
# Create interface without calling __init__
|
||||
interface = object.__new__(BLEInterface)
|
||||
|
||||
# Initialize required attributes
|
||||
# Initialize all required attributes manually
|
||||
interface.name = "TestBLE"
|
||||
interface.owner = Mock()
|
||||
interface.online = False
|
||||
interface.driver = Mock()
|
||||
interface.driver.on_device_connected = None
|
||||
interface.driver.on_device_disconnected = None
|
||||
interface.driver.on_data_received = None
|
||||
interface.driver.on_identity_received = None
|
||||
interface.driver.on_error = None
|
||||
interface.driver.on_duplicate_identity_detected = None
|
||||
interface.driver.on_address_changed = None
|
||||
interface.driver.request_identity_resync = Mock(return_value=False)
|
||||
interface.online = True
|
||||
interface.driver = mock_driver
|
||||
|
||||
# Initialize state dictionaries
|
||||
# Identity mappings
|
||||
interface.peers = {}
|
||||
interface.spawned_interfaces = {}
|
||||
interface.address_to_identity = {}
|
||||
interface.identity_to_address = {}
|
||||
interface._identity_cache = {}
|
||||
interface._identity_cache_ttl = 60
|
||||
|
||||
# Fragmentation
|
||||
interface.fragmenters = {}
|
||||
interface.reassemblers = {}
|
||||
interface.pending_mtu = {}
|
||||
|
|
@ -67,7 +97,7 @@ def create_minimal_ble_interface():
|
|||
interface.peer_lock = threading.RLock()
|
||||
interface.frag_lock = threading.RLock()
|
||||
|
||||
# Other required attributes
|
||||
# Other attributes
|
||||
interface.HW_MTU = 500
|
||||
interface.MIN_MTU = 20
|
||||
interface.bitrate = 700000
|
||||
|
|
@ -76,320 +106,408 @@ def create_minimal_ble_interface():
|
|||
|
||||
return interface
|
||||
|
||||
return None
|
||||
except ImportError as e:
|
||||
pytest.skip(f"Cannot import BLEInterface (RNS not installed): {e}")
|
||||
|
||||
|
||||
class TestIdentityCacheIntegration:
|
||||
"""Test identity cache with real BLEInterface methods."""
|
||||
class TestIdentityCacheOnDisconnect:
|
||||
"""Test that identity is cached when peer disconnects."""
|
||||
|
||||
def test_identity_cache_initialized(self):
|
||||
"""BLEInterface should have identity cache attributes."""
|
||||
interface = create_minimal_ble_interface()
|
||||
if interface is None:
|
||||
pytest.skip("Could not create interface")
|
||||
|
||||
assert hasattr(interface, '_identity_cache')
|
||||
assert hasattr(interface, '_identity_cache_ttl')
|
||||
assert interface._identity_cache_ttl == 60
|
||||
assert isinstance(interface._identity_cache, dict)
|
||||
|
||||
def test_disconnect_callback_caches_identity(self):
|
||||
"""_device_disconnected_callback should cache identity before cleanup."""
|
||||
interface = create_minimal_ble_interface()
|
||||
if interface is None:
|
||||
pytest.skip("Could not create interface")
|
||||
|
||||
# Setup: Simulate connected peer
|
||||
def test_disconnect_caches_identity(self, ble_interface, mock_rns):
|
||||
"""
|
||||
When _device_disconnected_callback is called, the peer's identity
|
||||
should be cached before cleanup.
|
||||
"""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = interface._compute_identity_hash(identity) if hasattr(interface, '_compute_identity_hash') else "456c6978"
|
||||
identity_hash = identity[:8].hex() # First 8 bytes = 16 hex chars
|
||||
|
||||
interface.address_to_identity[mac] = identity
|
||||
interface.identity_to_address[identity_hash] = mac
|
||||
# Setup: Peer is connected with identity mapping
|
||||
ble_interface.address_to_identity[mac] = identity
|
||||
ble_interface.identity_to_address[identity_hash] = mac
|
||||
|
||||
# Create mock peer interface
|
||||
mock_peer_if = Mock()
|
||||
mock_peer_if.detach = Mock()
|
||||
interface.spawned_interfaces[identity_hash] = mock_peer_if
|
||||
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
|
||||
|
||||
# Call the disconnect callback
|
||||
if hasattr(interface, '_device_disconnected_callback'):
|
||||
interface._device_disconnected_callback(mac)
|
||||
# Add _compute_identity_hash method
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
|
||||
# Assert: Identity should be cached
|
||||
assert mac in interface._identity_cache
|
||||
cached_identity, cached_time = interface._identity_cache[mac]
|
||||
assert cached_identity == identity
|
||||
assert time.time() - cached_time < 2
|
||||
# Call the real disconnect callback
|
||||
ble_interface._device_disconnected_callback(mac)
|
||||
|
||||
def test_address_changed_callback_migrates_mappings(self):
|
||||
"""_address_changed_callback should migrate identity to new address."""
|
||||
interface = create_minimal_ble_interface()
|
||||
if interface is None:
|
||||
pytest.skip("Could not create interface")
|
||||
|
||||
old_mac = "54:8F:DF:44:79:2B"
|
||||
new_mac = "6B:2B:EE:1A:5B:94"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = "456c6978"
|
||||
|
||||
# Setup initial mapping
|
||||
interface.address_to_identity[old_mac] = identity
|
||||
interface.identity_to_address[identity_hash] = old_mac
|
||||
|
||||
# Call address changed callback
|
||||
if hasattr(interface, '_address_changed_callback'):
|
||||
interface._address_changed_callback(old_mac, new_mac, identity_hash)
|
||||
|
||||
# Assert: Mapping migrated to new address
|
||||
assert old_mac not in interface.address_to_identity
|
||||
assert new_mac in interface.address_to_identity
|
||||
assert interface.identity_to_address[identity_hash] == new_mac
|
||||
|
||||
|
||||
class TestIdentityCache:
|
||||
"""Test identity caching logic (unit tests with mocks)."""
|
||||
|
||||
def test_identity_cached_on_disconnect(self):
|
||||
"""
|
||||
When a peer disconnects, its identity should be cached for later recovery.
|
||||
"""
|
||||
# Setup mock interface state
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface.identity_to_address = {}
|
||||
interface.spawned_interfaces = {}
|
||||
interface._identity_cache = {}
|
||||
interface._identity_cache_ttl = 60
|
||||
|
||||
# Simulate connected peer
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = "456c6978"
|
||||
|
||||
interface.address_to_identity[mac] = identity
|
||||
interface.identity_to_address[identity_hash] = mac
|
||||
interface.spawned_interfaces[identity_hash] = Mock()
|
||||
|
||||
# Simulate disconnect with caching (the fix)
|
||||
peer_identity = interface.address_to_identity.get(mac)
|
||||
if peer_identity:
|
||||
# Cache identity before cleanup
|
||||
interface._identity_cache[mac] = (peer_identity, time.time())
|
||||
|
||||
# Clean up active mappings
|
||||
del interface.address_to_identity[mac]
|
||||
del interface.identity_to_address[identity_hash]
|
||||
del interface.spawned_interfaces[identity_hash]
|
||||
|
||||
# Assert: Identity should be in cache
|
||||
assert mac in interface._identity_cache
|
||||
cached_identity, cached_time = interface._identity_cache[mac]
|
||||
# Assert: Identity should be cached
|
||||
assert mac in ble_interface._identity_cache
|
||||
cached_identity, cached_time = ble_interface._identity_cache[mac]
|
||||
assert cached_identity == identity
|
||||
assert time.time() - cached_time < 1 # Cached recently
|
||||
assert time.time() - cached_time < 2 # Cached recently
|
||||
|
||||
def test_cached_identity_restored_on_data_receive(self):
|
||||
"""
|
||||
When data arrives from a peer with no active identity mapping,
|
||||
check the cache and restore if found.
|
||||
"""
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface.identity_to_address = {}
|
||||
interface._identity_cache = {}
|
||||
interface._identity_cache_ttl = 60
|
||||
# Assert: Active mappings should be cleaned up
|
||||
assert mac not in ble_interface.address_to_identity
|
||||
assert identity_hash not in ble_interface.identity_to_address
|
||||
|
||||
# Setup: Identity in cache (from recent disconnect)
|
||||
# Assert: Peer interface was detached
|
||||
mock_peer_if.detach.assert_called_once()
|
||||
|
||||
def test_disconnect_unknown_address_no_crash(self, ble_interface, mock_rns):
|
||||
"""
|
||||
Disconnecting an unknown address should not crash.
|
||||
"""
|
||||
# Call disconnect for unknown address
|
||||
ble_interface._device_disconnected_callback("XX:XX:XX:XX:XX:XX")
|
||||
|
||||
# Should not raise, cache should be empty
|
||||
assert "XX:XX:XX:XX:XX:XX" not in ble_interface._identity_cache
|
||||
|
||||
|
||||
class TestIdentityCacheOnDataReceive:
|
||||
"""Test that cached identity is restored when data arrives."""
|
||||
|
||||
def test_data_restores_identity_from_cache(self, ble_interface, mock_rns):
|
||||
"""
|
||||
When data arrives from a peer with cached (but not active) identity,
|
||||
the identity should be restored from cache.
|
||||
"""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
interface._identity_cache[mac] = (identity, time.time())
|
||||
identity_hash = identity[:8].hex()
|
||||
|
||||
# Simulate data arriving from "unknown" peer
|
||||
peer_identity = interface.address_to_identity.get(mac)
|
||||
assert peer_identity is None, "No active mapping"
|
||||
# Setup: Identity in cache (simulating recent disconnect)
|
||||
ble_interface._identity_cache[mac] = (identity, time.time())
|
||||
|
||||
# Check cache (the fix)
|
||||
cached = interface._identity_cache.get(mac)
|
||||
if cached:
|
||||
cached_identity, cached_time = cached
|
||||
if time.time() - cached_time < interface._identity_cache_ttl:
|
||||
# Restore from cache
|
||||
peer_identity = cached_identity
|
||||
interface.address_to_identity[mac] = peer_identity
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}"
|
||||
|
||||
# Assert: Identity restored from cache
|
||||
assert peer_identity == identity
|
||||
assert mac in interface.address_to_identity
|
||||
# Create mock fragmenter/reassembler
|
||||
from unittest.mock import MagicMock
|
||||
mock_reassembler = MagicMock()
|
||||
mock_reassembler.add_fragment = Mock(return_value=(False, None))
|
||||
|
||||
def test_cache_expires_after_ttl(self):
|
||||
frag_key = f"{identity_hash}_{mac}"
|
||||
ble_interface.reassemblers[frag_key] = mock_reassembler
|
||||
|
||||
# Create mock peer interface
|
||||
mock_peer_if = Mock()
|
||||
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
|
||||
|
||||
# Call _handle_ble_data with test data
|
||||
test_data = b"\x00\x01test_packet" # Fragment with header
|
||||
ble_interface._handle_ble_data(mac, test_data)
|
||||
|
||||
# Assert: Identity should be restored from cache
|
||||
assert mac in ble_interface.address_to_identity
|
||||
assert ble_interface.address_to_identity[mac] == identity
|
||||
assert ble_interface.identity_to_address[identity_hash] == mac
|
||||
|
||||
# Assert: Cache entry should be removed (now active)
|
||||
assert mac not in ble_interface._identity_cache
|
||||
|
||||
def test_data_expired_cache_requests_resync(self, ble_interface, mock_rns):
|
||||
"""
|
||||
Cached identities should expire after TTL (60 seconds).
|
||||
When data arrives with expired cache entry, request identity resync.
|
||||
"""
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface._identity_cache = {}
|
||||
interface._identity_cache_ttl = 60
|
||||
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
|
||||
# Cache with old timestamp (61 seconds ago)
|
||||
old_time = time.time() - 61
|
||||
interface._identity_cache[mac] = (identity, old_time)
|
||||
# Setup: Expired cache entry (61 seconds ago)
|
||||
ble_interface._identity_cache[mac] = (identity, time.time() - 61)
|
||||
|
||||
# Try to restore from cache
|
||||
peer_identity = None
|
||||
cached = interface._identity_cache.get(mac)
|
||||
if cached:
|
||||
cached_identity, cached_time = cached
|
||||
if time.time() - cached_time < interface._identity_cache_ttl:
|
||||
peer_identity = cached_identity
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
|
||||
# Assert: Cache expired, identity not restored
|
||||
assert peer_identity is None
|
||||
# Call _handle_ble_data
|
||||
test_data = b"\x00\x01test"
|
||||
ble_interface._handle_ble_data(mac, test_data)
|
||||
|
||||
def test_cache_valid_within_ttl(self):
|
||||
# Assert: Should request resync from driver
|
||||
ble_interface.driver.request_identity_resync.assert_called_once_with(mac)
|
||||
|
||||
# Assert: Identity should NOT be restored (expired)
|
||||
assert mac not in ble_interface.address_to_identity
|
||||
|
||||
def test_data_no_cache_no_identity_requests_resync(self, ble_interface, mock_rns):
|
||||
"""
|
||||
Cached identities should be valid within TTL.
|
||||
When data arrives with no identity and no cache, request resync.
|
||||
"""
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface._identity_cache = {}
|
||||
interface._identity_cache_ttl = 60
|
||||
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
|
||||
# Cache with recent timestamp (30 seconds ago)
|
||||
recent_time = time.time() - 30
|
||||
interface._identity_cache[mac] = (identity, recent_time)
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
|
||||
# Try to restore from cache
|
||||
peer_identity = None
|
||||
cached = interface._identity_cache.get(mac)
|
||||
if cached:
|
||||
cached_identity, cached_time = cached
|
||||
if time.time() - cached_time < interface._identity_cache_ttl:
|
||||
peer_identity = cached_identity
|
||||
# No cache, no identity mapping
|
||||
assert mac not in ble_interface._identity_cache
|
||||
assert mac not in ble_interface.address_to_identity
|
||||
|
||||
# Assert: Cache valid, identity restored
|
||||
assert peer_identity == identity
|
||||
# Call _handle_ble_data
|
||||
test_data = b"\x00\x01test"
|
||||
ble_interface._handle_ble_data(mac, test_data)
|
||||
|
||||
# Assert: Should request resync
|
||||
ble_interface.driver.request_identity_resync.assert_called_once_with(mac)
|
||||
|
||||
|
||||
class TestAddressChangedCallback:
|
||||
"""Test address migration during dual connection deduplication."""
|
||||
|
||||
def test_address_changed_migrates_identity_mapping(self):
|
||||
def test_address_changed_migrates_mappings(self, ble_interface, mock_rns):
|
||||
"""
|
||||
When driver closes one direction of a dual connection,
|
||||
identity mapping should migrate to the remaining address.
|
||||
When address changes, identity mappings should migrate to new address.
|
||||
"""
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface.identity_to_address = {}
|
||||
interface._identity_cache = {}
|
||||
|
||||
# Peer connected on old address
|
||||
old_mac = "54:8F:DF:44:79:2B"
|
||||
new_mac = "6B:2B:EE:1A:5B:94"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = "456c6978"
|
||||
identity_hash = identity[:8].hex()
|
||||
|
||||
interface.address_to_identity[old_mac] = identity
|
||||
interface.identity_to_address[identity_hash] = old_mac
|
||||
# Setup: Peer connected on old address
|
||||
ble_interface.address_to_identity[old_mac] = identity
|
||||
ble_interface.identity_to_address[identity_hash] = old_mac
|
||||
ble_interface.peers[old_mac] = (Mock(), 0, 185)
|
||||
ble_interface.pending_mtu[old_mac] = 185
|
||||
|
||||
# Simulate address change callback
|
||||
peer_identity = interface.address_to_identity.get(old_mac)
|
||||
if peer_identity:
|
||||
# Migrate to new address
|
||||
del interface.address_to_identity[old_mac]
|
||||
interface.address_to_identity[new_mac] = peer_identity
|
||||
interface.identity_to_address[identity_hash] = new_mac
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}"
|
||||
|
||||
# Assert: Mapping migrated
|
||||
assert old_mac not in interface.address_to_identity
|
||||
assert new_mac in interface.address_to_identity
|
||||
assert interface.identity_to_address[identity_hash] == new_mac
|
||||
# Add fragmenter for old address
|
||||
old_frag_key = f"{identity_hash}_{old_mac}"
|
||||
ble_interface.fragmenters[old_frag_key] = Mock()
|
||||
ble_interface.reassemblers[old_frag_key] = Mock()
|
||||
|
||||
def test_address_changed_uses_cache_if_not_in_active_mapping(self):
|
||||
# Call address changed callback
|
||||
ble_interface._address_changed_callback(old_mac, new_mac, identity_hash)
|
||||
|
||||
# Assert: Old address cleaned up
|
||||
assert old_mac not in ble_interface.address_to_identity
|
||||
assert old_mac not in ble_interface.peers
|
||||
assert old_mac not in ble_interface.pending_mtu
|
||||
assert old_frag_key not in ble_interface.fragmenters
|
||||
assert old_frag_key not in ble_interface.reassemblers
|
||||
|
||||
# Assert: New address has mappings
|
||||
assert new_mac in ble_interface.address_to_identity
|
||||
assert ble_interface.address_to_identity[new_mac] == identity
|
||||
assert ble_interface.identity_to_address[identity_hash] == new_mac
|
||||
assert new_mac in ble_interface.peers
|
||||
assert new_mac in ble_interface.pending_mtu
|
||||
|
||||
# Assert: Fragmenter migrated
|
||||
new_frag_key = f"{identity_hash}_{new_mac}"
|
||||
assert new_frag_key in ble_interface.fragmenters
|
||||
assert new_frag_key in ble_interface.reassemblers
|
||||
|
||||
def test_address_changed_uses_cache_fallback(self, ble_interface, mock_rns):
|
||||
"""
|
||||
If old address not in active mapping, check cache for identity.
|
||||
When old address not in active mapping, check cache.
|
||||
"""
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface.identity_to_address = {}
|
||||
interface._identity_cache = {}
|
||||
|
||||
old_mac = "54:8F:DF:44:79:2B"
|
||||
new_mac = "6B:2B:EE:1A:5B:94"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = identity[:8].hex()
|
||||
|
||||
# Setup: Identity in cache, not active mapping
|
||||
ble_interface._identity_cache[old_mac] = (identity, time.time())
|
||||
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}"
|
||||
|
||||
# Call address changed callback
|
||||
ble_interface._address_changed_callback(old_mac, new_mac, identity_hash)
|
||||
|
||||
# Assert: Cache was used and cleared
|
||||
assert old_mac not in ble_interface._identity_cache
|
||||
|
||||
# Assert: New address has identity
|
||||
assert new_mac in ble_interface.address_to_identity
|
||||
assert ble_interface.address_to_identity[new_mac] == identity
|
||||
|
||||
def test_address_changed_no_identity_logs_warning(self, ble_interface, mock_rns):
|
||||
"""
|
||||
When no identity found for old address, log warning.
|
||||
"""
|
||||
old_mac = "54:8F:DF:44:79:2B"
|
||||
new_mac = "6B:2B:EE:1A:5B:94"
|
||||
identity_hash = "456c6978"
|
||||
|
||||
# Identity in cache, not active mapping
|
||||
interface._identity_cache[old_mac] = (identity, time.time())
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}"
|
||||
|
||||
# Simulate address change - check cache as fallback
|
||||
peer_identity = interface.address_to_identity.get(old_mac)
|
||||
if not peer_identity:
|
||||
cached = interface._identity_cache.get(old_mac)
|
||||
if cached:
|
||||
peer_identity = cached[0]
|
||||
del interface._identity_cache[old_mac]
|
||||
# No identity anywhere
|
||||
assert old_mac not in ble_interface.address_to_identity
|
||||
assert old_mac not in ble_interface._identity_cache
|
||||
|
||||
if peer_identity:
|
||||
interface.address_to_identity[new_mac] = peer_identity
|
||||
interface.identity_to_address[identity_hash] = new_mac
|
||||
# Call address changed callback - should not crash
|
||||
ble_interface._address_changed_callback(old_mac, new_mac, identity_hash)
|
||||
|
||||
# Assert: Identity recovered from cache and migrated
|
||||
assert old_mac not in interface._identity_cache
|
||||
assert new_mac in interface.address_to_identity
|
||||
assert interface.identity_to_address[identity_hash] == new_mac
|
||||
# Assert: Warning was logged
|
||||
mock_rns.log.assert_called()
|
||||
# Check that a warning about no identity was logged
|
||||
calls = [str(c) for c in mock_rns.log.call_args_list]
|
||||
assert any("no identity found" in c.lower() for c in calls)
|
||||
|
||||
|
||||
class TestDriverResync:
|
||||
"""Test driver-level identity resync fallback."""
|
||||
class TestCacheTTL:
|
||||
"""Test cache TTL (time-to-live) behavior."""
|
||||
|
||||
def test_request_identity_resync_available(self):
|
||||
"""
|
||||
Driver should expose request_identity_resync method.
|
||||
"""
|
||||
driver = Mock()
|
||||
driver.request_identity_resync = Mock(return_value=True)
|
||||
|
||||
# Call resync
|
||||
result = driver.request_identity_resync("54:8F:DF:44:79:2B")
|
||||
|
||||
assert result is True
|
||||
driver.request_identity_resync.assert_called_once_with("54:8F:DF:44:79:2B")
|
||||
|
||||
def test_resync_used_when_cache_miss(self):
|
||||
"""
|
||||
When data arrives from unknown peer and cache miss,
|
||||
try driver resync as last resort.
|
||||
"""
|
||||
interface = Mock()
|
||||
interface.address_to_identity = {}
|
||||
interface._identity_cache = {}
|
||||
interface._identity_cache_ttl = 60
|
||||
interface.driver = Mock()
|
||||
interface.driver.request_identity_resync = Mock(return_value=True)
|
||||
def test_cache_ttl_default_60_seconds(self, ble_interface):
|
||||
"""Cache TTL should default to 60 seconds."""
|
||||
assert ble_interface._identity_cache_ttl == 60
|
||||
|
||||
def test_cache_valid_at_59_seconds(self, ble_interface, mock_rns):
|
||||
"""Cache should be valid at 59 seconds."""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = identity[:8].hex()
|
||||
|
||||
# No active mapping
|
||||
peer_identity = interface.address_to_identity.get(mac)
|
||||
assert peer_identity is None
|
||||
# Cache entry from 59 seconds ago
|
||||
ble_interface._identity_cache[mac] = (identity, time.time() - 59)
|
||||
|
||||
# No cache hit
|
||||
cached = interface._identity_cache.get(mac)
|
||||
assert cached is None
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}"
|
||||
|
||||
# Try driver resync as fallback
|
||||
resync_result = interface.driver.request_identity_resync(mac)
|
||||
# Create mock reassembler
|
||||
mock_reassembler = MagicMock()
|
||||
mock_reassembler.add_fragment = Mock(return_value=(False, None))
|
||||
frag_key = f"{identity_hash}_{mac}"
|
||||
ble_interface.reassemblers[frag_key] = mock_reassembler
|
||||
|
||||
# Assert: Resync attempted
|
||||
assert resync_result is True
|
||||
interface.driver.request_identity_resync.assert_called_once_with(mac)
|
||||
# Create mock peer interface
|
||||
ble_interface.spawned_interfaces[identity_hash] = Mock()
|
||||
|
||||
# Call _handle_ble_data
|
||||
test_data = b"\x00\x01test"
|
||||
ble_interface._handle_ble_data(mac, test_data)
|
||||
|
||||
# Assert: Identity was restored (cache was valid)
|
||||
assert mac in ble_interface.address_to_identity
|
||||
assert ble_interface.driver.request_identity_resync.call_count == 0
|
||||
|
||||
def test_cache_expired_at_61_seconds(self, ble_interface, mock_rns):
|
||||
"""Cache should be expired at 61 seconds."""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
|
||||
# Cache entry from 61 seconds ago
|
||||
ble_interface._identity_cache[mac] = (identity, time.time() - 61)
|
||||
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
|
||||
# Call _handle_ble_data
|
||||
test_data = b"\x00\x01test"
|
||||
ble_interface._handle_ble_data(mac, test_data)
|
||||
|
||||
# Assert: Identity was NOT restored, resync was requested
|
||||
assert mac not in ble_interface.address_to_identity
|
||||
ble_interface.driver.request_identity_resync.assert_called_once_with(mac)
|
||||
|
||||
|
||||
class TestReassemblyCodePath:
|
||||
"""Test identity cache in the reassembly code path."""
|
||||
|
||||
def test_reassembly_restores_identity_from_cache(self, ble_interface, mock_rns):
|
||||
"""
|
||||
The reassembly completion code should also check the cache.
|
||||
"""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = identity[:8].hex()
|
||||
|
||||
# Setup: Identity in cache
|
||||
ble_interface._identity_cache[mac] = (identity, time.time())
|
||||
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}"
|
||||
|
||||
# Create mock reassembler that returns complete packet
|
||||
mock_reassembler = MagicMock()
|
||||
mock_reassembler.add_fragment = Mock(return_value=(True, b"complete_packet"))
|
||||
frag_key = f"{identity_hash}_{mac}"
|
||||
ble_interface.reassemblers[frag_key] = mock_reassembler
|
||||
|
||||
# Create mock peer interface
|
||||
mock_peer_if = Mock()
|
||||
mock_peer_if.process_incoming = Mock()
|
||||
ble_interface.spawned_interfaces[identity_hash] = mock_peer_if
|
||||
|
||||
# Call _handle_ble_data with fragment
|
||||
test_data = b"\x00\x01test"
|
||||
ble_interface._handle_ble_data(mac, test_data)
|
||||
|
||||
# Assert: Identity was restored
|
||||
assert mac in ble_interface.address_to_identity
|
||||
assert mac not in ble_interface._identity_cache
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and error handling."""
|
||||
|
||||
def test_cache_handles_concurrent_access(self, ble_interface, mock_rns):
|
||||
"""
|
||||
Cache operations should be thread-safe.
|
||||
"""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
|
||||
results = []
|
||||
|
||||
def cache_and_retrieve():
|
||||
# Cache identity
|
||||
ble_interface._identity_cache[mac] = (identity, time.time())
|
||||
time.sleep(0.001)
|
||||
# Retrieve from cache
|
||||
cached = ble_interface._identity_cache.get(mac)
|
||||
results.append(cached is not None)
|
||||
|
||||
# Run multiple threads
|
||||
threads = [threading.Thread(target=cache_and_retrieve) for _ in range(10)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# All operations should succeed
|
||||
assert all(results)
|
||||
|
||||
def test_multiple_disconnects_same_address(self, ble_interface, mock_rns):
|
||||
"""
|
||||
Multiple disconnects for same address should update cache timestamp.
|
||||
"""
|
||||
mac = "54:8F:DF:44:79:2B"
|
||||
identity = bytes.fromhex("456c6978823c85e228a545259c241e0e")
|
||||
identity_hash = identity[:8].hex()
|
||||
|
||||
# Add required methods
|
||||
ble_interface._compute_identity_hash = lambda x: x[:8].hex()
|
||||
|
||||
# First disconnect
|
||||
ble_interface.address_to_identity[mac] = identity
|
||||
ble_interface.identity_to_address[identity_hash] = mac
|
||||
ble_interface.spawned_interfaces[identity_hash] = Mock(detach=Mock())
|
||||
|
||||
ble_interface._device_disconnected_callback(mac)
|
||||
first_cache_time = ble_interface._identity_cache[mac][1]
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
# Second disconnect (peer reconnected and disconnected again)
|
||||
ble_interface.address_to_identity[mac] = identity
|
||||
ble_interface.identity_to_address[identity_hash] = mac
|
||||
ble_interface.spawned_interfaces[identity_hash] = Mock(detach=Mock())
|
||||
|
||||
ble_interface._device_disconnected_callback(mac)
|
||||
second_cache_time = ble_interface._identity_cache[mac][1]
|
||||
|
||||
# Cache timestamp should be updated
|
||||
assert second_cache_time > first_cache_time
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue