2025-10-26 19:02:39 -04:00
|
|
|
"""
|
|
|
|
|
pytest configuration for BLE interface tests.
|
|
|
|
|
|
|
|
|
|
This file is automatically loaded by pytest before test collection begins.
|
|
|
|
|
It sets up the Python path to allow imports from src/ and Reticulum.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import sys
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
# Calculate paths relative to this file's location
|
|
|
|
|
tests_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
project_root = os.path.dirname(tests_dir)
|
|
|
|
|
src_dir = os.path.join(project_root, 'src')
|
|
|
|
|
|
|
|
|
|
# Add src/ to path for BLE interface modules
|
2025-12-29 23:58:18 -05:00
|
|
|
# This allows tests to import from src/ble_reticulum/
|
2025-10-26 19:02:39 -04:00
|
|
|
if src_dir not in sys.path:
|
|
|
|
|
sys.path.insert(0, src_dir)
|
|
|
|
|
|
|
|
|
|
# Note: Some test files (test_gatt_integration.py, test_ble_integration.py) have
|
|
|
|
|
# import issues due to Python's namespace package limitations. They can't be run
|
|
|
|
|
# with 'pytest tests/' but work individually. This is expected until the BLE
|
|
|
|
|
# interface is fully integrated into the Reticulum repository.
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
import asyncio
|
fix(ble): Event-driven D-Bus monitoring to eliminate HCI errors on BCM43xx chips (#30)
* fix(ble): Increase D-Bus monitoring intervals to prevent HCI errors
The D-Bus monitoring threads were polling too frequently (0.5s and 30s),
causing HCI command collisions on BCM43xx single-radio chips. These chips
cannot handle concurrent BLE operations, and the frequent D-Bus activity
was interfering with scan/advertise cycles.
Changes:
- Increase D-Bus disconnect monitor interval from 0.5s to 5s
- Increase stale connection poll interval from 30s to 120s
This eliminates HCI errors (Opcode 0x2005/0x2006) while preserving
disconnect detection functionality with slightly higher latency.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* refactor(ble): Convert D-Bus monitoring to event-driven approach
Replace polling-based D-Bus monitoring with true event-driven pattern:
1. D-Bus monitor thread:
- Use asyncio.Event instead of periodic sleep
- Store loop reference for thread-safe shutdown
- Use call_soon_threadsafe to wake loop on stop
2. Stale poll thread:
- Replace busy-wait loop (240 x 0.5s) with single Event.wait()
- Increase interval from 120s to 300s (safety net only)
- Immediate response to stop signal
Benefits:
- Zero CPU usage while waiting (no periodic wakeups)
- Immediate shutdown response (ms instead of 5s)
- Cleaner, simpler code
- Maintains disconnect detection via D-Bus signals
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* test(ble): Add comprehensive unit tests for HCI error fixes
Add 26 new unit tests for the event-driven D-Bus monitoring fixes
that eliminated HCI errors on BCM43xx single-radio chips.
Test coverage:
- TestEventDrivenDBusMonitor: Tests asyncio.Event usage, immediate
wake response, call_soon_threadsafe cross-thread signaling
- TestStalePollImprovements: Tests threading.Event.wait() usage,
300s interval, immediate stop response
- TestStopShutdownBehavior: Tests stop() async signaling, RuntimeError
handling, shutdown latency improvement
- TestIntegrationScenarios: Tests full lifecycle, multiple stop calls
- TestCodeVerification: Verifies actual source patterns match expected
All 26 tests pass without requiring pytest-asyncio plugin.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(tests): Use threading.RLock instead of asyncio.Lock in test fixtures
In Python 3.8/3.9, asyncio.Lock() requires a running event loop. When
test_hci_error_fixes.py runs first (alphabetically) and uses asyncio.run(),
it closes the event loop after each test. Subsequent test fixtures that
create asyncio.Lock() then fail with "no current event loop" errors.
Since these are mock fixtures that don't need async semantics, use
threading.RLock() instead.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(tests): Replace all asyncio.Lock() with threading.RLock() in test mocks
asyncio.Lock() requires a running event loop in Python 3.8/3.9. When
test files using asyncio.run() execute first, the event loop is closed,
causing subsequent test fixtures to fail when creating asyncio.Lock().
Fixed in:
- test_peripheral_disconnect_cleanup.py (mock_gatt_server fixture)
- test_bluez_state_cleanup.py (mock_driver fixture)
- test_ble_peer_interface.py (create_mock_peer_interface helper)
- conftest.py (create_mock_interface helper)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
---------
Co-authored-by: torlando-tech <torlando-tech@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 00:46:06 -05:00
|
|
|
import threading
|
2025-10-26 19:02:39 -04:00
|
|
|
import time
|
|
|
|
|
from unittest.mock import Mock, AsyncMock, MagicMock, patch
|
|
|
|
|
from types import ModuleType
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Mock RNS module functions for testing
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
# Don't import the real RNS here as it may have crypto dependencies
|
|
|
|
|
# Instead, check if RNS stub exists in src/RNS/ and use that
|
|
|
|
|
rns_stub_path = os.path.join(src_dir, 'RNS')
|
|
|
|
|
if os.path.exists(os.path.join(rns_stub_path, '__init__.py')):
|
|
|
|
|
# RNS stub exists, we can import it
|
|
|
|
|
try:
|
|
|
|
|
import RNS
|
|
|
|
|
# Add mock functions if not already present
|
|
|
|
|
if not hasattr(RNS, 'LOG_INFO'):
|
|
|
|
|
RNS.LOG_CRITICAL = 0
|
|
|
|
|
RNS.LOG_ERROR = 1
|
|
|
|
|
RNS.LOG_WARNING = 2
|
|
|
|
|
RNS.LOG_NOTICE = 3
|
|
|
|
|
RNS.LOG_INFO = 4
|
|
|
|
|
RNS.LOG_VERBOSE = 5
|
|
|
|
|
RNS.LOG_DEBUG = 6
|
|
|
|
|
RNS.LOG_EXTREME = 7
|
|
|
|
|
|
|
|
|
|
if not hasattr(RNS, 'log'):
|
|
|
|
|
def mock_log(message, level=4):
|
|
|
|
|
pass
|
|
|
|
|
RNS.log = mock_log
|
|
|
|
|
|
|
|
|
|
if not hasattr(RNS, 'prettyhexrep'):
|
|
|
|
|
def mock_prettyhexrep(data):
|
|
|
|
|
return data.hex() if isinstance(data, bytes) else str(data)
|
|
|
|
|
RNS.prettyhexrep = mock_prettyhexrep
|
|
|
|
|
|
|
|
|
|
if not hasattr(RNS, 'hexrep'):
|
|
|
|
|
def mock_hexrep(data, delimit=True):
|
|
|
|
|
if isinstance(data, bytes):
|
|
|
|
|
hex_str = data.hex()
|
|
|
|
|
if delimit:
|
|
|
|
|
return ':'.join(hex_str[i:i+2] for i in range(0, len(hex_str), 2))
|
|
|
|
|
return hex_str
|
|
|
|
|
return str(data)
|
|
|
|
|
RNS.hexrep = mock_hexrep
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# If import fails, tests will handle RNS mocking individually
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Async Fixtures
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def event_loop():
|
|
|
|
|
"""Create an event loop for async tests."""
|
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
|
yield loop
|
|
|
|
|
loop.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
async def mock_event_loop():
|
|
|
|
|
"""Create a mock event loop that can be used in tests."""
|
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
|
asyncio.set_event_loop(loop)
|
|
|
|
|
yield loop
|
|
|
|
|
loop.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Mock BLE Components
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_bleak_client():
|
|
|
|
|
"""Create a mock BleakClient for testing central mode operations."""
|
|
|
|
|
client = AsyncMock()
|
|
|
|
|
client.address = "AA:BB:CC:DD:EE:FF"
|
|
|
|
|
client.is_connected = True
|
|
|
|
|
client.mtu_size = 185
|
|
|
|
|
client.connect = AsyncMock(return_value=True)
|
|
|
|
|
client.disconnect = AsyncMock(return_value=True)
|
|
|
|
|
client.start_notify = AsyncMock(return_value=True)
|
|
|
|
|
client.stop_notify = AsyncMock(return_value=True)
|
|
|
|
|
client.write_gatt_char = AsyncMock(return_value=True)
|
|
|
|
|
return client
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_bleak_device():
|
|
|
|
|
"""Create a mock BLE device discovered during scan."""
|
|
|
|
|
device = Mock()
|
|
|
|
|
device.address = "AA:BB:CC:DD:EE:FF"
|
|
|
|
|
device.name = "Test-Device"
|
|
|
|
|
device.metadata = {
|
|
|
|
|
"uuids": ["00000001-5824-4f48-9e1a-3b3e8f0c1234"],
|
|
|
|
|
"rssi": -65
|
|
|
|
|
}
|
|
|
|
|
return device
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_bleak_scanner():
|
|
|
|
|
"""Create a mock BleakScanner for testing discovery."""
|
|
|
|
|
async def mock_discover(timeout=1.0):
|
|
|
|
|
"""Return mock discovered devices."""
|
|
|
|
|
device1 = Mock()
|
|
|
|
|
device1.address = "AA:BB:CC:DD:EE:01"
|
|
|
|
|
device1.name = "Device-1"
|
|
|
|
|
device1.metadata = {
|
|
|
|
|
"uuids": ["00000001-5824-4f48-9e1a-3b3e8f0c1234"],
|
|
|
|
|
"rssi": -50
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
device2 = Mock()
|
|
|
|
|
device2.address = "AA:BB:CC:DD:EE:02"
|
|
|
|
|
device2.name = "Device-2"
|
|
|
|
|
device2.metadata = {
|
|
|
|
|
"uuids": ["00000001-5824-4f48-9e1a-3b3e8f0c1234"],
|
|
|
|
|
"rssi": -70
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return [device1, device2]
|
|
|
|
|
|
|
|
|
|
with patch('bleak.BleakScanner.discover', side_effect=mock_discover):
|
|
|
|
|
yield
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_bless_server():
|
|
|
|
|
"""Create a mock BlessServer for testing GATT server operations."""
|
|
|
|
|
server = AsyncMock()
|
|
|
|
|
server.add_new_service = AsyncMock(return_value=True)
|
|
|
|
|
server.add_new_characteristic = AsyncMock(return_value=True)
|
|
|
|
|
server.update_value = AsyncMock(return_value=True)
|
|
|
|
|
server.start = AsyncMock(return_value=True)
|
|
|
|
|
server.stop = AsyncMock(return_value=True)
|
|
|
|
|
return server
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Mock RNS Components
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_rns_owner():
|
|
|
|
|
"""Create a mock Reticulum Transport owner for BLEInterface."""
|
|
|
|
|
owner = Mock()
|
|
|
|
|
owner.inbound = Mock()
|
|
|
|
|
return owner
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_rns_transport():
|
|
|
|
|
"""Mock RNS.Transport for interface registration."""
|
|
|
|
|
with patch('RNS.Transport') as mock_transport:
|
|
|
|
|
mock_transport.interfaces = []
|
|
|
|
|
yield mock_transport
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def mock_rns_identity():
|
|
|
|
|
"""Mock RNS.Identity for testing."""
|
|
|
|
|
with patch('RNS.Identity') as mock_identity:
|
|
|
|
|
mock_identity.full_hash = Mock(return_value=b'\x01\x02\x03\x04')
|
|
|
|
|
yield mock_identity
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Common Test Data
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def sample_packet_data():
|
|
|
|
|
"""Sample packet data for testing."""
|
|
|
|
|
return {
|
|
|
|
|
'small': b'Hello, BLE!' * 1, # ~11 bytes
|
|
|
|
|
'medium': b'Hello, BLE!' * 20, # ~220 bytes
|
|
|
|
|
'large': b'Hello, BLE!' * 100, # ~1100 bytes
|
|
|
|
|
'empty': b'',
|
|
|
|
|
'single_byte': b'\x42',
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def sample_configuration():
|
|
|
|
|
"""Sample BLEInterface configuration for testing."""
|
|
|
|
|
return {
|
|
|
|
|
'name': 'TestBLEInterface',
|
|
|
|
|
'enabled': True,
|
|
|
|
|
'service_uuid': '00000001-5824-4f48-9e1a-3b3e8f0c1234',
|
|
|
|
|
'device_name': 'Test-Node',
|
|
|
|
|
'discovery_interval': 5.0,
|
|
|
|
|
'max_connections': 7,
|
|
|
|
|
'min_rssi': -80,
|
|
|
|
|
'connection_timeout': 10.0,
|
|
|
|
|
'power_mode': 'balanced',
|
|
|
|
|
'enable_peripheral': True,
|
|
|
|
|
'connection_rotation_interval': 600,
|
|
|
|
|
'connection_retry_backoff': 60,
|
|
|
|
|
'max_connection_failures': 3,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def sample_discovered_peers():
|
|
|
|
|
"""Sample DiscoveredPeer objects for testing."""
|
|
|
|
|
try:
|
2025-12-29 23:30:07 -05:00
|
|
|
from ble_reticulum.BLEInterface import DiscoveredPeer
|
2025-10-26 19:02:39 -04:00
|
|
|
except ImportError:
|
|
|
|
|
# Create a simple mock DiscoveredPeer for testing
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
class DiscoveredPeer:
|
|
|
|
|
def __init__(self, address, name, rssi):
|
|
|
|
|
self.address = address
|
|
|
|
|
self.name = name
|
|
|
|
|
self.rssi = rssi
|
|
|
|
|
self.first_seen = time.time()
|
|
|
|
|
self.last_seen = time.time()
|
|
|
|
|
self.connection_attempts = 0
|
|
|
|
|
self.successful_connections = 0
|
|
|
|
|
self.failed_connections = 0
|
|
|
|
|
self.last_connection_attempt = 0
|
|
|
|
|
|
|
|
|
|
def update_rssi(self, rssi):
|
|
|
|
|
self.rssi = rssi
|
|
|
|
|
self.last_seen = time.time()
|
|
|
|
|
|
|
|
|
|
def record_connection_attempt(self):
|
|
|
|
|
self.connection_attempts += 1
|
|
|
|
|
self.last_connection_attempt = time.time()
|
|
|
|
|
|
|
|
|
|
def record_connection_success(self):
|
|
|
|
|
self.successful_connections += 1
|
|
|
|
|
|
|
|
|
|
def record_connection_failure(self):
|
|
|
|
|
self.failed_connections += 1
|
|
|
|
|
|
|
|
|
|
def get_success_rate(self):
|
|
|
|
|
if self.connection_attempts == 0:
|
|
|
|
|
return 0.0
|
|
|
|
|
return self.successful_connections / self.connection_attempts
|
|
|
|
|
|
|
|
|
|
peer1 = DiscoveredPeer("AA:BB:CC:DD:EE:01", "Device-1", -50)
|
|
|
|
|
peer2 = DiscoveredPeer("AA:BB:CC:DD:EE:02", "Device-2", -70)
|
|
|
|
|
peer3 = DiscoveredPeer("AA:BB:CC:DD:EE:03", "Device-3", -90)
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
'strong': peer1,
|
|
|
|
|
'medium': peer2,
|
|
|
|
|
'weak': peer3,
|
|
|
|
|
'all': [peer1, peer2, peer3]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# Helper Functions
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
def create_mock_ble_interface(owner=None, config=None):
|
|
|
|
|
"""
|
|
|
|
|
Create a mock BLEInterface instance for testing.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
owner: Mock RNS owner (optional)
|
|
|
|
|
config: Configuration dict (optional)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Mock BLEInterface with necessary attributes
|
|
|
|
|
"""
|
|
|
|
|
interface = Mock()
|
|
|
|
|
interface.name = config.get('name', 'TestBLE') if config else 'TestBLE'
|
|
|
|
|
interface.online = True
|
|
|
|
|
interface.owner = owner or Mock()
|
|
|
|
|
interface.peers = {}
|
|
|
|
|
interface.spawned_interfaces = {}
|
|
|
|
|
interface.discovered_peers = {}
|
|
|
|
|
interface.connection_blacklist = {}
|
|
|
|
|
interface.fragmenters = {}
|
|
|
|
|
interface.reassemblers = {}
|
fix(ble): Event-driven D-Bus monitoring to eliminate HCI errors on BCM43xx chips (#30)
* fix(ble): Increase D-Bus monitoring intervals to prevent HCI errors
The D-Bus monitoring threads were polling too frequently (0.5s and 30s),
causing HCI command collisions on BCM43xx single-radio chips. These chips
cannot handle concurrent BLE operations, and the frequent D-Bus activity
was interfering with scan/advertise cycles.
Changes:
- Increase D-Bus disconnect monitor interval from 0.5s to 5s
- Increase stale connection poll interval from 30s to 120s
This eliminates HCI errors (Opcode 0x2005/0x2006) while preserving
disconnect detection functionality with slightly higher latency.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* refactor(ble): Convert D-Bus monitoring to event-driven approach
Replace polling-based D-Bus monitoring with true event-driven pattern:
1. D-Bus monitor thread:
- Use asyncio.Event instead of periodic sleep
- Store loop reference for thread-safe shutdown
- Use call_soon_threadsafe to wake loop on stop
2. Stale poll thread:
- Replace busy-wait loop (240 x 0.5s) with single Event.wait()
- Increase interval from 120s to 300s (safety net only)
- Immediate response to stop signal
Benefits:
- Zero CPU usage while waiting (no periodic wakeups)
- Immediate shutdown response (ms instead of 5s)
- Cleaner, simpler code
- Maintains disconnect detection via D-Bus signals
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* test(ble): Add comprehensive unit tests for HCI error fixes
Add 26 new unit tests for the event-driven D-Bus monitoring fixes
that eliminated HCI errors on BCM43xx single-radio chips.
Test coverage:
- TestEventDrivenDBusMonitor: Tests asyncio.Event usage, immediate
wake response, call_soon_threadsafe cross-thread signaling
- TestStalePollImprovements: Tests threading.Event.wait() usage,
300s interval, immediate stop response
- TestStopShutdownBehavior: Tests stop() async signaling, RuntimeError
handling, shutdown latency improvement
- TestIntegrationScenarios: Tests full lifecycle, multiple stop calls
- TestCodeVerification: Verifies actual source patterns match expected
All 26 tests pass without requiring pytest-asyncio plugin.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(tests): Use threading.RLock instead of asyncio.Lock in test fixtures
In Python 3.8/3.9, asyncio.Lock() requires a running event loop. When
test_hci_error_fixes.py runs first (alphabetically) and uses asyncio.run(),
it closes the event loop after each test. Subsequent test fixtures that
create asyncio.Lock() then fail with "no current event loop" errors.
Since these are mock fixtures that don't need async semantics, use
threading.RLock() instead.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix(tests): Replace all asyncio.Lock() with threading.RLock() in test mocks
asyncio.Lock() requires a running event loop in Python 3.8/3.9. When
test files using asyncio.run() execute first, the event loop is closed,
causing subsequent test fixtures to fail when creating asyncio.Lock().
Fixed in:
- test_peripheral_disconnect_cleanup.py (mock_gatt_server fixture)
- test_bluez_state_cleanup.py (mock_driver fixture)
- test_ble_peer_interface.py (create_mock_peer_interface helper)
- conftest.py (create_mock_interface helper)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
---------
Co-authored-by: torlando-tech <torlando-tech@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 00:46:06 -05:00
|
|
|
interface.peer_lock = threading.RLock() # Use threading lock for mock
|
|
|
|
|
interface.frag_lock = threading.RLock() # Use threading lock for mock
|
2025-10-26 19:02:39 -04:00
|
|
|
interface.loop = asyncio.get_event_loop()
|
|
|
|
|
interface.max_peers = config.get('max_connections', 7) if config else 7
|
|
|
|
|
interface.min_rssi = config.get('min_rssi', -80) if config else -80
|
|
|
|
|
return interface
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def wait_for_async(coro, timeout=2.0):
|
|
|
|
|
"""
|
|
|
|
|
Helper to wait for an async coroutine in synchronous tests.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
coro: Async coroutine to wait for
|
|
|
|
|
timeout: Maximum time to wait in seconds
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
Result of the coroutine
|
|
|
|
|
"""
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
return loop.run_until_complete(asyncio.wait_for(coro, timeout=timeout))
|