2025-10-26 19:02:39 -04:00
|
|
|
"""
Unit tests for BLEPeerInterface class.

Tests the spawned peer interface that represents individual BLE connections,
including data flow, fragmentation, and both central/peripheral modes.
"""
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
import asyncio
|
fix(ble): Event-driven D-Bus monitoring to eliminate HCI errors on BCM43xx chips (#30)
* fix(ble): Increase D-Bus monitoring intervals to prevent HCI errors
The D-Bus monitoring threads were polling too frequently (0.5s and 30s),
causing HCI command collisions on BCM43xx single-radio chips. These chips
cannot handle concurrent BLE operations, and the frequent D-Bus activity
was interfering with scan/advertise cycles.
Changes:
- Increase D-Bus disconnect monitor interval from 0.5s to 5s
- Increase stale connection poll interval from 30s to 120s
This eliminates HCI errors (Opcode 0x2005/0x2006) while preserving
disconnect detection functionality with slightly higher latency.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* refactor(ble): Convert D-Bus monitoring to event-driven approach
Replace polling-based D-Bus monitoring with true event-driven pattern:
1. D-Bus monitor thread:
- Use asyncio.Event instead of periodic sleep
- Store loop reference for thread-safe shutdown
- Use call_soon_threadsafe to wake loop on stop
2. Stale poll thread:
- Replace busy-wait loop (240 x 0.5s) with single Event.wait()
- Increase interval from 120s to 300s (safety net only)
- Immediate response to stop signal
Benefits:
- Zero CPU usage while waiting (no periodic wakeups)
- Immediate shutdown response (ms instead of 5s)
- Cleaner, simpler code
- Maintains disconnect detection via D-Bus signals
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* test(ble): Add comprehensive unit tests for HCI error fixes
Add 26 new unit tests for the event-driven D-Bus monitoring fixes
that eliminated HCI errors on BCM43xx single-radio chips.
Test coverage:
- TestEventDrivenDBusMonitor: Tests asyncio.Event usage, immediate
wake response, call_soon_threadsafe cross-thread signaling
- TestStalePollImprovements: Tests threading.Event.wait() usage,
300s interval, immediate stop response
- TestStopShutdownBehavior: Tests stop() async signaling, RuntimeError
handling, shutdown latency improvement
- TestIntegrationScenarios: Tests full lifecycle, multiple stop calls
- TestCodeVerification: Verifies actual source patterns match expected
All 26 tests pass without requiring pytest-asyncio plugin.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(tests): Use threading.RLock instead of asyncio.Lock in test fixtures
In Python 3.8/3.9, asyncio.Lock() requires a running event loop. When
test_hci_error_fixes.py runs first (alphabetically) and uses asyncio.run(),
it closes the event loop after each test. Subsequent test fixtures that
create asyncio.Lock() then fail with "no current event loop" errors.
Since these are mock fixtures that don't need async semantics, use
threading.RLock() instead.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(tests): Replace all asyncio.Lock() with threading.RLock() in test mocks
asyncio.Lock() requires a running event loop in Python 3.8/3.9. When
test files using asyncio.run() execute first, the event loop is closed,
causing subsequent test fixtures to fail when creating asyncio.Lock().
Fixed in:
- test_peripheral_disconnect_cleanup.py (mock_gatt_server fixture)
- test_bluez_state_cleanup.py (mock_driver fixture)
- test_ble_peer_interface.py (create_mock_peer_interface helper)
- conftest.py (create_mock_interface helper)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
---------
Co-authored-by: torlando-tech <torlando-tech@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 00:46:06 -05:00
|
|
|
import threading
|
2025-10-26 19:02:39 -04:00
|
|
|
from unittest.mock import Mock, AsyncMock, patch, MagicMock
|
|
|
|
|
|
|
|
|
|
# Import fragmentation for testing
|
|
|
|
|
try:
|
2025-12-29 23:30:07 -05:00
|
|
|
from ble_reticulum.BLEFragmentation import BLEFragmenter, BLEReassembler
|
2025-10-26 19:02:39 -04:00
|
|
|
except ImportError:
|
|
|
|
|
BLEFragmenter = None
|
|
|
|
|
BLEReassembler = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Helper: Create Mock BLEPeerInterface
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
def create_mock_peer_interface(peer_address="AA:BB:CC:DD:EE:FF", peer_name="TestPeer", is_peripheral=False):
    """Create a mock BLEPeerInterface (and its mock parent) for testing.

    Args:
        peer_address: Address stored on the peer mock and used as the key of
            the parent's ``peers``, ``fragmenters`` and ``reassemblers`` dicts.
        peer_name: Name stored on the peer mock.
        is_peripheral: Stored on the peer mock as ``is_peripheral_connection``.

    Returns:
        A ``(peer_if, parent)`` tuple of ``Mock`` objects, where
        ``peer_if.parent_interface`` is ``parent``.
    """
    # Function-scope import: only the event-loop cleanup below needs it.
    import weakref

    # Mock parent interface
    parent = Mock()
    parent.name = "TestBLEInterface"
    parent.owner = Mock()
    parent.owner.inbound = Mock()
    parent.online = True
    parent.HW_MTU = 500
    parent.bitrate = 700000
    parent.rxb = 0
    parent.txb = 0
    # Tests unpack this entry as a 3-tuple whose first item is the client mock.
    # NOTE(review): the 185 matches the fragmenter MTU below, so the tuple is
    # presumably (client, <timestamp?>, mtu) - confirm against BLEInterface.
    parent.peers = {peer_address: (Mock(is_connected=True), 0, 185)}
    # Use the real fragmentation classes when importable, plain mocks otherwise.
    parent.fragmenters = {peer_address: BLEFragmenter(mtu=185) if BLEFragmenter else Mock()}
    parent.reassemblers = {peer_address: BLEReassembler() if BLEReassembler else Mock()}
    parent.frag_lock = threading.RLock()  # Use threading lock for mock
    parent.peer_lock = threading.RLock()  # Use threading lock for mock

    # Create a new event loop for testing (Python 3.10+ requires this)
    loop = asyncio.new_event_loop()
    parent.loop = loop
    # Nothing else ever closes this loop, so every helper call would otherwise
    # leave it to be reclaimed by the loop's __del__, which emits a
    # ResourceWarning. Close it explicitly once the parent mock is collected
    # (or at interpreter exit). The callback references only the loop, never
    # the parent, so it does not keep the parent alive.
    weakref.finalize(parent, loop.close)

    parent.gatt_server = Mock()
    parent.gatt_server.send_notification = AsyncMock(return_value=True)

    # Mock peer interface
    peer_if = Mock()
    peer_if.parent_interface = parent
    peer_if.peer_address = peer_address
    peer_if.peer_name = peer_name
    peer_if.online = True
    peer_if.is_peripheral_connection = is_peripheral
    # Link parameters are copied from the parent; byte counters start at zero.
    peer_if.HW_MTU = parent.HW_MTU
    peer_if.bitrate = parent.bitrate
    peer_if.rxb = 0
    peer_if.txb = 0

    return peer_if, parent
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Basic Operations Tests
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(BLEFragmenter is None, reason="BLEFragmentation not available")
class TestPeerInterfaceBasics:
    """Basic attribute and byte-counter checks on the mocked peer interface.

    These tests operate on the objects returned by
    ``create_mock_peer_interface`` and simulate the interface behaviour
    inline; they do not call into a real BLEPeerInterface.
    """

    def test_peer_interface_initialization(self):
        """The helper returns a peer mock carrying the requested attributes."""
        address = "AA:BB:CC:DD:EE:FF"
        name = "TestDevice"
        peer, owner_if = create_mock_peer_interface(peer_address=address, peer_name=name)

        assert peer.parent_interface == owner_if
        assert peer.peer_address == address
        assert peer.peer_name == name
        assert peer.online is True
        assert peer.HW_MTU == 500
        assert peer.bitrate == 700000

    def test_process_incoming_updates_stats(self):
        """Receive counters on peer and parent grow by the payload size."""
        peer, owner_if = create_mock_peer_interface()

        # Payload standing in for received data
        payload = b"Hello, BLE!" * 10
        size = len(payload)
        rxb_before = peer.rxb

        # Stand-in for what process_incoming would do to the counters
        peer.rxb = peer.rxb + size
        owner_if.rxb = owner_if.rxb + size

        # Both counters must reflect the payload
        assert peer.rxb == rxb_before + size
        assert owner_if.rxb == size

    def test_process_outgoing_updates_stats(self):
        """Transmit counters on peer and parent grow when fragments are sent."""
        peer, owner_if = create_mock_peer_interface()

        # Payload standing in for data to send
        payload = b"Hello, BLE!" * 10
        txb_before = peer.txb

        # Stand-in for process_outgoing: fragment, then count each fragment
        frag_source = owner_if.fragmenters[peer.peer_address]
        if hasattr(frag_source, 'fragment_packet'):
            sent = sum(len(piece) for piece in frag_source.fragment_packet(payload))
            peer.txb = peer.txb + sent
            owner_if.txb = owner_if.txb + sent

        # Both counters must have moved
        assert peer.txb > txb_before
        assert owner_if.txb > 0

    def test_detach_cleanup(self):
        """After a simulated detach the peer mock reports offline."""
        peer, _owner_if = create_mock_peer_interface()

        # Simulated detach: only the online flag is flipped
        peer.online = False

        assert peer.online is False

    def test_should_ingress_limit_inheritance(self):
        """The parent's ingress-limit hook returns the configured value."""
        _peer, owner_if = create_mock_peer_interface()

        # Install a stub for the parent's should_ingress_limit
        limit_stub = Mock(return_value=True)
        owner_if.should_ingress_limit = limit_stub

        # Only the parent stub is queried here
        # (In real code, this would be: peer_if.should_ingress_limit())
        assert owner_if.should_ingress_limit() is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Central Mode Send Tests
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(BLEFragmenter is None, reason="BLEFragmentation not available")
class TestCentralModeSend:
    """Central-mode send scenarios (data sent via GATT write).

    The tests drive the fragmenter and the client mock obtained from
    ``create_mock_peer_interface(is_peripheral=False)`` directly.
    """

    def test_send_via_central_single_fragment(self):
        """A short payload fragments into exactly one piece."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=False)

        # Short payload expected to fit in a single fragment
        payload = b"Small packet"
        pieces = owner_if.fragmenters[peer.peer_address].fragment_packet(payload)

        assert len(pieces) == 1

    def test_send_via_central_multiple_fragments(self):
        """A 500-byte payload fragments into more than one piece."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=False)

        # 500 bytes > MTU(185), so fragmentation is required
        payload = b"X" * 500
        pieces = owner_if.fragmenters[peer.peer_address].fragment_packet(payload)

        assert len(pieces) > 1

    @pytest.mark.asyncio
    async def test_send_via_central_timeout(self):
        """A write that raises asyncio.TimeoutError propagates to the caller."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=False)

        # First element of the peers entry is the mock client
        gatt_client, _, _ = owner_if.peers[peer.peer_address]

        async def _stalled_write(*args, **kwargs):
            # Brief delay, then fail as a timed-out write would
            await asyncio.sleep(0.1)
            raise asyncio.TimeoutError("Write timeout")

        gatt_client.write_gatt_char = AsyncMock(side_effect=_stalled_write)

        # The awaited write must surface the timeout
        with pytest.raises(asyncio.TimeoutError):
            await gatt_client.write_gatt_char("dummy-uuid", b"data")

    @pytest.mark.asyncio
    async def test_send_via_central_connection_error(self):
        """A client flagged as disconnected reports ``is_connected`` False."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=False)

        # First element of the peers entry is the mock client
        gatt_client, _, _ = owner_if.peers[peer.peer_address]

        # Simulated connection loss
        gatt_client.is_connected = False

        assert gatt_client.is_connected is False

    def test_send_via_central_no_fragmenter(self):
        """Removing the peer's fragmenter leaves no entry for that address."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=False)

        # Drop the fragmenter registered for this peer
        owner_if.fragmenters.pop(peer.peer_address)

        assert peer.peer_address not in owner_if.fragmenters
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Peripheral Mode Send Tests
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(BLEFragmenter is None, reason="BLEFragmentation not available")
class TestPeripheralModeSend:
    """Peripheral-mode send scenarios (data sent via GATT notifications).

    The tests drive the fragmenter and the ``gatt_server`` mock obtained from
    ``create_mock_peer_interface(is_peripheral=True)`` directly.
    """

    @pytest.mark.asyncio
    async def test_send_via_peripheral_single_fragment(self):
        """A short payload yields one fragment, which is notified successfully."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=True)

        # Short payload expected to fit in a single fragment
        payload = b"Small notification"
        pieces = owner_if.fragmenters[peer.peer_address].fragment_packet(payload)

        assert len(pieces) == 1

        # Push the single fragment through the mocked GATT server
        outcome = await owner_if.gatt_server.send_notification(pieces[0], peer.peer_address)
        assert outcome is True

    @pytest.mark.asyncio
    async def test_send_via_peripheral_multiple_fragments(self):
        """A 500-byte payload yields several fragments, each notified successfully."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=True)

        # Payload large enough to require fragmentation
        payload = b"Y" * 500
        pieces = owner_if.fragmenters[peer.peer_address].fragment_packet(payload)

        assert len(pieces) > 1

        # Notify fragment by fragment, checking every result
        for piece in pieces:
            outcome = await owner_if.gatt_server.send_notification(piece, peer.peer_address)
            assert outcome is True

    @pytest.mark.asyncio
    async def test_send_via_peripheral_no_server(self):
        """With the GATT server removed, the parent's ``gatt_server`` is None."""
        _peer, owner_if = create_mock_peer_interface(is_peripheral=True)

        # Simulate an unavailable server
        owner_if.gatt_server = None

        assert owner_if.gatt_server is None

    @pytest.mark.asyncio
    async def test_send_via_peripheral_timeout(self):
        """A notification that raises asyncio.TimeoutError propagates to the caller."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=True)

        async def _stalled_notification(*args, **kwargs):
            # Brief delay, then fail as a timed-out notification would
            await asyncio.sleep(0.1)
            raise asyncio.TimeoutError("Notification timeout")

        owner_if.gatt_server.send_notification = AsyncMock(side_effect=_stalled_notification)

        # The awaited notification must surface the timeout
        with pytest.raises(asyncio.TimeoutError):
            await owner_if.gatt_server.send_notification(b"data", peer.peer_address)

    @pytest.mark.asyncio
    async def test_send_via_peripheral_central_disconnected(self):
        """A server configured to return False yields a False send result."""
        peer, owner_if = create_mock_peer_interface(is_peripheral=True)

        # Server reports failure (central not connected)
        owner_if.gatt_server.send_notification = AsyncMock(return_value=False)

        outcome = await owner_if.gatt_server.send_notification(b"data", peer.peer_address)
        assert outcome is False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # pytest.main() returns the run's exit code; pass it on so that running
    # this file directly exits non-zero when a test fails instead of always
    # reporting success to the shell/CI.
    raise SystemExit(pytest.main([__file__, "-v"]))
|