179 lines
5.8 KiB
Python
179 lines
5.8 KiB
Python
import os
|
|
import sys
|
|
|
|
import pytest
|
|
|
|
|
|
# Repository root: two directory levels above the directory holding this file.
REPO_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
# Directory searched for the ``ble_reticulum`` package.
SRC_DIR = os.path.join(REPO_ROOT, "src")
# Directory searched for the compiled ``ble_protocol_core_cpp`` extension.
CPP_BUILD_DIR = os.path.join(REPO_ROOT, "migration", "protocol_core")

# Prepend both directories to the import path in one step; CPP_BUILD_DIR ends
# up at index 0 and SRC_DIR at index 1.
sys.path[:0] = [CPP_BUILD_DIR, SRC_DIR]
|
|
|
|
# Import the compiled extension, or skip this whole test module with a build
# hint when the import fails.
cpp = pytest.importorskip(
    "ble_protocol_core_cpp",
    reason="compiled pybind11 module missing; build with "
    + "`python3 migration/protocol_core/setup.py build_ext --inplace`",
)
|
|
|
|
# Best-effort import of the Python reference implementation. Any failure while
# importing it leaves BLEInterface as None, and the helpers below then use
# their inline fallback expressions instead.
try:
    from ble_reticulum.BLEInterface import BLEInterface
except Exception:
    BLEInterface = None

# Records which reference the comparisons in this module run against.
if BLEInterface is None:
    REFERENCE_MODE = "source_fallback"
else:
    REFERENCE_MODE = "real_bleinterface"
|
|
|
|
|
|
def python_interface():
    """Return a bare ``BLEInterface`` instance, or ``None`` if it is unavailable.

    The instance is allocated with ``object.__new__``, so
    ``BLEInterface.__init__`` is not run.
    """
    return None if BLEInterface is None else object.__new__(BLEInterface)
|
|
|
|
|
|
def py_compute_identity_hash(peer_identity):
    """Return the Python reference identity hash for ``peer_identity``.

    When ``BLEInterface`` is importable, its ``_compute_identity_hash`` is
    called on a bare instance. Otherwise the fallback returns the first 16
    characters of ``peer_identity.hex()``.
    """
    if BLEInterface is not None:
        reference = BLEInterface._compute_identity_hash
        return reference(python_interface(), peer_identity)

    hex_text = peer_identity.hex()
    return hex_text[:16]
|
|
|
|
|
|
def py_get_fragmenter_key(peer_identity, peer_address):
    """Return the Python reference fragmenter key for a peer.

    When ``BLEInterface`` is importable, its ``_get_fragmenter_key`` is called
    on a bare instance with both arguments. Otherwise the fallback returns
    ``peer_identity.hex()`` and does not use ``peer_address``.
    """
    if BLEInterface is not None:
        reference = BLEInterface._get_fragmenter_key
        return reference(python_interface(), peer_identity, peer_address)

    return peer_identity.hex()
|
|
|
|
|
|
def test_reference_mode_is_explicit():
    """REFERENCE_MODE must be one of the two known mode strings."""
    known_modes = ("real_bleinterface", "source_fallback")
    assert REFERENCE_MODE in known_modes
|
|
|
|
|
|
def test_real_bleinterface_reference_when_importable():
    """With BLEInterface importable, the real reference must be in use."""
    interface_available = BLEInterface is not None
    if not interface_available:
        pytest.skip("BLEInterface dependencies are not importable; using source fallback")

    assert REFERENCE_MODE == "real_bleinterface"
    bare_instance = python_interface()
    assert bare_instance is not None
|
|
|
|
|
|
def test_source_fallback_matches_current_helper_logic_when_bleinterface_unavailable():
    """Check that the fallback expressions still appear in BLEInterface.py.

    Runs only when ``BLEInterface`` could not be imported. It reads
    ``ble_reticulum/BLEInterface.py`` under ``SRC_DIR`` as text and looks for
    the two ``return`` statements that the fallback branches of
    ``py_get_fragmenter_key`` and ``py_compute_identity_hash`` reproduce.
    """
    if BLEInterface is not None:
        pytest.skip("BLEInterface importable; source fallback not used")

    assert REFERENCE_MODE == "source_fallback"
    interface_path = os.path.join(SRC_DIR, "ble_reticulum", "BLEInterface.py")
    with open(interface_path, "r", encoding="utf-8") as handle:
        source = handle.read()

    # "return peer_identity.hex()" is a prefix of the truncated form checked
    # below, so a plain substring test would pass even if the untruncated
    # return were gone. Match it as a whole statement instead, ignoring
    # indentation and any trailing comment.
    statements = {line.split("#", 1)[0].strip() for line in source.splitlines()}
    assert "return peer_identity.hex()" in statements
    assert "return peer_identity.hex()[:16]" in source
|
|
|
|
|
|
def assert_same_exception(py_callable, cpp_callable):
    """Assert that both callables raise, and raise the same exception type.

    ``py_callable`` is invoked first, then ``cpp_callable``. Each must raise
    some ``Exception``; the raised types are compared by identity, so a
    subclass does not count as a match.
    """
    raised_types = []
    for thunk in (py_callable, cpp_callable):
        with pytest.raises(Exception) as captured:
            thunk()
        raised_types.append(type(captured.value))

    py_type, cpp_type = raised_types
    assert cpp_type is py_type
|
|
|
|
|
|
@pytest.mark.parametrize(
    "identity",
    [
        bytes.fromhex("00112233445566778899aabbccddeeff"),
        bytes(16),
        bytes([0xFF] * 16),
        bytes(range(16)),
    ],
)
def test_normal_16_byte_identities(identity):
    """The C++ helpers must match the Python reference for 16-byte identities."""
    address = "AA:BB:CC:DD:EE:FF"

    cpp_hash = cpp.compute_identity_hash(identity)
    py_hash = py_compute_identity_hash(identity)
    assert cpp_hash == py_hash

    cpp_key = cpp.get_fragmenter_key(identity, address)
    py_key = py_get_fragmenter_key(identity, address)
    assert cpp_key == py_key
|
|
|
|
|
|
@pytest.mark.parametrize(
    "identity",
    [
        b"",
        b"\x01",
        bytes.fromhex("0123456789abcd"),
    ],
)
def test_shorter_identities(identity):
    """The C++ helpers must match the Python reference for 0-, 1- and 7-byte identities."""
    cpp_hash = cpp.compute_identity_hash(identity)
    py_hash = py_compute_identity_hash(identity)
    assert cpp_hash == py_hash

    # The peer address is passed as None in this case.
    cpp_key = cpp.get_fragmenter_key(identity, None)
    py_key = py_get_fragmenter_key(identity, None)
    assert cpp_key == py_key
|
|
|
|
|
|
@pytest.mark.parametrize(
    "identity",
    [
        bytes(range(17)),
        bytes(range(32)),
        bytes.fromhex("1020304050607080") + b"\x99" * 64,
    ],
)
def test_longer_identities(identity):
    """The C++ helpers must match the Python reference for identities over 16 bytes."""
    cpp_hash = cpp.compute_identity_hash(identity)
    py_hash = py_compute_identity_hash(identity)
    assert cpp_hash == py_hash

    # The peer address is passed as an empty string in this case.
    cpp_key = cpp.get_fragmenter_key(identity, "")
    py_key = py_get_fragmenter_key(identity, "")
    assert cpp_key == py_key
|
|
|
|
|
|
@pytest.mark.parametrize("identity", [None, "00112233", 1234, object()])
def test_none_or_invalid_identity_values(identity):
    """Non-bytes identities must raise the same exception type in both implementations."""
    # Each pair is (Python reference call, C++ call); lambdas defer the calls
    # so assert_same_exception can observe what each one raises.
    call_pairs = (
        (
            lambda: py_compute_identity_hash(identity),
            lambda: cpp.compute_identity_hash(identity),
        ),
        (
            lambda: py_get_fragmenter_key(identity, "AA:BB"),
            lambda: cpp.get_fragmenter_key(identity, "AA:BB"),
        ),
    )
    for reference_call, native_call in call_pairs:
        assert_same_exception(reference_call, native_call)
|
|
|
|
|
|
def test_peer_address_ignored_for_fragmenter_key():
    """The fragmenter key must be the same whatever peer address is supplied."""
    identity = bytes.fromhex("00112233445566778899aabbccddeeff")
    addresses = (
        None,
        "",
        "AA:BB:CC:DD:EE:FF",
        "11:22:33:44:55:66",
        object(),
    )

    # Baseline: the Python reference key computed with a None address.
    expected = py_get_fragmenter_key(identity, None)
    for address in addresses:
        py_key = py_get_fragmenter_key(identity, address)
        assert py_key == expected
        cpp_key = cpp.get_fragmenter_key(identity, address)
        assert cpp_key == expected
|
|
|
|
|
|
def test_stable_lowercase_hex_formatting():
    """Both implementations must emit lowercase hex for an upper-case hex input."""
    identity = bytes.fromhex("ABCDEF0123456789ABCDEF0123456789")
    expected_hash = "abcdef0123456789"
    expected_key = identity.hex()

    assert py_compute_identity_hash(identity) == expected_hash
    assert cpp.compute_identity_hash(identity) == expected_hash
    assert py_get_fragmenter_key(identity, "ignored") == expected_key
    assert cpp.get_fragmenter_key(identity, "ignored") == expected_key

    # Explicit case check on the C++ output, independent of the comparisons above.
    assert cpp.compute_identity_hash(identity).islower()
    assert cpp.get_fragmenter_key(identity, "ignored").islower()
|
|
|
|
|
|
def test_collision_sensitive_truncated_hash_edge_case():
    """Identities sharing their first 8 bytes collide on hash but not on fragmenter key."""
    shared_prefix = bytes.fromhex("0011223344556677")
    identity_a = shared_prefix + bytes(8)
    identity_b = shared_prefix + b"\xff" * 8

    # The identity hashes are expected to be equal for both identities.
    py_hash_a = py_compute_identity_hash(identity_a)
    py_hash_b = py_compute_identity_hash(identity_b)
    assert py_hash_a == py_hash_b
    cpp_hash_a = cpp.compute_identity_hash(identity_a)
    cpp_hash_b = cpp.compute_identity_hash(identity_b)
    assert cpp_hash_a == cpp_hash_b

    # The fragmenter keys are expected to differ between the two identities.
    py_key_a = py_get_fragmenter_key(identity_a, "addr1")
    py_key_b = py_get_fragmenter_key(identity_b, "addr2")
    assert py_key_a != py_key_b
    cpp_key_a = cpp.get_fragmenter_key(identity_a, "addr1")
    cpp_key_b = cpp.get_fragmenter_key(identity_b, "addr2")
    assert cpp_key_a != cpp_key_b
|
|
|
|
|
|
def test_bytearray_matches_current_python_hex_behavior():
    """A ``bytearray`` identity must give the same results in both implementations."""
    identity = bytearray.fromhex("00112233445566778899aabbccddeeff")

    cpp_hash = cpp.compute_identity_hash(identity)
    py_hash = py_compute_identity_hash(identity)
    assert cpp_hash == py_hash

    cpp_key = cpp.get_fragmenter_key(identity, "ignored")
    py_key = py_get_fragmenter_key(identity, "ignored")
    assert cpp_key == py_key
|