From 46299f3147e8c8c32b87aa3a67aee666a3db5b78 Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 12:38:08 -0500 Subject: [PATCH 1/7] fix: add identity cache to prevent data loss on reconnection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When Python's disconnect callback fires but the driver layer (Android/Kotlin) maintains or quickly re-establishes the GATT connection, data was being dropped because address_to_identity was cleared. Changes: - Add _identity_cache with 60-second TTL to preserve identities after disconnect - Cache identity in _device_disconnected_callback before cleanup - Check cache in _handle_ble_data and restore identity if found - Add on_address_changed callback for dual connection deduplication - Add _address_changed_callback to migrate identity mappings - Support driver.request_identity_resync() for fallback recovery This fixes the "no identity for peer X, dropping data" warning that occurred when the Python layer lost track of a peer that was still connected at the driver level. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/ble_reticulum/BLEInterface.py | 109 +++++++++++++++++++++++++- src/ble_reticulum/bluetooth_driver.py | 1 + 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/src/ble_reticulum/BLEInterface.py b/src/ble_reticulum/BLEInterface.py index 7c4b1db..1f67909 100644 --- a/src/ble_reticulum/BLEInterface.py +++ b/src/ble_reticulum/BLEInterface.py @@ -376,6 +376,10 @@ class BLEInterface(Interface): self.spawned_interfaces = {} # identity_hash (16 hex chars) -> BLEPeerInterface self.address_to_identity = {} # address -> peer_identity (16-byte identity) self.identity_to_address = {} # identity_hash -> address (for reverse lookup) + # Cache for recently disconnected identities (address -> (identity, timestamp)) + # Used to restore identity when peer reconnects before cache expiry (60s) + self._identity_cache = {} + self._identity_cache_ttl = 60 # seconds # Fragmentation self.fragmenters = {} # address -> BLEFragmenter (per MTU) @@ -410,6 +414,7 @@ class BLEInterface(Interface): self.driver.on_device_disconnected = self._device_disconnected_callback self.driver.on_error = self._error_callback self.driver.on_duplicate_identity_detected = self._check_duplicate_identity + self.driver.on_address_changed = self._address_changed_callback # Redirect Python logging to RNS logging for proper formatting self._setup_logging_redirect() @@ -985,6 +990,12 @@ class BLEInterface(Interface): peer_identity = self.address_to_identity.get(address) if peer_identity: identity_hash = self._compute_identity_hash(peer_identity) + + # Cache identity before cleanup - allows restoration if peer reconnects + # without a full identity handshake (e.g., Android maintains GATT connection) + self._identity_cache[address] = (peer_identity, time.time()) + RNS.log(f"{self} cached identity for {address} (TTL {self._identity_cache_ttl}s)", RNS.LOG_DEBUG) + if identity_hash in self.spawned_interfaces: peer_if = self.spawned_interfaces[identity_hash] peer_if.detach() @@ -1054,6 +1065,66 @@ class BLEInterface(Interface): RNS.log(f"{self} cleaned up stale state for {old_address}", RNS.LOG_DEBUG) + def _address_changed_callback(self, old_address: str, new_address: str, identity_hash: str): + """ + Driver callback: Handle address change during dual connection deduplication. + + When the driver deduplicates a dual connection (same identity connected as both + central and peripheral), it closes one direction and notifies us to update + our address mappings. + + Args: + old_address: The address that was closed/removed + new_address: The address that remains active + identity_hash: The 32-char hex identity hash for this peer + """ + RNS.log( + f"{self} address changed for {identity_hash[:8]}: {old_address} -> {new_address}", + RNS.LOG_INFO + ) + + # Get peer identity from old address before cleanup + peer_identity = self.address_to_identity.get(old_address) + if not peer_identity: + # Try cache if not in active mapping + cached = self._identity_cache.get(old_address) + if cached: + peer_identity = cached[0] + del self._identity_cache[old_address] + + if peer_identity: + # Migrate address_to_identity mapping + if old_address in self.address_to_identity: + del self.address_to_identity[old_address] + self.address_to_identity[new_address] = peer_identity + + # Update identity_to_address to point to new address + computed_hash = self._compute_identity_hash(peer_identity) + self.identity_to_address[computed_hash] = new_address + + # Migrate fragmenter/reassembler from old to new key + old_frag_key = self._get_fragmenter_key(peer_identity, old_address) + new_frag_key = self._get_fragmenter_key(peer_identity, new_address) + with self.frag_lock: + if old_frag_key in self.fragmenters: + self.fragmenters[new_frag_key] = self.fragmenters.pop(old_frag_key) + if old_frag_key in self.reassemblers: + self.reassemblers[new_frag_key] = self.reassemblers.pop(old_frag_key) + + RNS.log(f"{self} migrated identity mappings from {old_address} to {new_address}", RNS.LOG_DEBUG) + else: + RNS.log(f"{self} no identity found for {old_address} during address change", RNS.LOG_WARNING) + + # Clean up old address from other state + if old_address in self.pending_mtu: + mtu = self.pending_mtu.pop(old_address) + self.pending_mtu[new_address] = mtu + + with self.peer_lock: + if old_address in self.peers: + peer_data = self.peers.pop(old_address) + self.peers[new_address] = peer_data + def _error_callback(self, severity: str, message: str, exc: Exception = None): """ Driver callback: Handle driver errors. @@ -1520,8 +1591,28 @@ class BLEInterface(Interface): # Look up peer identity to compute fragmenter key peer_identity = self.address_to_identity.get(peer_address) if not peer_identity: - RNS.log(f"{self} no identity for peer {peer_address}, dropping data", RNS.LOG_WARNING) - return + # Try identity cache - peer may have "disconnected" from Python's view + # but Android/driver layer maintains the GATT connection + cached = self._identity_cache.get(peer_address) + if cached and (time.time() - cached[1]) < self._identity_cache_ttl: + peer_identity = cached[0] + # Restore identity mapping - peer is still active + self.address_to_identity[peer_address] = peer_identity + identity_hash = self._compute_identity_hash(peer_identity) + self.identity_to_address[identity_hash] = peer_address + RNS.log(f"{self} restored identity from cache for {peer_address}", RNS.LOG_DEBUG) + # Remove from cache since it's now active again + del self._identity_cache[peer_address] + else: + # Neither active mapping nor cache has identity - request resync from driver + # This handles the case where Android maintained connection but Python lost state + if hasattr(self.driver, 'request_identity_resync'): + RNS.log(f"{self} requesting identity resync for {peer_address}", RNS.LOG_DEBUG) + self.driver.request_identity_resync(peer_address) + # Drop this packet - next one should work after resync completes + else: + RNS.log(f"{self} no identity for peer {peer_address}, dropping data", RNS.LOG_WARNING) + return # Compute identity-based fragmenter key (matches peripheral data handler) frag_key = self._get_fragmenter_key(peer_identity, peer_address) @@ -1577,8 +1668,18 @@ class BLEInterface(Interface): peer_identity = self.address_to_identity.get(peer_address, None) if not peer_identity: - RNS.log(f"{self} no identity for peer {peer_address}, packet dropped", RNS.LOG_WARNING) - return + # Fallback to cache (should already be restored above, but defensive) + cached = self._identity_cache.get(peer_address) + if cached and (time.time() - cached[1]) < self._identity_cache_ttl: + peer_identity = cached[0] + self.address_to_identity[peer_address] = peer_identity + identity_hash = self._compute_identity_hash(peer_identity) + self.identity_to_address[identity_hash] = peer_address + del self._identity_cache[peer_address] + RNS.log(f"{self} restored identity from cache for {peer_address} (reassembly)", RNS.LOG_DEBUG) + else: + RNS.log(f"{self} no identity for peer {peer_address}, packet dropped", RNS.LOG_WARNING) + return identity_hash = self._compute_identity_hash(peer_identity) peer_if = self.spawned_interfaces.get(identity_hash, None) diff --git a/src/ble_reticulum/bluetooth_driver.py b/src/ble_reticulum/bluetooth_driver.py index 0cdffec..c4153b0 100644 --- a/src/ble_reticulum/bluetooth_driver.py +++ b/src/ble_reticulum/bluetooth_driver.py @@ -49,6 +49,7 @@ class BLEDriverInterface(ABC): on_data_received: Optional[Callable[[str, bytes], None]] = None # address, data on_mtu_negotiated: Optional[Callable[[str, int], None]] = None # address, mtu on_error: Optional[Callable[[str, str, Optional[Exception]], None]] = None # severity, message, exception + on_address_changed: Optional[Callable[[str, str, str], None]] = None # old_address, new_address, identity_hash # --- Lifecycle & Configuration --- From d576dcce502988fc7186c1171ba8d20ec3af5c84 Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 13:56:58 -0500 Subject: [PATCH 2/7] test: add unit tests for identity cache feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add comprehensive tests for: - Identity caching on disconnect (60s TTL) - Cache retrieval when data arrives from unknown peer - Cache expiry after TTL - Address change callback for dual connection deduplication - Driver identity resync fallback 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/test_identity_cache.py | 269 +++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 tests/test_identity_cache.py diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py new file mode 100644 index 0000000..521df0e --- /dev/null +++ b/tests/test_identity_cache.py @@ -0,0 +1,269 @@ +""" +Tests for Identity Cache on Disconnect + +When a BLE peer disconnects, we cache its identity for 60 seconds to handle +cases where Python's disconnect callback fires but the driver layer maintains +or quickly re-establishes the GATT connection. + +This prevents data loss when: +1. App force-stop/restart causes temporary disconnect +2. Driver maintains connection while Python clears identity mapping +3. Data arrives from "unknown" peer that was actually still connected +""" + +import pytest +import time +from unittest.mock import Mock, MagicMock, patch +import sys +import os + +# Add src to path - conftest.py handles RNS mocking + + +class TestIdentityCache: + """Test identity caching for reconnection recovery.""" + + def test_identity_cached_on_disconnect(self): + """ + When a peer disconnects, its identity should be cached for later recovery. + """ + # Setup mock interface state + interface = Mock() + interface.address_to_identity = {} + interface.identity_to_address = {} + interface.spawned_interfaces = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + + # Simulate connected peer + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = "456c6978" + + interface.address_to_identity[mac] = identity + interface.identity_to_address[identity_hash] = mac + interface.spawned_interfaces[identity_hash] = Mock() + + # Simulate disconnect with caching (the fix) + peer_identity = interface.address_to_identity.get(mac) + if peer_identity: + # Cache identity before cleanup + interface._identity_cache[mac] = (peer_identity, time.time()) + + # Clean up active mappings + del interface.address_to_identity[mac] + del interface.identity_to_address[identity_hash] + del interface.spawned_interfaces[identity_hash] + + # Assert: Identity should be in cache + assert mac in interface._identity_cache + cached_identity, cached_time = interface._identity_cache[mac] + assert cached_identity == identity + assert time.time() - cached_time < 1 # Cached recently + + def test_cached_identity_restored_on_data_receive(self): + """ + When data arrives from a peer with no active identity mapping, + check the cache and restore if found. + """ + interface = Mock() + interface.address_to_identity = {} + interface.identity_to_address = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + + # Setup: Identity in cache (from recent disconnect) + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + interface._identity_cache[mac] = (identity, time.time()) + + # Simulate data arriving from "unknown" peer + peer_identity = interface.address_to_identity.get(mac) + assert peer_identity is None, "No active mapping" + + # Check cache (the fix) + cached = interface._identity_cache.get(mac) + if cached: + cached_identity, cached_time = cached + if time.time() - cached_time < interface._identity_cache_ttl: + # Restore from cache + peer_identity = cached_identity + interface.address_to_identity[mac] = peer_identity + + # Assert: Identity restored from cache + assert peer_identity == identity + assert mac in interface.address_to_identity + + def test_cache_expires_after_ttl(self): + """ + Cached identities should expire after TTL (60 seconds). + """ + interface = Mock() + interface.address_to_identity = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + + # Cache with old timestamp (61 seconds ago) + old_time = time.time() - 61 + interface._identity_cache[mac] = (identity, old_time) + + # Try to restore from cache + peer_identity = None + cached = interface._identity_cache.get(mac) + if cached: + cached_identity, cached_time = cached + if time.time() - cached_time < interface._identity_cache_ttl: + peer_identity = cached_identity + + # Assert: Cache expired, identity not restored + assert peer_identity is None + + def test_cache_valid_within_ttl(self): + """ + Cached identities should be valid within TTL. + """ + interface = Mock() + interface.address_to_identity = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + + # Cache with recent timestamp (30 seconds ago) + recent_time = time.time() - 30 + interface._identity_cache[mac] = (identity, recent_time) + + # Try to restore from cache + peer_identity = None + cached = interface._identity_cache.get(mac) + if cached: + cached_identity, cached_time = cached + if time.time() - cached_time < interface._identity_cache_ttl: + peer_identity = cached_identity + + # Assert: Cache valid, identity restored + assert peer_identity == identity + + +class TestAddressChangedCallback: + """Test address migration during dual connection deduplication.""" + + def test_address_changed_migrates_identity_mapping(self): + """ + When driver closes one direction of a dual connection, + identity mapping should migrate to the remaining address. + """ + interface = Mock() + interface.address_to_identity = {} + interface.identity_to_address = {} + interface._identity_cache = {} + + # Peer connected on old address + old_mac = "54:8F:DF:44:79:2B" + new_mac = "6B:2B:EE:1A:5B:94" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = "456c6978" + + interface.address_to_identity[old_mac] = identity + interface.identity_to_address[identity_hash] = old_mac + + # Simulate address change callback + peer_identity = interface.address_to_identity.get(old_mac) + if peer_identity: + # Migrate to new address + del interface.address_to_identity[old_mac] + interface.address_to_identity[new_mac] = peer_identity + interface.identity_to_address[identity_hash] = new_mac + + # Assert: Mapping migrated + assert old_mac not in interface.address_to_identity + assert new_mac in interface.address_to_identity + assert interface.identity_to_address[identity_hash] == new_mac + + def test_address_changed_uses_cache_if_not_in_active_mapping(self): + """ + If old address not in active mapping, check cache for identity. + """ + interface = Mock() + interface.address_to_identity = {} + interface.identity_to_address = {} + interface._identity_cache = {} + + old_mac = "54:8F:DF:44:79:2B" + new_mac = "6B:2B:EE:1A:5B:94" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = "456c6978" + + # Identity in cache, not active mapping + interface._identity_cache[old_mac] = (identity, time.time()) + + # Simulate address change - check cache as fallback + peer_identity = interface.address_to_identity.get(old_mac) + if not peer_identity: + cached = interface._identity_cache.get(old_mac) + if cached: + peer_identity = cached[0] + del interface._identity_cache[old_mac] + + if peer_identity: + interface.address_to_identity[new_mac] = peer_identity + interface.identity_to_address[identity_hash] = new_mac + + # Assert: Identity recovered from cache and migrated + assert old_mac not in interface._identity_cache + assert new_mac in interface.address_to_identity + assert interface.identity_to_address[identity_hash] == new_mac + + +class TestDriverResync: + """Test driver-level identity resync fallback.""" + + def test_request_identity_resync_available(self): + """ + Driver should expose request_identity_resync method. + """ + driver = Mock() + driver.request_identity_resync = Mock(return_value=True) + + # Call resync + result = driver.request_identity_resync("54:8F:DF:44:79:2B") + + assert result is True + driver.request_identity_resync.assert_called_once_with("54:8F:DF:44:79:2B") + + def test_resync_used_when_cache_miss(self): + """ + When data arrives from unknown peer and cache miss, + try driver resync as last resort. + """ + interface = Mock() + interface.address_to_identity = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + interface.driver = Mock() + interface.driver.request_identity_resync = Mock(return_value=True) + + mac = "54:8F:DF:44:79:2B" + + # No active mapping + peer_identity = interface.address_to_identity.get(mac) + assert peer_identity is None + + # No cache hit + cached = interface._identity_cache.get(mac) + assert cached is None + + # Try driver resync as fallback + resync_result = interface.driver.request_identity_resync(mac) + + # Assert: Resync attempted + assert resync_result is True + interface.driver.request_identity_resync.assert_called_once_with(mac) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From b1206b3c6ea8fafdc045b72ff37db98ca9982742 Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 14:07:24 -0500 Subject: [PATCH 3/7] fix: configure codecov token and coverage paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add CODECOV_TOKEN to codecov-action uploads (required for v4) - Change unit test coverage from single file to entire package - Add codecov.yml with coverage thresholds and flags Note: CODECOV_TOKEN secret must be added to repository settings. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .github/workflows/test.yml | 4 +++- codecov.yml | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 codecov.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7dfc077..1356c0f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -86,7 +86,7 @@ jobs: run: | # Run only unit tests (fragmentation and prioritization) python -m pytest tests/test_fragmentation.py tests/test_prioritization.py -v \ - --cov=src/ble_reticulum/BLEFragmentation.py \ + --cov=src/ble_reticulum \ --cov-report=term-missing \ --cov-report=xml:coverage-unit.xml continue-on-error: false @@ -95,6 +95,7 @@ jobs: if: matrix.python-version == '3.11' uses: codecov/codecov-action@v4 with: + token: ${{ secrets.CODECOV_TOKEN }} file: ./coverage-unit.xml flags: unit fail_ci_if_error: false @@ -151,6 +152,7 @@ jobs: if: matrix.python-version == '3.11' uses: codecov/codecov-action@v4 with: + token: ${{ secrets.CODECOV_TOKEN }} file: ./coverage-integration.xml flags: integration fail_ci_if_error: false diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..5048949 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,33 @@ +coverage: + precision: 2 + round: down + range: "60...100" + status: + project: + default: + target: auto + threshold: 5% + patch: + default: + target: 80% + threshold: 5% + +comment: + layout: "reach,diff,flags,files" + behavior: default + require_changes: true + +flags: + unit: + paths: + - src/ble_reticulum/ + carryforward: true + integration: + paths: + - src/ble_reticulum/ + carryforward: true + +ignore: + - "tests/**/*" + - "**/__pycache__/**" + - "**/conftest.py" From d332f9a9bb0ecf5f608905019058e0787593c36d Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 14:14:50 -0500 Subject: [PATCH 4/7] test: add integration tests that exercise real BLEInterface code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add TestIdentityCacheIntegration class that imports and tests actual BLEInterface methods instead of just mocking the logic. This should provide codecov coverage on the changed lines. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/test_identity_cache.py | 131 ++++++++++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py index 521df0e..2490207 100644 --- a/tests/test_identity_cache.py +++ b/tests/test_identity_cache.py @@ -13,15 +13,142 @@ This prevents data loss when: import pytest import time -from unittest.mock import Mock, MagicMock, patch +import threading +from unittest.mock import Mock, MagicMock, patch, AsyncMock import sys import os +import asyncio # Add src to path - conftest.py handles RNS mocking +def create_minimal_ble_interface(): + """ + Create a minimal BLEInterface instance for testing identity cache. + + Mocks external dependencies (driver, RNS) while keeping the real + identity cache logic intact. + """ + try: + from ble_reticulum.BLEInterface import BLEInterface + except ImportError: + pytest.skip("BLEInterface not available") + + with patch('ble_reticulum.BLEInterface.Interface.__init__'): + # Create interface with mocked owner + interface = object.__new__(BLEInterface) + + # Initialize required attributes + interface.name = "TestBLE" + interface.owner = Mock() + interface.online = False + interface.driver = Mock() + interface.driver.on_device_connected = None + interface.driver.on_device_disconnected = None + interface.driver.on_data_received = None + interface.driver.on_identity_received = None + interface.driver.on_error = None + interface.driver.on_duplicate_identity_detected = None + interface.driver.on_address_changed = None + interface.driver.request_identity_resync = Mock(return_value=False) + + # Initialize state dictionaries + interface.peers = {} + interface.spawned_interfaces = {} + interface.address_to_identity = {} + interface.identity_to_address = {} + interface._identity_cache = {} + interface._identity_cache_ttl = 60 + interface.fragmenters = {} + interface.reassemblers = {} + interface.pending_mtu = {} + + # Locks + interface.peer_lock = threading.RLock() + interface.frag_lock = threading.RLock() + + # Other required attributes + interface.HW_MTU = 500 + interface.MIN_MTU = 20 + interface.bitrate = 700000 + interface.rxb = 0 + interface.txb = 0 + + return interface + + return None + + +class TestIdentityCacheIntegration: + """Test identity cache with real BLEInterface methods.""" + + def test_identity_cache_initialized(self): + """BLEInterface should have identity cache attributes.""" + interface = create_minimal_ble_interface() + if interface is None: + pytest.skip("Could not create interface") + + assert hasattr(interface, '_identity_cache') + assert hasattr(interface, '_identity_cache_ttl') + assert interface._identity_cache_ttl == 60 + assert isinstance(interface._identity_cache, dict) + + def test_disconnect_callback_caches_identity(self): + """_device_disconnected_callback should cache identity before cleanup.""" + interface = create_minimal_ble_interface() + if interface is None: + pytest.skip("Could not create interface") + + # Setup: Simulate connected peer + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = interface._compute_identity_hash(identity) if hasattr(interface, '_compute_identity_hash') else "456c6978" + + interface.address_to_identity[mac] = identity + interface.identity_to_address[identity_hash] = mac + + # Create mock peer interface + mock_peer_if = Mock() + mock_peer_if.detach = Mock() + interface.spawned_interfaces[identity_hash] = mock_peer_if + + # Call the disconnect callback + if hasattr(interface, '_device_disconnected_callback'): + interface._device_disconnected_callback(mac) + + # Assert: Identity should be cached + assert mac in interface._identity_cache + cached_identity, cached_time = interface._identity_cache[mac] + assert cached_identity == identity + assert time.time() - cached_time < 2 + + def test_address_changed_callback_migrates_mappings(self): + """_address_changed_callback should migrate identity to new address.""" + interface = create_minimal_ble_interface() + if interface is None: + pytest.skip("Could not create interface") + + old_mac = "54:8F:DF:44:79:2B" + new_mac = "6B:2B:EE:1A:5B:94" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = "456c6978" + + # Setup initial mapping + interface.address_to_identity[old_mac] = identity + interface.identity_to_address[identity_hash] = old_mac + + # Call address changed callback + if hasattr(interface, '_address_changed_callback'): + interface._address_changed_callback(old_mac, new_mac, identity_hash) + + # Assert: Mapping migrated to new address + assert old_mac not in interface.address_to_identity + assert new_mac in interface.address_to_identity + assert interface.identity_to_address[identity_hash] == new_mac + + class TestIdentityCache: - """Test identity caching for reconnection recovery.""" + """Test identity caching logic (unit tests with mocks).""" def test_identity_cached_on_disconnect(self): """ From b5518e379928bf2c2a3a1fffbd227935fdc58053 Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 14:20:29 -0500 Subject: [PATCH 5/7] test: comprehensive integration tests for identity cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add 14 tests that exercise the REAL BLEInterface code: - TestIdentityCacheOnDisconnect: verify caching on disconnect - TestIdentityCacheOnDataReceive: verify cache restoration on data - TestAddressChangedCallback: verify address migration - TestCacheTTL: verify 60-second TTL behavior - TestReassemblyCodePath: verify cache in reassembly path - TestEdgeCases: concurrent access, multiple disconnects Tests skip locally if RNS not installed but run in CI to provide actual line coverage on the identity cache changes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/test_identity_cache.py | 656 +++++++++++++++++++++-------------- 1 file changed, 387 insertions(+), 269 deletions(-) diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py index 2490207..b796219 100644 --- a/tests/test_identity_cache.py +++ b/tests/test_identity_cache.py @@ -9,56 +9,86 @@ This prevents data loss when: 1. App force-stop/restart causes temporary disconnect 2. Driver maintains connection while Python clears identity mapping 3. Data arrives from "unknown" peer that was actually still connected + +These tests exercise the REAL BLEInterface code with mocked external dependencies. """ import pytest import time import threading -from unittest.mock import Mock, MagicMock, patch, AsyncMock +from unittest.mock import Mock, MagicMock, patch, PropertyMock import sys import os -import asyncio -# Add src to path - conftest.py handles RNS mocking +# Ensure src is in path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src')) -def create_minimal_ble_interface(): +@pytest.fixture +def mock_rns(): + """Mock RNS module for testing.""" + with patch.dict('sys.modules', {'RNS': MagicMock()}): + import sys + rns = sys.modules['RNS'] + rns.LOG_CRITICAL = 0 + rns.LOG_ERROR = 1 + rns.LOG_WARNING = 2 + rns.LOG_NOTICE = 3 + rns.LOG_INFO = 4 + rns.LOG_VERBOSE = 5 + rns.LOG_DEBUG = 6 + rns.LOG_EXTREME = 7 + rns.log = Mock() + rns.prettyhexrep = lambda x: x.hex() if isinstance(x, bytes) else str(x) + yield rns + + +@pytest.fixture +def mock_driver(): + """Create a mock BLE driver.""" + driver = Mock() + driver.on_device_connected = None + driver.on_device_disconnected = None + driver.on_data_received = None + driver.on_identity_received = None + driver.on_error = None + driver.on_duplicate_identity_detected = None + driver.on_address_changed = None + driver.request_identity_resync = Mock(return_value=False) + driver.get_cached_identity = Mock(return_value=None) + return driver + + +@pytest.fixture +def ble_interface(mock_rns, mock_driver): """ - Create a minimal BLEInterface instance for testing identity cache. + Create a BLEInterface instance with mocked dependencies. - Mocks external dependencies (driver, RNS) while keeping the real - identity cache logic intact. + This uses the REAL BLEInterface class but mocks external dependencies + to allow testing the identity cache logic. """ + # Try to import BLEInterface - this will work in CI where RNS is installed try: from ble_reticulum.BLEInterface import BLEInterface - except ImportError: - pytest.skip("BLEInterface not available") - with patch('ble_reticulum.BLEInterface.Interface.__init__'): - # Create interface with mocked owner + # Create interface without calling __init__ interface = object.__new__(BLEInterface) - # Initialize required attributes + # Initialize all required attributes manually interface.name = "TestBLE" interface.owner = Mock() - interface.online = False - interface.driver = Mock() - interface.driver.on_device_connected = None - interface.driver.on_device_disconnected = None - interface.driver.on_data_received = None - interface.driver.on_identity_received = None - interface.driver.on_error = None - interface.driver.on_duplicate_identity_detected = None - interface.driver.on_address_changed = None - interface.driver.request_identity_resync = Mock(return_value=False) + interface.online = True + interface.driver = mock_driver - # Initialize state dictionaries + # Identity mappings interface.peers = {} interface.spawned_interfaces = {} interface.address_to_identity = {} interface.identity_to_address = {} interface._identity_cache = {} interface._identity_cache_ttl = 60 + + # Fragmentation interface.fragmenters = {} interface.reassemblers = {} interface.pending_mtu = {} @@ -67,7 +97,7 @@ def create_minimal_ble_interface(): interface.peer_lock = threading.RLock() interface.frag_lock = threading.RLock() - # Other required attributes + # Other attributes interface.HW_MTU = 500 interface.MIN_MTU = 20 interface.bitrate = 700000 @@ -76,320 +106,408 @@ def create_minimal_ble_interface(): return interface - return None + except ImportError as e: + pytest.skip(f"Cannot import BLEInterface (RNS not installed): {e}") -class TestIdentityCacheIntegration: - """Test identity cache with real BLEInterface methods.""" +class TestIdentityCacheOnDisconnect: + """Test that identity is cached when peer disconnects.""" - def test_identity_cache_initialized(self): - """BLEInterface should have identity cache attributes.""" - interface = create_minimal_ble_interface() - if interface is None: - pytest.skip("Could not create interface") - - assert hasattr(interface, '_identity_cache') - assert hasattr(interface, '_identity_cache_ttl') - assert interface._identity_cache_ttl == 60 - assert isinstance(interface._identity_cache, dict) - - def test_disconnect_callback_caches_identity(self): - """_device_disconnected_callback should cache identity before cleanup.""" - interface = create_minimal_ble_interface() - if interface is None: - pytest.skip("Could not create interface") - - # Setup: Simulate connected peer + def test_disconnect_caches_identity(self, ble_interface, mock_rns): + """ + When _device_disconnected_callback is called, the peer's identity + should be cached before cleanup. + """ mac = "54:8F:DF:44:79:2B" identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - identity_hash = interface._compute_identity_hash(identity) if hasattr(interface, '_compute_identity_hash') else "456c6978" + identity_hash = identity[:8].hex() # First 8 bytes = 16 hex chars - interface.address_to_identity[mac] = identity - interface.identity_to_address[identity_hash] = mac + # Setup: Peer is connected with identity mapping + ble_interface.address_to_identity[mac] = identity + ble_interface.identity_to_address[identity_hash] = mac # Create mock peer interface mock_peer_if = Mock() mock_peer_if.detach = Mock() - interface.spawned_interfaces[identity_hash] = mock_peer_if + ble_interface.spawned_interfaces[identity_hash] = mock_peer_if - # Call the disconnect callback - if hasattr(interface, '_device_disconnected_callback'): - interface._device_disconnected_callback(mac) + # Add _compute_identity_hash method + ble_interface._compute_identity_hash = lambda x: x[:8].hex() - # Assert: Identity should be cached - assert mac in interface._identity_cache - cached_identity, cached_time = interface._identity_cache[mac] - assert cached_identity == identity - assert time.time() - cached_time < 2 + # Call the real disconnect callback + ble_interface._device_disconnected_callback(mac) - def test_address_changed_callback_migrates_mappings(self): - """_address_changed_callback should migrate identity to new address.""" - interface = create_minimal_ble_interface() - if interface is None: - pytest.skip("Could not create interface") - - old_mac = "54:8F:DF:44:79:2B" - new_mac = "6B:2B:EE:1A:5B:94" - identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - identity_hash = "456c6978" - - # Setup initial mapping - interface.address_to_identity[old_mac] = identity - interface.identity_to_address[identity_hash] = old_mac - - # Call address changed callback - if hasattr(interface, '_address_changed_callback'): - interface._address_changed_callback(old_mac, new_mac, identity_hash) - - # Assert: Mapping migrated to new address - assert old_mac not in interface.address_to_identity - assert new_mac in interface.address_to_identity - assert interface.identity_to_address[identity_hash] == new_mac - - -class TestIdentityCache: - """Test identity caching logic (unit tests with mocks).""" - - def test_identity_cached_on_disconnect(self): - """ - When a peer disconnects, its identity should be cached for later recovery. - """ - # Setup mock interface state - interface = Mock() - interface.address_to_identity = {} - interface.identity_to_address = {} - interface.spawned_interfaces = {} - interface._identity_cache = {} - interface._identity_cache_ttl = 60 - - # Simulate connected peer - mac = "54:8F:DF:44:79:2B" - identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - identity_hash = "456c6978" - - interface.address_to_identity[mac] = identity - interface.identity_to_address[identity_hash] = mac - interface.spawned_interfaces[identity_hash] = Mock() - - # Simulate disconnect with caching (the fix) - peer_identity = interface.address_to_identity.get(mac) - if peer_identity: - # Cache identity before cleanup - interface._identity_cache[mac] = (peer_identity, time.time()) - - # Clean up active mappings - del interface.address_to_identity[mac] - del interface.identity_to_address[identity_hash] - del interface.spawned_interfaces[identity_hash] - - # Assert: Identity should be in cache - assert mac in interface._identity_cache - cached_identity, cached_time = interface._identity_cache[mac] + # Assert: Identity should be cached + assert mac in ble_interface._identity_cache + cached_identity, cached_time = ble_interface._identity_cache[mac] assert cached_identity == identity - assert time.time() - cached_time < 1 # Cached recently + assert time.time() - cached_time < 2 # Cached recently - def test_cached_identity_restored_on_data_receive(self): - """ - When data arrives from a peer with no active identity mapping, - check the cache and restore if found. - """ - interface = Mock() - interface.address_to_identity = {} - interface.identity_to_address = {} - interface._identity_cache = {} - interface._identity_cache_ttl = 60 + # Assert: Active mappings should be cleaned up + assert mac not in ble_interface.address_to_identity + assert identity_hash not in ble_interface.identity_to_address - # Setup: Identity in cache (from recent disconnect) + # Assert: Peer interface was detached + mock_peer_if.detach.assert_called_once() + + def test_disconnect_unknown_address_no_crash(self, ble_interface, mock_rns): + """ + Disconnecting an unknown address should not crash. + """ + # Call disconnect for unknown address + ble_interface._device_disconnected_callback("XX:XX:XX:XX:XX:XX") + + # Should not raise, cache should be empty + assert "XX:XX:XX:XX:XX:XX" not in ble_interface._identity_cache + + +class TestIdentityCacheOnDataReceive: + """Test that cached identity is restored when data arrives.""" + + def test_data_restores_identity_from_cache(self, ble_interface, mock_rns): + """ + When data arrives from a peer with cached (but not active) identity, + the identity should be restored from cache. + """ mac = "54:8F:DF:44:79:2B" identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - interface._identity_cache[mac] = (identity, time.time()) + identity_hash = identity[:8].hex() - # Simulate data arriving from "unknown" peer - peer_identity = interface.address_to_identity.get(mac) - assert peer_identity is None, "No active mapping" + # Setup: Identity in cache (simulating recent disconnect) + ble_interface._identity_cache[mac] = (identity, time.time()) - # Check cache (the fix) - cached = interface._identity_cache.get(mac) - if cached: - cached_identity, cached_time = cached - if time.time() - cached_time < interface._identity_cache_ttl: - # Restore from cache - peer_identity = cached_identity - interface.address_to_identity[mac] = peer_identity + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}" - # Assert: Identity restored from cache - assert peer_identity == identity - assert mac in interface.address_to_identity + # Create mock fragmenter/reassembler + from unittest.mock import MagicMock + mock_reassembler = MagicMock() + mock_reassembler.add_fragment = Mock(return_value=(False, None)) - def test_cache_expires_after_ttl(self): + frag_key = f"{identity_hash}_{mac}" + ble_interface.reassemblers[frag_key] = mock_reassembler + + # Create mock peer interface + mock_peer_if = Mock() + ble_interface.spawned_interfaces[identity_hash] = mock_peer_if + + # Call _handle_ble_data with test data + test_data = b"\x00\x01test_packet" # Fragment with header + ble_interface._handle_ble_data(mac, test_data) + + # Assert: Identity should be restored from cache + assert mac in ble_interface.address_to_identity + assert ble_interface.address_to_identity[mac] == identity + assert ble_interface.identity_to_address[identity_hash] == mac + + # Assert: Cache entry should be removed (now active) + assert mac not in ble_interface._identity_cache + + def test_data_expired_cache_requests_resync(self, ble_interface, mock_rns): """ - Cached identities should expire after TTL (60 seconds). + When data arrives with expired cache entry, request identity resync. """ - interface = Mock() - interface.address_to_identity = {} - interface._identity_cache = {} - interface._identity_cache_ttl = 60 - mac = "54:8F:DF:44:79:2B" identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - # Cache with old timestamp (61 seconds ago) - old_time = time.time() - 61 - interface._identity_cache[mac] = (identity, old_time) + # Setup: Expired cache entry (61 seconds ago) + ble_interface._identity_cache[mac] = (identity, time.time() - 61) - # Try to restore from cache - peer_identity = None - cached = interface._identity_cache.get(mac) - if cached: - cached_identity, cached_time = cached - if time.time() - cached_time < interface._identity_cache_ttl: - peer_identity = cached_identity + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() - # Assert: Cache expired, identity not restored - assert peer_identity is None + # Call _handle_ble_data + test_data = b"\x00\x01test" + ble_interface._handle_ble_data(mac, test_data) - def test_cache_valid_within_ttl(self): + # Assert: Should request resync from driver + ble_interface.driver.request_identity_resync.assert_called_once_with(mac) + + # Assert: Identity should NOT be restored (expired) + assert mac not in ble_interface.address_to_identity + + def test_data_no_cache_no_identity_requests_resync(self, ble_interface, mock_rns): """ - Cached identities should be valid within TTL. + When data arrives with no identity and no cache, request resync. """ - interface = Mock() - interface.address_to_identity = {} - interface._identity_cache = {} - interface._identity_cache_ttl = 60 - mac = "54:8F:DF:44:79:2B" - identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - # Cache with recent timestamp (30 seconds ago) - recent_time = time.time() - 30 - interface._identity_cache[mac] = (identity, recent_time) + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() - # Try to restore from cache - peer_identity = None - cached = interface._identity_cache.get(mac) - if cached: - cached_identity, cached_time = cached - if time.time() - cached_time < interface._identity_cache_ttl: - peer_identity = cached_identity + # No cache, no identity mapping + assert mac not in ble_interface._identity_cache + assert mac not in ble_interface.address_to_identity - # Assert: Cache valid, identity restored - assert peer_identity == identity + # Call _handle_ble_data + test_data = b"\x00\x01test" + ble_interface._handle_ble_data(mac, test_data) + + # Assert: Should request resync + ble_interface.driver.request_identity_resync.assert_called_once_with(mac) class TestAddressChangedCallback: """Test address migration during dual connection deduplication.""" - def test_address_changed_migrates_identity_mapping(self): + def test_address_changed_migrates_mappings(self, ble_interface, mock_rns): """ - When driver closes one direction of a dual connection, - identity mapping should migrate to the remaining address. + When address changes, identity mappings should migrate to new address. """ - interface = Mock() - interface.address_to_identity = {} - interface.identity_to_address = {} - interface._identity_cache = {} - - # Peer connected on old address old_mac = "54:8F:DF:44:79:2B" new_mac = "6B:2B:EE:1A:5B:94" identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") - identity_hash = "456c6978" + identity_hash = identity[:8].hex() - interface.address_to_identity[old_mac] = identity - interface.identity_to_address[identity_hash] = old_mac + # Setup: Peer connected on old address + ble_interface.address_to_identity[old_mac] = identity + ble_interface.identity_to_address[identity_hash] = old_mac + ble_interface.peers[old_mac] = (Mock(), 0, 185) + ble_interface.pending_mtu[old_mac] = 185 - # Simulate address change callback - peer_identity = interface.address_to_identity.get(old_mac) - if peer_identity: - # Migrate to new address - del interface.address_to_identity[old_mac] - interface.address_to_identity[new_mac] = peer_identity - interface.identity_to_address[identity_hash] = new_mac + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}" - # Assert: Mapping migrated - assert old_mac not in interface.address_to_identity - assert new_mac in interface.address_to_identity - assert interface.identity_to_address[identity_hash] == new_mac + # Add fragmenter for old address + old_frag_key = f"{identity_hash}_{old_mac}" + ble_interface.fragmenters[old_frag_key] = Mock() + ble_interface.reassemblers[old_frag_key] = Mock() - def test_address_changed_uses_cache_if_not_in_active_mapping(self): + # Call address changed callback + ble_interface._address_changed_callback(old_mac, new_mac, identity_hash) + + # Assert: Old address cleaned up + assert old_mac not in ble_interface.address_to_identity + assert old_mac not in ble_interface.peers + assert old_mac not in ble_interface.pending_mtu + assert old_frag_key not in ble_interface.fragmenters + assert old_frag_key not in ble_interface.reassemblers + + # Assert: New address has mappings + assert new_mac in ble_interface.address_to_identity + assert ble_interface.address_to_identity[new_mac] == identity + assert ble_interface.identity_to_address[identity_hash] == new_mac + assert new_mac in ble_interface.peers + assert new_mac in ble_interface.pending_mtu + + # Assert: Fragmenter migrated + new_frag_key = f"{identity_hash}_{new_mac}" + assert new_frag_key in ble_interface.fragmenters + assert new_frag_key in ble_interface.reassemblers + + def test_address_changed_uses_cache_fallback(self, ble_interface, mock_rns): """ - If old address not in active mapping, check cache for identity. + When old address not in active mapping, check cache. """ - interface = Mock() - interface.address_to_identity = {} - interface.identity_to_address = {} - interface._identity_cache = {} - old_mac = "54:8F:DF:44:79:2B" new_mac = "6B:2B:EE:1A:5B:94" identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = identity[:8].hex() + + # Setup: Identity in cache, not active mapping + ble_interface._identity_cache[old_mac] = (identity, time.time()) + + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}" + + # Call address changed callback + ble_interface._address_changed_callback(old_mac, new_mac, identity_hash) + + # Assert: Cache was used and cleared + assert old_mac not in ble_interface._identity_cache + + # Assert: New address has identity + assert new_mac in ble_interface.address_to_identity + assert ble_interface.address_to_identity[new_mac] == identity + + def test_address_changed_no_identity_logs_warning(self, ble_interface, mock_rns): + """ + When no identity found for old address, log warning. + """ + old_mac = "54:8F:DF:44:79:2B" + new_mac = "6B:2B:EE:1A:5B:94" identity_hash = "456c6978" - # Identity in cache, not active mapping - interface._identity_cache[old_mac] = (identity, time.time()) + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}" - # Simulate address change - check cache as fallback - peer_identity = interface.address_to_identity.get(old_mac) - if not peer_identity: - cached = interface._identity_cache.get(old_mac) - if cached: - peer_identity = cached[0] - del interface._identity_cache[old_mac] + # No identity anywhere + assert old_mac not in ble_interface.address_to_identity + assert old_mac not in ble_interface._identity_cache - if peer_identity: - interface.address_to_identity[new_mac] = peer_identity - interface.identity_to_address[identity_hash] = new_mac + # Call address changed callback - should not crash + ble_interface._address_changed_callback(old_mac, new_mac, identity_hash) - # Assert: Identity recovered from cache and migrated - assert old_mac not in interface._identity_cache - assert new_mac in interface.address_to_identity - assert interface.identity_to_address[identity_hash] == new_mac + # Assert: Warning was logged + mock_rns.log.assert_called() + # Check that a warning about no identity was logged + calls = [str(c) for c in mock_rns.log.call_args_list] + assert any("no identity found" in c.lower() for c in calls) -class TestDriverResync: - """Test driver-level identity resync fallback.""" +class TestCacheTTL: + """Test cache TTL (time-to-live) behavior.""" - def test_request_identity_resync_available(self): - """ - Driver should expose request_identity_resync method. - """ - driver = Mock() - driver.request_identity_resync = Mock(return_value=True) - - # Call resync - result = driver.request_identity_resync("54:8F:DF:44:79:2B") - - assert result is True - driver.request_identity_resync.assert_called_once_with("54:8F:DF:44:79:2B") - - def test_resync_used_when_cache_miss(self): - """ - When data arrives from unknown peer and cache miss, - try driver resync as last resort. - """ - interface = Mock() - interface.address_to_identity = {} - interface._identity_cache = {} - interface._identity_cache_ttl = 60 - interface.driver = Mock() - interface.driver.request_identity_resync = Mock(return_value=True) + def test_cache_ttl_default_60_seconds(self, ble_interface): + """Cache TTL should default to 60 seconds.""" + assert ble_interface._identity_cache_ttl == 60 + def test_cache_valid_at_59_seconds(self, ble_interface, mock_rns): + """Cache should be valid at 59 seconds.""" mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = identity[:8].hex() - # No active mapping - peer_identity = interface.address_to_identity.get(mac) - assert peer_identity is None + # Cache entry from 59 seconds ago + ble_interface._identity_cache[mac] = (identity, time.time() - 59) - # No cache hit - cached = interface._identity_cache.get(mac) - assert cached is None + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}" - # Try driver resync as fallback - resync_result = interface.driver.request_identity_resync(mac) + # Create mock reassembler + mock_reassembler = MagicMock() + mock_reassembler.add_fragment = Mock(return_value=(False, None)) + frag_key = f"{identity_hash}_{mac}" + ble_interface.reassemblers[frag_key] = mock_reassembler - # Assert: Resync attempted - assert resync_result is True - interface.driver.request_identity_resync.assert_called_once_with(mac) + # Create mock peer interface + ble_interface.spawned_interfaces[identity_hash] = Mock() + + # Call _handle_ble_data + test_data = b"\x00\x01test" + ble_interface._handle_ble_data(mac, test_data) + + # Assert: Identity was restored (cache was valid) + assert mac in ble_interface.address_to_identity + assert ble_interface.driver.request_identity_resync.call_count == 0 + + def test_cache_expired_at_61_seconds(self, ble_interface, mock_rns): + """Cache should be expired at 61 seconds.""" + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + + # Cache entry from 61 seconds ago + ble_interface._identity_cache[mac] = (identity, time.time() - 61) + + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + + # Call _handle_ble_data + test_data = b"\x00\x01test" + ble_interface._handle_ble_data(mac, test_data) + + # Assert: Identity was NOT restored, resync was requested + assert mac not in ble_interface.address_to_identity + ble_interface.driver.request_identity_resync.assert_called_once_with(mac) + + +class TestReassemblyCodePath: + """Test identity cache in the reassembly code path.""" + + def test_reassembly_restores_identity_from_cache(self, ble_interface, mock_rns): + """ + The reassembly completion code should also check the cache. + """ + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = identity[:8].hex() + + # Setup: Identity in cache + ble_interface._identity_cache[mac] = (identity, time.time()) + + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + ble_interface._get_fragmenter_key = lambda ident, addr: f"{ident[:8].hex()}_{addr}" + + # Create mock reassembler that returns complete packet + mock_reassembler = MagicMock() + mock_reassembler.add_fragment = Mock(return_value=(True, b"complete_packet")) + frag_key = f"{identity_hash}_{mac}" + ble_interface.reassemblers[frag_key] = mock_reassembler + + # Create mock peer interface + mock_peer_if = Mock() + mock_peer_if.process_incoming = Mock() + ble_interface.spawned_interfaces[identity_hash] = mock_peer_if + + # Call _handle_ble_data with fragment + test_data = b"\x00\x01test" + ble_interface._handle_ble_data(mac, test_data) + + # Assert: Identity was restored + assert mac in ble_interface.address_to_identity + assert mac not in ble_interface._identity_cache + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def test_cache_handles_concurrent_access(self, ble_interface, mock_rns): + """ + Cache operations should be thread-safe. + """ + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + + results = [] + + def cache_and_retrieve(): + # Cache identity + ble_interface._identity_cache[mac] = (identity, time.time()) + time.sleep(0.001) + # Retrieve from cache + cached = ble_interface._identity_cache.get(mac) + results.append(cached is not None) + + # Run multiple threads + threads = [threading.Thread(target=cache_and_retrieve) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + # All operations should succeed + assert all(results) + + def test_multiple_disconnects_same_address(self, ble_interface, mock_rns): + """ + Multiple disconnects for same address should update cache timestamp. + """ + mac = "54:8F:DF:44:79:2B" + identity = bytes.fromhex("456c6978823c85e228a545259c241e0e") + identity_hash = identity[:8].hex() + + # Add required methods + ble_interface._compute_identity_hash = lambda x: x[:8].hex() + + # First disconnect + ble_interface.address_to_identity[mac] = identity + ble_interface.identity_to_address[identity_hash] = mac + ble_interface.spawned_interfaces[identity_hash] = Mock(detach=Mock()) + + ble_interface._device_disconnected_callback(mac) + first_cache_time = ble_interface._identity_cache[mac][1] + + time.sleep(0.1) + + # Second disconnect (peer reconnected and disconnected again) + ble_interface.address_to_identity[mac] = identity + ble_interface.identity_to_address[identity_hash] = mac + ble_interface.spawned_interfaces[identity_hash] = Mock(detach=Mock()) + + ble_interface._device_disconnected_callback(mac) + second_cache_time = ble_interface._identity_cache[mac][1] + + # Cache timestamp should be updated + assert second_cache_time > first_cache_time if __name__ == "__main__": From cb473b870479c1455c342e4527566d9fcf0864dc Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 14:50:27 -0500 Subject: [PATCH 6/7] fix: patch RNS.log at module level in address_changed test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test was using mock_rns.log.assert_called() but the real BLEInterface code calls the actual RNS.log function, not the mock. Fixed by patching RNS.log at module level to capture actual log calls, then asserting the "no identity found" warning was logged. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/test_identity_cache.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py index b796219..33bc9c8 100644 --- a/tests/test_identity_cache.py +++ b/tests/test_identity_cache.py @@ -339,14 +339,22 @@ class TestAddressChangedCallback: assert old_mac not in ble_interface.address_to_identity assert old_mac not in ble_interface._identity_cache - # Call address changed callback - should not crash - ble_interface._address_changed_callback(old_mac, new_mac, identity_hash) + # Patch RNS.log at module level to capture calls + import RNS + original_log = RNS.log + log_calls = [] + RNS.log = lambda msg, level=4: log_calls.append((msg, level)) - # Assert: Warning was logged - mock_rns.log.assert_called() - # Check that a warning about no identity was logged - calls = [str(c) for c in mock_rns.log.call_args_list] - assert any("no identity found" in c.lower() for c in calls) + try: + # Call address changed callback - should not crash + ble_interface._address_changed_callback(old_mac, new_mac, identity_hash) + + # Assert: Warning was logged about no identity + assert len(log_calls) > 0, "Expected log calls" + assert any("no identity found" in str(msg).lower() for msg, level in log_calls), \ + f"Expected 'no identity found' warning, got: {log_calls}" + finally: + RNS.log = original_log class TestCacheTTL: From e30a73fd1b2bd18a8ac12d2af7f5350e6ebc0fe4 Mon Sep 17 00:00:00 2001 From: torlando-tech Date: Tue, 30 Dec 2025 14:59:28 -0500 Subject: [PATCH 7/7] fix: patch RNS.log in BLEInterface module's namespace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous patch changed RNS.log in the test's namespace, but BLEInterface.py imports RNS at module load time. To capture log calls from _address_changed_callback(), we need to patch RNS.log where it's used: ble_reticulum.BLEInterface.RNS.log 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/test_identity_cache.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/test_identity_cache.py b/tests/test_identity_cache.py index 33bc9c8..f723bdc 100644 --- a/tests/test_identity_cache.py +++ b/tests/test_identity_cache.py @@ -339,11 +339,15 @@ class TestAddressChangedCallback: assert old_mac not in ble_interface.address_to_identity assert old_mac not in ble_interface._identity_cache - # Patch RNS.log at module level to capture calls - import RNS - original_log = RNS.log + # Patch RNS.log in the BLEInterface module's namespace + from ble_reticulum import BLEInterface as ble_module log_calls = [] - RNS.log = lambda msg, level=4: log_calls.append((msg, level)) + + def capture_log(msg, level=4): + log_calls.append((msg, level)) + + original_log = ble_module.RNS.log + ble_module.RNS.log = capture_log try: # Call address changed callback - should not crash @@ -354,7 +358,7 @@ class TestAddressChangedCallback: assert any("no identity found" in str(msg).lower() for msg, level in log_calls), \ f"Expected 'no identity found' warning, got: {log_calls}" finally: - RNS.log = original_log + ble_module.RNS.log = original_log class TestCacheTTL: