First Phase -- missed including these files in the prior commit

This commit is contained in:
John Poole 2026-05-16 17:21:02 -07:00
commit 3f3f3668f1
7 changed files with 882 additions and 0 deletions

View file

@ -0,0 +1,489 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <algorithm>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <tuple>
#include <vector>
namespace py = pybind11;
namespace {
using Bytes = std::string;
constexpr uint8_t TYPE_START = 0x01;
constexpr uint8_t TYPE_CONTINUE = 0x02;
constexpr uint8_t TYPE_END = 0x03;
constexpr size_t HEADER_SIZE = 5;
double now_seconds() {
py::object time = py::module_::import("time").attr("time");
return time().cast<double>();
}
Bytes py_bytes_to_string(const py::bytes &value) {
return value.cast<std::string>();
}
py::bytes string_to_py_bytes(const Bytes &value) {
return py::bytes(value);
}
struct Buffer {
std::map<uint16_t, Bytes> fragments;
uint16_t total = 0;
double start_time = 0.0;
std::string sender_id;
};
struct BufferKey {
std::string sender_id;
uint16_t packet_index = 0;
uint16_t total = 0;
bool operator<(const BufferKey &other) const {
return std::tie(sender_id, packet_index, total) <
std::tie(other.sender_id, other.packet_index, other.total);
}
};
} // namespace
class BLEFragmenterCpp {
public:
static constexpr uint8_t TYPE_START_VALUE = TYPE_START;
static constexpr uint8_t TYPE_CONTINUE_VALUE = TYPE_CONTINUE;
static constexpr uint8_t TYPE_END_VALUE = TYPE_END;
static constexpr size_t HEADER_SIZE_VALUE = HEADER_SIZE;
explicit BLEFragmenterCpp(int mtu_value = 185) {
mtu = std::max(mtu_value, 20);
payload_size = mtu - static_cast<int>(HEADER_SIZE);
if (payload_size < 1) {
throw py::value_error("MTU " + std::to_string(mtu_value) +
" too small for fragmentation (min " +
std::to_string(HEADER_SIZE + 1) + ")");
}
}
std::vector<py::bytes> fragment_packet(const py::bytes &packet_obj) const {
Bytes packet = py_bytes_to_string(packet_obj);
if (packet.empty()) {
throw py::value_error("Cannot fragment empty packet");
}
const size_t packet_size = packet.size();
const size_t num_fragments =
(packet_size + static_cast<size_t>(payload_size) - 1) /
static_cast<size_t>(payload_size);
if (num_fragments > 65535) {
throw py::value_error(
"Packet requires " + std::to_string(num_fragments) +
" fragments, exceeds max (65535). Packet size too large for BLE MTU " +
std::to_string(mtu) + ". Maximum supported: " +
std::to_string(65535 * payload_size) + " bytes");
}
std::vector<py::bytes> fragments;
fragments.reserve(num_fragments);
for (size_t i = 0; i < num_fragments; ++i) {
uint8_t frag_type = TYPE_CONTINUE;
if (i == 0) {
frag_type = TYPE_START;
} else if (i == num_fragments - 1) {
frag_type = TYPE_END;
}
const size_t start_idx = i * static_cast<size_t>(payload_size);
const size_t end_idx =
std::min(start_idx + static_cast<size_t>(payload_size), packet_size);
const Bytes data = packet.substr(start_idx, end_idx - start_idx);
Bytes fragment;
fragment.reserve(HEADER_SIZE + data.size());
fragment.push_back(static_cast<char>(frag_type));
fragment.push_back(static_cast<char>((i >> 8) & 0xff));
fragment.push_back(static_cast<char>(i & 0xff));
fragment.push_back(static_cast<char>((num_fragments >> 8) & 0xff));
fragment.push_back(static_cast<char>(num_fragments & 0xff));
fragment.append(data);
fragments.emplace_back(string_to_py_bytes(fragment));
}
return fragments;
}
py::tuple get_fragment_overhead(size_t packet_size) const {
const size_t num_fragments =
(packet_size + static_cast<size_t>(payload_size) - 1) /
static_cast<size_t>(payload_size);
const size_t overhead_bytes = num_fragments * HEADER_SIZE;
const double overhead_pct =
packet_size > 0 ? (static_cast<double>(overhead_bytes) /
static_cast<double>(packet_size)) *
100.0
: 0.0;
return py::make_tuple(num_fragments, overhead_bytes, overhead_pct);
}
int mtu = 185;
int payload_size = 180;
};
class BLEReassemblerCpp {
public:
static constexpr double DEFAULT_TIMEOUT_VALUE = 30.0;
explicit BLEReassemblerCpp(py::object timeout_obj = py::none()) {
if (timeout_obj.is_none()) {
timeout = DEFAULT_TIMEOUT_VALUE;
} else {
timeout = timeout_obj.cast<double>();
}
}
py::object receive_fragment(const py::bytes &fragment_obj,
py::object sender_obj = py::none()) {
const Bytes fragment = py_bytes_to_string(fragment_obj);
if (fragment.size() < HEADER_SIZE) {
throw py::value_error("Fragment too short: " +
std::to_string(fragment.size()) +
" bytes (min " + std::to_string(HEADER_SIZE) + ")");
}
std::string sender_id = "default";
if (!sender_obj.is_none()) {
sender_id = py::str(sender_obj).cast<std::string>();
}
fragments_received += 1;
const uint8_t frag_type = static_cast<uint8_t>(fragment[0]);
const uint16_t sequence =
(static_cast<uint16_t>(static_cast<uint8_t>(fragment[1])) << 8) |
static_cast<uint16_t>(static_cast<uint8_t>(fragment[2]));
const uint16_t total =
(static_cast<uint16_t>(static_cast<uint8_t>(fragment[3])) << 8) |
static_cast<uint16_t>(static_cast<uint8_t>(fragment[4]));
const Bytes data = fragment.substr(HEADER_SIZE);
if (frag_type != TYPE_START && frag_type != TYPE_CONTINUE &&
frag_type != TYPE_END) {
char buffer[64];
snprintf(buffer, sizeof(buffer), "Invalid fragment type: 0x%02x",
frag_type);
throw py::value_error(buffer);
}
if (sequence >= total) {
throw py::value_error("Invalid sequence " + std::to_string(sequence) +
" >= total " + std::to_string(total));
}
if (total == 0) {
throw py::value_error("Total fragments cannot be zero");
}
BufferKey packet_key{sender_id, static_cast<uint16_t>(sequence / total),
total};
if (sequence == 0) {
Buffer buffer;
buffer.fragments[sequence] = data;
buffer.total = total;
buffer.start_time = now_seconds();
buffer.sender_id = sender_id;
reassembly_buffers[packet_key] = buffer;
} else {
bool found = false;
BufferKey buffer_key;
const double current_time = now_seconds();
for (const auto &entry : reassembly_buffers) {
if (entry.first.sender_id == sender_id &&
entry.second.total == total &&
current_time - entry.second.start_time < timeout) {
buffer_key = entry.first;
found = true;
break;
}
}
if (!found) {
if (reassembly_buffers.find(packet_key) == reassembly_buffers.end()) {
Buffer buffer;
buffer.total = total;
buffer.start_time = now_seconds();
buffer.sender_id = sender_id;
reassembly_buffers[packet_key] = buffer;
}
Buffer &buffer = reassembly_buffers[packet_key];
if (buffer.fragments.find(sequence) != buffer.fragments.end()) {
check_duplicate_or_throw(packet_key, sequence, data);
return py::none();
}
buffer.fragments[sequence] = data;
return py::none();
}
packet_key = buffer_key;
Buffer &buffer = reassembly_buffers[packet_key];
if (buffer.total != total) {
reassembly_buffers.erase(packet_key);
throw py::value_error("Fragment total mismatch for " + sender_id +
": expected " +
std::to_string(buffer.total) + ", got " +
std::to_string(total));
}
if (buffer.fragments.find(sequence) != buffer.fragments.end()) {
check_duplicate_or_throw(packet_key, sequence, data);
return py::none();
}
buffer.fragments[sequence] = data;
}
Buffer &buffer = reassembly_buffers[packet_key];
if (buffer.fragments.size() == total) {
for (uint16_t i = 0; i < total; ++i) {
if (buffer.fragments.find(i) == buffer.fragments.end()) {
return py::none();
}
}
Bytes packet = reassemble(buffer);
reassembly_buffers.erase(packet_key);
packets_reassembled += 1;
return string_to_py_bytes(packet);
}
return py::none();
}
int cleanup_stale_buffers() {
const double current_time = now_seconds();
std::vector<BufferKey> stale_keys;
for (const auto &entry : reassembly_buffers) {
if (current_time - entry.second.start_time > timeout) {
stale_keys.push_back(entry.first);
}
}
for (const auto &key : stale_keys) {
reassembly_buffers.erase(key);
packets_timeout += 1;
}
return static_cast<int>(stale_keys.size());
}
py::bytes _reassemble(py::dict py_buffer) const {
if (!py_buffer.contains("fragments") || !py_buffer.contains("total")) {
throw py::key_error("fragments");
}
py::dict py_fragments = py_buffer["fragments"].cast<py::dict>();
const uint16_t total = py_buffer["total"].cast<uint16_t>();
Bytes packet;
for (uint16_t i = 0; i < total; ++i) {
py::object key = py::int_(i);
if (!py_fragments.contains(key)) {
throw py::value_error("Missing fragment " + std::to_string(i) +
" during reassembly");
}
packet.append(py_fragments[key].cast<std::string>());
}
return string_to_py_bytes(packet);
}
py::dict get_statistics() const {
py::dict stats;
stats["packets_reassembled"] = packets_reassembled;
stats["packets_timeout"] = packets_timeout;
stats["fragments_received"] = fragments_received;
stats["pending_packets"] = reassembly_buffers.size();
return stats;
}
void reset_statistics() {
packets_reassembled = 0;
packets_timeout = 0;
fragments_received = 0;
}
py::dict buffers_as_dict() const {
py::dict result;
for (const auto &entry : reassembly_buffers) {
py::tuple key =
py::make_tuple(entry.first.sender_id, entry.first.packet_index,
entry.first.total);
py::dict value;
py::dict fragments;
for (const auto &frag : entry.second.fragments) {
fragments[py::int_(frag.first)] = string_to_py_bytes(frag.second);
}
value["fragments"] = fragments;
value["total"] = entry.second.total;
value["start_time"] = entry.second.start_time;
value["sender_id"] = entry.second.sender_id;
result[key] = value;
}
return result;
}
double timeout = DEFAULT_TIMEOUT_VALUE;
int packets_reassembled = 0;
int packets_timeout = 0;
int fragments_received = 0;
private:
void check_duplicate_or_throw(const BufferKey &packet_key, uint16_t sequence,
const Bytes &data) {
Buffer &buffer = reassembly_buffers[packet_key];
const Bytes &existing_data = buffer.fragments[sequence];
if (existing_data == data) {
return;
}
reassembly_buffers.erase(packet_key);
throw py::value_error("Fragment " + std::to_string(sequence) + " from " +
packet_key.sender_id +
" received twice with different data! Possible corruption.");
}
static Bytes reassemble(const Buffer &buffer) {
Bytes packet;
for (uint16_t i = 0; i < buffer.total; ++i) {
auto iter = buffer.fragments.find(i);
if (iter == buffer.fragments.end()) {
throw py::value_error("Missing fragment " + std::to_string(i) +
" during reassembly");
}
packet.append(iter->second);
}
return packet;
}
std::map<BufferKey, Buffer> reassembly_buffers;
};
class HDLCFramerCpp {
public:
static constexpr uint8_t FLAG_VALUE = 0x7E;
static constexpr uint8_t ESCAPE_VALUE = 0x7D;
static constexpr uint8_t ESCAPE_XOR_VALUE = 0x20;
static py::bytes frame_packet(const py::bytes &packet_obj) {
const Bytes packet = py_bytes_to_string(packet_obj);
Bytes stuffed;
stuffed.reserve(packet.size() + 2);
for (unsigned char byte : packet) {
if (byte == FLAG_VALUE || byte == ESCAPE_VALUE) {
stuffed.push_back(static_cast<char>(ESCAPE_VALUE));
stuffed.push_back(static_cast<char>(byte ^ ESCAPE_XOR_VALUE));
} else {
stuffed.push_back(static_cast<char>(byte));
}
}
Bytes frame;
frame.reserve(stuffed.size() + 2);
frame.push_back(static_cast<char>(FLAG_VALUE));
frame.append(stuffed);
frame.push_back(static_cast<char>(FLAG_VALUE));
return string_to_py_bytes(frame);
}
static py::bytes deframe_packet(const py::bytes &frame_obj) {
const Bytes frame = py_bytes_to_string(frame_obj);
if (frame.size() < 2) {
throw py::value_error("Frame too short (minimum 2 bytes for delimiters)");
}
if (static_cast<uint8_t>(frame.front()) != FLAG_VALUE ||
static_cast<uint8_t>(frame.back()) != FLAG_VALUE) {
throw py::value_error("Invalid frame: missing FLAG delimiters");
}
Bytes unstuffed;
bool escape_next = false;
for (size_t i = 1; i + 1 < frame.size(); ++i) {
const uint8_t byte = static_cast<uint8_t>(frame[i]);
if (escape_next) {
unstuffed.push_back(static_cast<char>(byte ^ ESCAPE_XOR_VALUE));
escape_next = false;
} else if (byte == ESCAPE_VALUE) {
escape_next = true;
} else if (byte == FLAG_VALUE) {
throw py::value_error("Unexpected FLAG in frame data");
} else {
unstuffed.push_back(static_cast<char>(byte));
}
}
if (escape_next) {
throw py::value_error("Frame ends with ESCAPE character");
}
return string_to_py_bytes(unstuffed);
}
};
PYBIND11_MODULE(ble_protocol_core_cpp, m) {
m.doc() = "C++ protocol-core implementation for BLE Reticulum fragmentation";
py::class_<BLEFragmenterCpp>(m, "BLEFragmenter")
.def(py::init<int>(), py::arg("mtu") = 185)
.def("fragment_packet", &BLEFragmenterCpp::fragment_packet,
py::arg("packet"))
.def("get_fragment_overhead", &BLEFragmenterCpp::get_fragment_overhead,
py::arg("packet_size"))
.def_readwrite("mtu", &BLEFragmenterCpp::mtu)
.def_readwrite("payload_size", &BLEFragmenterCpp::payload_size)
.def_readonly_static("TYPE_START", &BLEFragmenterCpp::TYPE_START_VALUE)
.def_readonly_static("TYPE_CONTINUE",
&BLEFragmenterCpp::TYPE_CONTINUE_VALUE)
.def_readonly_static("TYPE_END", &BLEFragmenterCpp::TYPE_END_VALUE)
.def_readonly_static("HEADER_SIZE", &BLEFragmenterCpp::HEADER_SIZE_VALUE);
py::class_<BLEReassemblerCpp>(m, "BLEReassembler")
.def(py::init<py::object>(), py::arg("timeout") = py::none())
.def("receive_fragment", &BLEReassemblerCpp::receive_fragment,
py::arg("fragment"), py::arg("sender_id") = py::none())
.def("_reassemble", &BLEReassemblerCpp::_reassemble, py::arg("buffer"))
.def("cleanup_stale_buffers", &BLEReassemblerCpp::cleanup_stale_buffers)
.def("get_statistics", &BLEReassemblerCpp::get_statistics)
.def("reset_statistics", &BLEReassemblerCpp::reset_statistics)
.def_readwrite("timeout", &BLEReassemblerCpp::timeout)
.def_readwrite("packets_reassembled",
&BLEReassemblerCpp::packets_reassembled)
.def_readwrite("packets_timeout", &BLEReassemblerCpp::packets_timeout)
.def_readwrite("fragments_received",
&BLEReassemblerCpp::fragments_received)
.def_property_readonly("reassembly_buffers",
&BLEReassemblerCpp::buffers_as_dict)
.def_readonly_static("DEFAULT_TIMEOUT",
&BLEReassemblerCpp::DEFAULT_TIMEOUT_VALUE);
py::class_<HDLCFramerCpp>(m, "HDLCFramer")
.def(py::init<>())
.def_static("frame_packet", &HDLCFramerCpp::frame_packet,
py::arg("packet"))
.def_static("deframe_packet", &HDLCFramerCpp::deframe_packet,
py::arg("frame"))
.def_readonly_static("FLAG", &HDLCFramerCpp::FLAG_VALUE)
.def_readonly_static("ESCAPE", &HDLCFramerCpp::ESCAPE_VALUE)
.def_readonly_static("ESCAPE_XOR", &HDLCFramerCpp::ESCAPE_XOR_VALUE);
}

View file

@ -0,0 +1,22 @@
from pathlib import Path
from pybind11.setup_helpers import Pybind11Extension, build_ext
from setuptools import setup
ROOT = Path(__file__).resolve().parent
setup(
name="ble-protocol-core-cpp",
version="0.0.1",
ext_modules=[
Pybind11Extension(
"ble_protocol_core_cpp",
[str(ROOT / "ble_protocol_core.cpp")],
cxx_std=17,
)
],
cmdclass={"build_ext": build_ext},
zip_safe=False,
)

View file

@ -0,0 +1,45 @@
# C++ Protocol Core Fragmentation Equivalence Test
Date: 2026-05-16
Host: jp
Python environment: rnsenv
Repository: /usr/local/src/ble-reticulum
## Command
```bash
cd /usr/local/src/ble-reticulum
python3 -m pytest migration/tests/test_fragmentation_cpp_equivalence.py -vv
Result
28 tests passed.
Coverage Summary
The C++ implementation was compared against the existing Python implementation for:
BLEFragmenter single-fragment packets
BLEFragmenter multi-fragment packets
MTU boundary sizes: 20, 23, 50, 185
Empty and non-bytes packet errors
BLEReassembler single-fragment reassembly
BLEReassembler multi-fragment reassembly
Out-of-order fragments with start first
Malformed fragments
Duplicate fragments with same data
Duplicate fragments with different data
Stale buffer cleanup
Statistics reset
Internal reassemble behavior
HDLC frame/deframe round trips
Full byte range HDLC round trip
Many HDLC round trips
Invalid HDLC escape sequences and frames
Non-bytes HDLC errors
Raw Result
28 passed, 2 warnings in 0.38s.
Interpretation
This establishes phase-1 behavioral equivalence for the protocol-core fragmentation/reassembly layer. The C++ code is not yet integrated into BLEInterface.py for live Reticulum/BLE traffic.

View file

@ -0,0 +1,5 @@
-- Do-not-port-yet list
SELECT source_file, class_name, symbol_name, tag, rationale
FROM symbols
WHERE tag IN ('GLUE', 'PLATFORM')
ORDER BY tag, source_file, line_number;

View file

@ -0,0 +1,6 @@
-- Things needing your review
SELECT source_file, class_name, symbol_name, line_number, tag, rationale
FROM symbols
WHERE status = 'NEEDS_REVIEW'
OR tag = 'UNKNOWN'
ORDER BY source_file, line_number;

View file

@ -0,0 +1,6 @@
-- Phase-1 C++ candidates
SELECT source_file, class_name, symbol_name, line_number, rationale
FROM symbols
WHERE tag = 'CORE'
AND cpp_candidate = 1
ORDER BY source_file, line_number;

View file

@ -0,0 +1,309 @@
import os
import sys
import time
import pytest
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
SRC_DIR = os.path.join(REPO_ROOT, "src")
CPP_BUILD_DIR = os.path.join(REPO_ROOT, "migration", "protocol_core")
sys.path.insert(0, SRC_DIR)
sys.path.insert(0, CPP_BUILD_DIR)
from ble_reticulum.BLEFragmentation import ( # noqa: E402
BLEFragmenter as PyBLEFragmenter,
)
from ble_reticulum.BLEFragmentation import ( # noqa: E402
BLEReassembler as PyBLEReassembler,
)
from ble_reticulum.BLEFragmentation import HDLCFramer as PyHDLCFramer # noqa: E402
cpp = pytest.importorskip(
"ble_protocol_core_cpp",
reason=(
"compiled pybind11 module missing; build with "
"`python3 migration/protocol_core/setup.py build_ext --inplace`"
),
)
CppBLEFragmenter = cpp.BLEFragmenter
CppBLEReassembler = cpp.BLEReassembler
CppHDLCFramer = cpp.HDLCFramer
def assert_same_exception(py_callable, cpp_callable):
with pytest.raises(Exception) as py_exc:
py_callable()
with pytest.raises(Exception) as cpp_exc:
cpp_callable()
assert type(cpp_exc.value) is type(py_exc.value)
assert str(cpp_exc.value) == str(py_exc.value)
def compare_fragmenter(mtu, packet):
py_fragmenter = PyBLEFragmenter(mtu=mtu)
cpp_fragmenter = CppBLEFragmenter(mtu=mtu)
py_fragments = py_fragmenter.fragment_packet(packet)
cpp_fragments = cpp_fragmenter.fragment_packet(packet)
assert cpp_fragmenter.mtu == py_fragmenter.mtu
assert cpp_fragmenter.payload_size == py_fragmenter.payload_size
assert cpp_fragments == py_fragments
assert cpp_fragmenter.get_fragment_overhead(len(packet)) == py_fragmenter.get_fragment_overhead(
len(packet)
)
return py_fragments, cpp_fragments
def compare_reassembly(mtu, packet, order=None, sender_id="device1"):
py_fragments, cpp_fragments = compare_fragmenter(mtu, packet)
py_reassembler = PyBLEReassembler()
cpp_reassembler = CppBLEReassembler()
if order is None:
order = list(range(len(py_fragments)))
py_result = None
cpp_result = None
for index in order:
py_result = py_reassembler.receive_fragment(py_fragments[index], sender_id)
cpp_result = cpp_reassembler.receive_fragment(cpp_fragments[index], sender_id)
assert cpp_result == py_result
assert py_result == packet
assert cpp_result == packet
assert cpp_reassembler.get_statistics() == py_reassembler.get_statistics()
class TestBLEFragmenterCppEquivalence:
def test_single_fragment_packets(self):
for mtu, packet in [
(185, b"Hello, Reticulum!"),
(20, b"A"),
(50, bytes(range(10))),
]:
py_fragments, cpp_fragments = compare_fragmenter(mtu, packet)
assert len(py_fragments) == 1
assert len(cpp_fragments) == 1
def test_multi_fragment_packets(self):
for mtu, packet in [
(185, b"A" * 500),
(100, b"B" * 300),
(20, bytes(range(256))),
]:
py_fragments, cpp_fragments = compare_fragmenter(mtu, packet)
assert len(py_fragments) > 1
assert len(cpp_fragments) > 1
compare_reassembly(mtu, packet)
@pytest.mark.parametrize("mtu", [20, 23, 50, 185])
def test_mtu_boundary_sizes(self, mtu):
payload_size = max(mtu, 20) - PyBLEFragmenter.HEADER_SIZE
sizes = [
1,
payload_size - 1,
payload_size,
payload_size + 1,
payload_size * 2,
payload_size * 2 + 1,
]
for size in sizes:
if size <= 0:
continue
packet = bytes((i % 251 for i in range(size)))
compare_reassembly(mtu, packet)
def test_empty_and_non_bytes_packet_errors(self):
assert_same_exception(
lambda: PyBLEFragmenter().fragment_packet(b""),
lambda: CppBLEFragmenter().fragment_packet(b""),
)
with pytest.raises(TypeError):
CppBLEFragmenter().fragment_packet("not bytes")
with pytest.raises(TypeError):
PyBLEFragmenter().fragment_packet("not bytes")
class TestBLEReassemblerCppEquivalence:
def test_single_fragment_reassembly(self):
compare_reassembly(185, b"Short message")
def test_multi_fragment_reassembly(self):
compare_reassembly(100, b"E" * 300)
def test_out_of_order_fragments_with_start_first(self):
packet = b"F" * 150
py_fragments, _ = compare_fragmenter(50, packet)
assert len(py_fragments) == 4
compare_reassembly(50, packet, order=[0, 2, 1, 3])
def test_malformed_fragments(self):
cases = [
b"\x01\x00",
b"\xff\x00\x00\x00\x01payload",
b"\x01\x00\x01\x00\x01payload",
b"\x01\x00\x00\x00\x00payload",
]
for fragment in cases:
py_reassembler = PyBLEReassembler()
cpp_reassembler = CppBLEReassembler()
assert_same_exception(
lambda fragment=fragment: py_reassembler.receive_fragment(fragment, "device1"),
lambda fragment=fragment: cpp_reassembler.receive_fragment(fragment, "device1"),
)
def test_duplicate_fragments_same_data(self):
packet = b"D" * 160
py_fragments, cpp_fragments = compare_fragmenter(50, packet)
py_reassembler = PyBLEReassembler()
cpp_reassembler = CppBLEReassembler()
assert py_reassembler.receive_fragment(py_fragments[0], "device1") is None
assert cpp_reassembler.receive_fragment(cpp_fragments[0], "device1") is None
assert py_reassembler.receive_fragment(py_fragments[1], "device1") is None
assert cpp_reassembler.receive_fragment(cpp_fragments[1], "device1") is None
assert py_reassembler.receive_fragment(py_fragments[1], "device1") is None
assert cpp_reassembler.receive_fragment(cpp_fragments[1], "device1") is None
py_result = None
cpp_result = None
for index in range(2, len(py_fragments)):
py_result = py_reassembler.receive_fragment(py_fragments[index], "device1")
cpp_result = cpp_reassembler.receive_fragment(cpp_fragments[index], "device1")
assert cpp_result == py_result
assert py_result == packet
assert cpp_result == packet
def test_duplicate_fragments_different_data(self):
packet = b"Q" * 160
py_fragments, cpp_fragments = compare_fragmenter(50, packet)
py_reassembler = PyBLEReassembler()
cpp_reassembler = CppBLEReassembler()
py_reassembler.receive_fragment(py_fragments[0], "device1")
cpp_reassembler.receive_fragment(cpp_fragments[0], "device1")
py_reassembler.receive_fragment(py_fragments[1], "device1")
cpp_reassembler.receive_fragment(cpp_fragments[1], "device1")
py_bad = bytearray(py_fragments[1])
cpp_bad = bytearray(cpp_fragments[1])
py_bad[-1] ^= 0x01
cpp_bad[-1] ^= 0x01
assert_same_exception(
lambda: py_reassembler.receive_fragment(bytes(py_bad), "device1"),
lambda: cpp_reassembler.receive_fragment(bytes(cpp_bad), "device1"),
)
assert len(cpp_reassembler.reassembly_buffers) == len(py_reassembler.reassembly_buffers)
def test_stale_buffer_cleanup(self):
packet = b"G" * 300
py_fragments, cpp_fragments = compare_fragmenter(100, packet)
py_reassembler = PyBLEReassembler(timeout=0.1)
cpp_reassembler = CppBLEReassembler(timeout=0.1)
assert py_reassembler.receive_fragment(py_fragments[0], "device1") is None
assert cpp_reassembler.receive_fragment(cpp_fragments[0], "device1") is None
assert len(cpp_reassembler.reassembly_buffers) == len(py_reassembler.reassembly_buffers)
time.sleep(0.2)
assert cpp_reassembler.cleanup_stale_buffers() == py_reassembler.cleanup_stale_buffers()
assert len(cpp_reassembler.reassembly_buffers) == len(py_reassembler.reassembly_buffers)
assert cpp_reassembler.get_statistics() == py_reassembler.get_statistics()
def test_statistics_reset(self):
packet = b"H" * 300
py_fragments, cpp_fragments = compare_fragmenter(100, packet)
py_reassembler = PyBLEReassembler()
cpp_reassembler = CppBLEReassembler()
for py_fragment, cpp_fragment in zip(py_fragments, cpp_fragments):
assert cpp_reassembler.receive_fragment(cpp_fragment, "device1") == py_reassembler.receive_fragment(
py_fragment, "device1"
)
assert cpp_reassembler.get_statistics() == py_reassembler.get_statistics()
py_reassembler.reset_statistics()
cpp_reassembler.reset_statistics()
assert cpp_reassembler.get_statistics() == py_reassembler.get_statistics()
def test_internal_reassemble_method_matches_python(self):
py_reassembler = PyBLEReassembler()
cpp_reassembler = CppBLEReassembler()
buffer = {"fragments": {0: b"abc", 1: b"def", 2: b"ghi"}, "total": 3}
assert cpp_reassembler._reassemble(buffer) == py_reassembler._reassemble(buffer)
malformed = {"fragments": {0: b"abc", 2: b"ghi"}, "total": 3}
assert_same_exception(
lambda: py_reassembler._reassemble(malformed),
lambda: cpp_reassembler._reassemble(malformed),
)
class TestHDLCFramerCppEquivalence:
@pytest.mark.parametrize(
"packet",
[
b"",
b"Hello, World!",
bytes([0x7E, 0x01, 0x7E]),
bytes([0x7D, 0x02, 0x7D]),
bytes(range(256)),
],
)
def test_frame_deframe_round_trips(self, packet):
py_framed = PyHDLCFramer.frame_packet(packet)
cpp_framed = CppHDLCFramer.frame_packet(packet)
assert cpp_framed == py_framed
assert CppHDLCFramer.deframe_packet(cpp_framed) == PyHDLCFramer.deframe_packet(
py_framed
)
assert CppHDLCFramer.deframe_packet(cpp_framed) == packet
def test_many_hdlc_round_trips(self):
for value in range(256):
packet = bytes([value] * 10)
py_framed = PyHDLCFramer.frame_packet(packet)
cpp_framed = CppHDLCFramer.frame_packet(packet)
assert cpp_framed == py_framed
assert CppHDLCFramer.deframe_packet(cpp_framed) == packet
@pytest.mark.parametrize(
"frame",
[
b"",
b"\x7e",
b"missing-flags",
bytes([PyHDLCFramer.FLAG, 0x01, PyHDLCFramer.FLAG, PyHDLCFramer.FLAG]),
bytes([PyHDLCFramer.FLAG, PyHDLCFramer.ESCAPE, PyHDLCFramer.FLAG]),
],
)
def test_invalid_hdlc_escape_sequences_and_frames(self, frame):
assert_same_exception(
lambda frame=frame: PyHDLCFramer.deframe_packet(frame),
lambda frame=frame: CppHDLCFramer.deframe_packet(frame),
)
def test_non_bytes_errors(self):
with pytest.raises(TypeError):
PyHDLCFramer.frame_packet("not bytes")
with pytest.raises(TypeError):
CppHDLCFramer.frame_packet("not bytes")
with pytest.raises(TypeError):
PyHDLCFramer.deframe_packet("not bytes")
with pytest.raises(TypeError):
CppHDLCFramer.deframe_packet("not bytes")