--message-chunk-size is now treated as a requested maximum. If the requested value is too large for the Reticulum link budget, the program caps it and logs that it did so. The cap accounts for file metadata and send_epoch.
591 lines
18 KiB
Python
Executable file
591 lines
18 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Two-node BLE Reticulum echo proof.
|
|
|
|
Goal:
|
|
Run the same program on zerodev1 and zerodev2. Each node:
|
|
- derives its node name from hostname unless --name is supplied
|
|
- loads or creates a stable Reticulum app identity
|
|
- creates one inbound destination
|
|
- announces that destination over configured Reticulum interfaces
|
|
- optionally connects to a peer destination hash
|
|
- prints every message received over a Reticulum link
|
|
|
|
After cloning the SD card:
|
|
1. Change zerodev2 hostname.
|
|
2. Remove cloned Reticulum transport state on zerodev2.
|
|
3. Use a unique app identity file per host, or delete the cloned one.
|
|
4. Choose BLE mode at runtime, or configure BLEInterface in Reticulum:
|
|
--ble-role peripheral # advertise only, accepts incoming BLE connections
|
|
--ble-role central # scan/connect only
|
|
--ble-role both # scan/connect and advertise
|
|
|
|
5. Run this program on both nodes.
|
|
"""
|
|
|
|
import argparse
|
|
|
|
import atexit
|
|
import os
|
|
import re
|
|
import signal
|
|
import shutil
|
|
import socket
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
|
|
# Reticulum destination naming: application name plus a single aspect.
APP_NAME = "ble_reticulum_poc"
APP_ASPECT = "echo"

# Default for --message-chunk-size, and the per-packet byte budget
# (chunk metadata + data + send_epoch suffix) used to cap file chunks.
DEFAULT_MESSAGE_CHUNK_SIZE = 300
LINK_PAYLOAD_BUDGET = 420

# The RNS module is imported in the __main__ section and bound to this name.
RNS = None

# Cleared by stop(); every loop in this program polls it.
running = True

# Established links keyed by link_key(link); access is guarded by the lock.
active_links_lock = threading.Lock()
active_links = {}

# Set by runtime_config_dir() when a patched copy of the config is created.
temporary_config_dir = None

# Message-file mode state, filled in by the __main__ section.
message_file_path = None
message_file_text = None
message_chunk_size = DEFAULT_MESSAGE_CHUNK_SIZE
|
|
|
|
|
|
|
|
def log(msg):
    """Print ``msg`` to stdout prefixed with the local time as [HH:MM:SS.mmm].

    Output is flushed immediately so lines show up promptly when piped.
    """
    moment = time.time()
    clock = time.strftime("%H:%M:%S", time.localtime(moment))
    # Fractional part of the epoch time, truncated to whole milliseconds.
    millis = int((moment % 1) * 1000)
    print(f"[{clock}.{millis:03d}] {msg}", flush=True)
|
|
|
|
|
|
def normalise_argv(argv):
    """Rewrite ``message_file=PATH`` style arguments into option form.

    Arguments written as ``message_file=PATH`` or ``message-file=PATH`` are
    expanded to the two items ``--message-file`` and ``PATH``. Every other
    argument, and ``argv[0]``, is passed through unchanged.

    Returns a new list; ``argv`` itself is not modified.
    """
    aliases = {
        "message_file": "--message-file",
        "message-file": "--message-file",
    }

    rewritten = [argv[0]]
    for raw in argv[1:]:
        # partition() splits on the first "=" only, so the value may itself
        # contain "=" characters.
        name, equals, payload = raw.partition("=")
        if equals and name in aliases:
            rewritten += [aliases[name], payload]
            continue
        rewritten.append(raw)

    return rewritten
|
|
|
|
|
|
def stop(_signum=None, _frame=None):
    """Clear the module-level ``running`` flag.

    The signature matches a ``signal.signal`` handler; both arguments are
    ignored, so it can also be called directly with no arguments.
    """
    global running
    running = False
|
|
|
|
|
|
def normalise_hex(h):
    """Return ``h`` reduced to a bare lowercase hex string.

    Accepts the common ways a hash gets pasted: surrounded by whitespace,
    wrapped in angle brackets (the form this program logs via
    ``RNS.prettyhexrep`` and asks the user to pass as ``--peer``), and with
    ``:`` or whitespace separators between bytes.

    No validation is done here; callers such as ``bytes.fromhex`` reject
    anything that is still not hexadecimal.
    """
    # Drop outer whitespace first so the brackets are really at the ends.
    unwrapped = h.strip().strip("<>")
    # split()/join removes every kind of whitespace, not only spaces.
    return "".join(unwrapped.replace(":", "").split()).lower()
|
|
|
|
|
|
def default_identity_path(configdir, node_name):
    """Return the default identity file path for ``node_name``.

    The file is named ``<APP_NAME>_<node_name>.identity`` and lives in
    ``configdir``, or in ``~/.reticulum`` (user-expanded) when ``configdir``
    is None. A supplied ``configdir`` is used exactly as given.
    """
    directory = os.path.expanduser("~/.reticulum") if configdir is None else configdir
    filename = f"{APP_NAME}_{node_name}.identity"
    return os.path.join(directory, filename)
|
|
|
|
|
|
|
|
def base_config_dir(configdir):
    """Return the absolute, user-expanded Reticulum config directory.

    A falsy ``configdir`` (None or empty string) selects ``~/.reticulum``.
    """
    chosen = configdir if configdir else "~/.reticulum"
    expanded = os.path.expanduser(chosen)
    return os.path.abspath(expanded)
|
|
|
|
|
|
def str_to_yes_no(value):
    """Convert a boolean-like command-line value to ``"yes"`` or ``"no"``.

    ``None`` is passed through unchanged. Any other value is stringified,
    stripped and lower-cased before being compared with the accepted
    spellings.

    Raises:
        argparse.ArgumentTypeError: if the value is not a recognised spelling.
    """
    if value is None:
        return None

    spellings = (
        ("yes", ("yes", "true", "1", "on", "enable", "enabled")),
        ("no", ("no", "false", "0", "off", "disable", "disabled")),
    )

    token = str(value).strip().lower()
    for answer, accepted in spellings:
        if token in accepted:
            return answer

    raise argparse.ArgumentTypeError("expected yes/no, true/false, or 1/0")
|
|
|
|
|
|
def requested_ble_overrides(args):
    """Collect the BLE interface settings requested on the command line.

    ``args.ble_role`` ("central", "peripheral" or "both"), when set, decides
    both values and takes precedence over ``args.enable_central`` and
    ``args.enable_peripheral``. Without a role the two individual options
    are used as given.

    Returns a dict containing ``enable_central`` and/or ``enable_peripheral``
    for the settings that are not None; empty when nothing was requested.
    """
    role_presets = {
        "central": ("yes", "no"),
        "peripheral": ("no", "yes"),
        "both": ("yes", "yes"),
    }
    central, peripheral = role_presets.get(
        args.ble_role,
        (args.enable_central, args.enable_peripheral),
    )

    overrides = {}
    if central is not None:
        overrides["enable_central"] = central
    if peripheral is not None:
        overrides["enable_peripheral"] = peripheral
    return overrides
|
|
|
|
|
|
def find_ble_interface_block(lines):
    """Locate the ``[[BLE Interface]]`` subsection in config ``lines``.

    Returns ``(start, end)`` where ``start`` is the index of the first
    ``[[BLE Interface]]`` header and ``end`` is the index of the next section
    or subsection header after it (``len(lines)`` if there is none).
    Returns ``(None, None)`` when no such header exists.
    """
    ble_header = re.compile(r"^\s*\[\[\s*BLE Interface\s*\]\]\s*$")
    subsection_header = re.compile(r"^\s*\[\[.*\]\]\s*$")
    section_header = re.compile(r"^\s*\[[^\[].*\]\s*$")

    start = next(
        (position for position, text in enumerate(lines) if ble_header.match(text)),
        None,
    )
    if start is None:
        return None, None

    # The block runs until the next header of either nesting level.
    end = next(
        (
            position
            for position in range(start + 1, len(lines))
            if subsection_header.match(lines[position]) or section_header.match(lines[position])
        ),
        len(lines),
    )
    return start, end
|
|
|
|
|
|
def set_config_value(lines, start, end, key, value):
    """Set ``key = value`` inside the config block ``lines[start:end]``.

    ``lines[start]`` is taken to be the block header, so only the lines after
    it and before ``end`` are searched. An existing assignment of ``key`` is
    replaced in place; otherwise a new line is inserted at index ``end``.
    ``lines`` is modified in place.

    Returns:
        The number of lines added to ``lines``: 0 when an existing entry was
        replaced, 1 when a new entry was inserted.
    """
    pattern = re.compile(rf"^(\s*{re.escape(key)}\s*=\s*).*$")
    entry = f"{key} = {value}\n"

    for index in range(start + 1, end):
        if pattern.match(lines[index]):
            lines[index] = entry
            return 0

    # A file whose last line has no trailing newline would otherwise end up
    # with the new entry glued onto that line ("enabled = yeskey = value").
    if 0 < end <= len(lines) and not lines[end - 1].endswith("\n"):
        lines[end - 1] += "\n"

    lines.insert(end, entry)
    return 1
|
|
|
|
|
|
def _interfaces_insert_index(lines):
    """Return where a new subsection belongs inside ``[interfaces]``.

    The result is the index just after the last non-blank line of the
    top-level ``[interfaces]`` section, or None when ``lines`` has no such
    section.
    """
    interfaces_header = re.compile(r"^\s*\[\s*interfaces\s*\]\s*$")
    top_level_header = re.compile(r"^\s*\[[^\[].*\]\s*$")

    header_index = next(
        (index for index, line in enumerate(lines) if interfaces_header.match(line)),
        None,
    )
    if header_index is None:
        return None

    insert_at = len(lines)
    for index in range(header_index + 1, len(lines)):
        if top_level_header.match(lines[index]):
            insert_at = index
            break

    # Stay above the blank lines that separate this section from the next.
    while insert_at > header_index + 1 and not lines[insert_at - 1].strip():
        insert_at -= 1
    return insert_at


def patch_ble_config(config_path, overrides):
    """Apply BLE interface ``overrides`` to the Reticulum config file.

    The file at ``config_path`` is read if it exists; otherwise a minimal
    config is generated. A ``[[BLE Interface]]`` subsection is created when
    missing, each ``key``/``value`` in ``overrides`` is set inside it, and the
    result is written back to ``config_path``.

    Reticulum reads interface definitions from the ``[interfaces]`` section,
    so a newly created BLE subsection is always placed inside that section
    (which is added if absent) rather than appended to whichever section
    happens to be last in the file.
    """
    ble_block = [
        "[[BLE Interface]]\n",
        "type = BLEInterface\n",
        "enabled = yes\n",
    ]

    if os.path.isfile(config_path):
        with open(config_path, "r", encoding="utf-8") as config_file:
            lines = config_file.readlines()
    else:
        lines = [
            "[reticulum]\n",
            "enable_transport = No\n",
            "share_instance = Yes\n",
            "\n",
            "[interfaces]\n",
            *ble_block,
        ]

    # A final line without a newline would be merged with anything appended.
    if lines and not lines[-1].endswith("\n"):
        lines[-1] += "\n"

    start, end = find_ble_interface_block(lines)
    if start is None:
        start = _interfaces_insert_index(lines)
        if start is None:
            # No [interfaces] section at all: add one at the end of the file.
            if lines and lines[-1].strip():
                lines.append("\n")
            lines.append("[interfaces]\n")
            start = len(lines)
        lines[start:start] = ble_block
        end = start + len(ble_block)

    # Each inserted entry shifts the end of the block down by one line.
    added = 0
    for key, value in overrides.items():
        added += set_config_value(lines, start, end + added, key, value)

    with open(config_path, "w", encoding="utf-8") as config_file:
        config_file.writelines(lines)
|
|
|
|
|
|
def runtime_config_dir(args, node_name):
    """Return the Reticulum config directory to use for this run.

    When no BLE override was requested, ``args.config`` is returned as is
    (possibly None). Otherwise the base config directory is copied into a
    fresh temporary directory, the copy's ``config`` file is patched with the
    overrides, and the temporary directory path is returned. The path is also
    stored in the module-level ``temporary_config_dir`` and removed at
    interpreter exit.
    """
    global temporary_config_dir

    overrides = requested_ble_overrides(args)
    if not overrides:
        return args.config

    temporary_config_dir = tempfile.mkdtemp(prefix=f"{APP_NAME}_{node_name}_")
    # Register cleanup before any step that can raise, so a failed copy or
    # patch does not leave the temporary directory behind.
    atexit.register(shutil.rmtree, temporary_config_dir, ignore_errors=True)

    source_dir = base_config_dir(args.config)
    if os.path.isdir(source_dir):
        # mkdtemp() already created the target, hence dirs_exist_ok.
        shutil.copytree(source_dir, temporary_config_dir, dirs_exist_ok=True, symlinks=True)

    patch_ble_config(os.path.join(temporary_config_dir, "config"), overrides)
    return temporary_config_dir
|
|
|
|
|
|
def configure_rns_loglevel(verbosity):
    """Apply the named ``verbosity`` to the RNS module's log level.

    ``verbosity`` is one of "critical", "error", "warning", "notice", "info",
    "verbose", "debug" or "extreme"; None leaves the log level untouched.
    The name is mapped to the matching ``RNS.LOG_*`` constant. If
    ``RNS.loglevel`` is callable it is called with the level, otherwise the
    attribute is assigned.
    """
    if verbosity is None:
        return

    constant_names = {
        name: f"LOG_{name.upper()}"
        for name in (
            "critical",
            "error",
            "warning",
            "notice",
            "info",
            "verbose",
            "debug",
            "extreme",
        )
    }

    level = getattr(RNS, constant_names[verbosity])
    setter = getattr(RNS, "loglevel", None)

    if not callable(setter):
        RNS.loglevel = level
        return
    setter(level)
|
|
|
|
|
|
|
|
def load_or_create_identity(path):
    """Load the Reticulum identity stored at ``path``, creating it if absent.

    The parent directory is created when missing. A newly created identity
    file has its mode set to 0o600.

    Raises:
        RuntimeError: if an existing file cannot be loaded, or a new identity
            cannot be saved.
    """
    parent = os.path.dirname(os.path.abspath(path))
    os.makedirs(parent, exist_ok=True)

    if not os.path.isfile(path):
        fresh = RNS.Identity()
        if not fresh.to_file(path):
            raise RuntimeError(f"Could not save new identity to {path}")
        os.chmod(path, 0o600)
        log(f"Created new identity: {path}")
        return fresh

    loaded = RNS.Identity.from_file(path)
    if loaded is None:
        raise RuntimeError(f"Could not load identity from {path}")
    log(f"Loaded identity: {path}")
    return loaded
|
|
|
|
|
|
def link_key(link):
    """Return the printable form of ``link.link_id``.

    The result is used both as the ``active_links`` dictionary key and in
    log output.
    """
    identifier = link.link_id
    return RNS.prettyhexrep(identifier)
|
|
|
|
|
|
def send_link_packet(link, text):
    """Send ``text`` over ``link`` with a `` send_epoch=<time>`` suffix.

    The suffix carries the current epoch time with six decimals. The payload
    is UTF-8 encoded and sent as a single packet without a receipt.
    """
    stamped = f"{text} send_epoch={time.time():.6f}"
    packet = RNS.Packet(link, stamped.encode("utf-8"), create_receipt=False)
    packet.send()
|
|
|
|
|
|
def active_link_count():
    """Return how many tracked links currently have ACTIVE status."""
    # Copy under the lock, inspect outside it to keep the critical section short.
    with active_links_lock:
        snapshot = tuple(active_links.values())

    count = 0
    for candidate in snapshot:
        if candidate.status == RNS.Link.ACTIVE:
            count += 1
    return count
|
|
|
|
|
|
def load_message_file(path):
    """Read the UTF-8 text file at ``path``.

    Returns a ``(absolute_path, text)`` tuple, where ``absolute_path`` is
    ``path`` with ``~`` expanded and made absolute.
    """
    resolved = os.path.abspath(os.path.expanduser(path))
    with open(resolved, "r", encoding="utf-8") as source:
        contents = source.read()
    return resolved, contents
|
|
|
|
|
|
def utf8_chunks(text, max_bytes):
    """Yield consecutive pieces of ``text`` of at most ``max_bytes`` UTF-8 bytes.

    Characters are never split. A single character whose encoding is larger
    than ``max_bytes`` is still yielded, as a chunk of its own. Empty text
    yields nothing.
    """
    pieces = []
    used = 0

    for symbol in text:
        width = len(symbol.encode("utf-8"))
        # Flush only when something is pending, so an oversized character
        # never produces an empty chunk.
        if pieces and used + width > max_bytes:
            yield "".join(pieces)
            pieces = []
            used = 0
        pieces.append(symbol)
        used += width

    if pieces:
        yield "".join(pieces)
|
|
|
|
|
|
def file_chunk_prefix(index, total, chunk_bytes):
    """Build the metadata text that precedes the data of one file chunk.

    The prefix names the chunk position (``index``/``total``), the sending
    node, the chunk's byte count and the base name of the message file, and
    ends with ``data=`` so the chunk text can be appended directly.
    """
    file_name = os.path.basename(message_file_path)
    header = f"file_chunk {index}/{total} from {NODE_NAME} "
    details = f"bytes={chunk_bytes} file={file_name} data="
    return header + details
|
|
|
|
|
|
def file_data_chunk_size(total, total_bytes):
    """Return the data bytes allowed per chunk for a ``total``-chunk transfer.

    The size is the requested ``message_chunk_size`` capped by what remains
    of ``LINK_PAYLOAD_BUDGET`` after the chunk metadata and the send_epoch
    suffix. The metadata is sized from the widest values it can take in this
    transfer (chunk ``total``/``total`` and ``total_bytes``). The result is
    never below 1.
    """
    # Representative suffix; only its length matters.
    sample_suffix = " send_epoch=1778951576.861234"
    widest_prefix = file_chunk_prefix(total, total, total_bytes)

    overhead = len(widest_prefix.encode("utf-8")) + len(sample_suffix.encode("utf-8"))
    room = max(1, LINK_PAYLOAD_BUDGET - overhead)
    return min(message_chunk_size, room)
|
|
|
|
|
|
def send_message_file(link):
    """Send the loaded message file over ``link`` as numbered chunks.

    Does nothing when no message file was loaded. Sending stops early when
    the program is shutting down or the link is no longer active. There is a
    0.1 second pause after every chunk.
    """
    if message_file_text is None:
        return

    total_bytes = len(message_file_text.encode("utf-8"))

    # The chunk size depends on the width of the chunk count in the prefix,
    # and the chunk count depends on the chunk size. Start from an estimate
    # and refine once with the count actually obtained.
    total = max(1, (total_bytes + message_chunk_size - 1) // message_chunk_size)
    for _ in range(2):
        data_chunk_size = file_data_chunk_size(total, total_bytes)
        chunks = list(utf8_chunks(message_file_text, data_chunk_size))
        total = max(1, len(chunks))

    if data_chunk_size < message_chunk_size:
        log(
            f"Requested message chunk size {message_chunk_size} exceeds Reticulum link budget; "
            f"using {data_chunk_size} data bytes per chunk"
        )
    log(f"Sending file {message_file_path} as {total} chunk(s), {total_bytes} bytes")

    for position, chunk_text in enumerate(chunks, start=1):
        if not (running and link.status == RNS.Link.ACTIVE):
            return

        chunk_bytes = len(chunk_text.encode("utf-8"))
        prefix = file_chunk_prefix(position, total, chunk_bytes)
        send_link_packet(link, f"{prefix}{chunk_text}")
        time.sleep(0.1)
|
|
|
|
|
|
def start_message_file_sender(link):
    """Start a daemon thread that sends the message file over ``link``.

    Does nothing when no message file was loaded.
    """
    if message_file_text is None:
        return

    sender = threading.Thread(target=send_message_file, args=(link,), daemon=True)
    sender.start()
|
|
|
|
|
|
def link_packet_received(message, packet):
    """Packet callback for links: log the received message.

    ``message`` is decoded as UTF-8 with undecodable bytes replaced; objects
    without a ``decode`` method are logged via ``str()``. The link is
    identified from ``packet.link`` when that attribute is present and
    truthy.
    """
    try:
        text = message.decode("utf-8", errors="replace")
    except AttributeError:
        text = str(message)

    link = getattr(packet, "link", None)
    if link:
        peer = link_key(link)
    else:
        peer = "unknown-link"

    log(f"RX link={peer}: {text}")
|
|
|
|
|
|
def link_closed(link):
    """Link-closed callback: stop tracking ``link`` and log the closure.

    Closing a link that was never tracked is not an error.
    """
    closed_key = link_key(link)

    with active_links_lock:
        active_links.pop(closed_key, None)

    log(f"Link closed: {closed_key}")
|
|
|
|
|
|
def _register_established_link(link, direction, greeting):
    """Track a newly established link and start talking on it.

    Installs the packet and link-closed callbacks, records the link in
    ``active_links``, logs the event with ``direction`` as the first word,
    sends ``greeting`` and starts the message-file sender.
    """
    key = link_key(link)
    link.set_packet_callback(link_packet_received)
    link.set_link_closed_callback(link_closed)

    with active_links_lock:
        active_links[key] = link

    log(f"{direction} link established: {key}")
    send_link_packet(link, greeting)
    start_message_file_sender(link)


def outbound_link_established(link):
    """Link-established callback for a link this node initiated."""
    _register_established_link(link, "Outbound", f"hello from {NODE_NAME}")


def inbound_link_established(link):
    """Link-established callback for a link a peer opened to this node."""
    _register_established_link(link, "Inbound", f"hello back from {NODE_NAME}")
|
|
|
|
|
|
def direct_packet_received(data, packet):
    """Packet callback for the inbound destination: log the payload.

    ``data`` is decoded as UTF-8 with undecodable bytes replaced; ``packet``
    is not used.
    """
    decoded = data.decode("utf-8", errors="replace")
    log(f"RX direct packet: {decoded}")
|
|
|
|
|
|
def announce_loop(destination, interval, only_when_disconnected):
    """Announce ``destination`` every ``interval`` seconds until stopped.

    The node name is sent as the announce's app data. When
    ``only_when_disconnected`` is true, an announce is skipped (and the skip
    logged) while at least one link is active. The wait between rounds is
    done in one-second steps so a stop request is noticed quickly.
    """
    while running:
        suppressed = only_when_disconnected and active_link_count() > 0
        if not suppressed:
            destination.announce(app_data=NODE_NAME.encode("utf-8"))
            log(f"Announced {RNS.prettyhexrep(destination.hash)} as {NODE_NAME}")
        else:
            log("Skipped announce because an active Reticulum link exists")

        waited = 0
        while waited < interval and running:
            time.sleep(1)
            waited += 1
|
|
|
|
|
|
def heartbeat_loop(interval):
    """Send a numbered heartbeat on every active link every ``interval`` seconds.

    The sequence number starts at 1 and increases once per round, whether or
    not any link was active. Runs until the ``running`` flag is cleared.
    """
    sequence = 0
    while running:
        time.sleep(interval)
        sequence += 1

        # Copy under the lock so sending happens without holding it.
        with active_links_lock:
            snapshot = tuple(active_links.values())

        message = f"heartbeat {sequence} from {NODE_NAME}"
        for target in snapshot:
            if target.status != RNS.Link.ACTIVE:
                continue
            send_link_packet(target, message)
|
|
|
|
|
|
def wait_for_path(destination_hash, timeout):
    """Wait until a path to ``destination_hash`` is known.

    A path request is issued, then the path table is polled every 0.25
    seconds for up to 20 polls before the next request. Returns False once
    ``timeout`` seconds have elapsed at the start of a round; otherwise
    returns whether a path is known when the wait ends, whether because it
    was found or because the program is stopping.
    """
    started = time.time()
    has_path = RNS.Transport.has_path

    while running and not has_path(destination_hash):
        elapsed = time.time() - started
        if timeout - elapsed <= 0:
            return False

        log(f"Requesting path to {RNS.prettyhexrep(destination_hash)}")
        RNS.Transport.request_path(destination_hash)

        polls = 0
        while polls < 20:
            if has_path(destination_hash) or not running:
                return has_path(destination_hash)
            time.sleep(0.25)
            polls += 1

    return has_path(destination_hash)
|
|
|
|
|
|
def connect_to_peer(peer_hexhash, timeout):
    """Open a Reticulum link to the peer with destination hash ``peer_hexhash``.

    The hash is normalised and validated, a path is awaited for up to
    ``timeout`` seconds, the peer identity is recalled, and a link to the
    peer's ``APP_NAME``/``APP_ASPECT`` destination is started.

    Returns:
        The new ``RNS.Link`` (not yet established), or None when the hash is
        malformed, no path was found, or the identity could not be recalled.
        Every failure is logged.
    """
    # This runs in a background thread, so a malformed hash must be reported
    # here rather than surfacing as an unhandled exception in that thread.
    expected_bytes = RNS.Reticulum.TRUNCATED_HASHLENGTH // 8
    try:
        destination_hash = bytes.fromhex(normalise_hex(peer_hexhash))
    except ValueError:
        log(f"Invalid peer hash {peer_hexhash!r}: not a hexadecimal string")
        return None
    if len(destination_hash) != expected_bytes:
        log(f"Invalid peer hash {peer_hexhash!r}: expected {expected_bytes * 2} hex characters")
        return None

    if not wait_for_path(destination_hash, timeout):
        log(f"No path to peer {RNS.prettyhexrep(destination_hash)} after {timeout}s")
        return None

    peer_identity = RNS.Identity.recall(destination_hash)
    if peer_identity is None:
        log(f"Path exists but identity recall failed for {RNS.prettyhexrep(destination_hash)}")
        return None

    peer_destination = RNS.Destination(
        peer_identity,
        RNS.Destination.OUT,
        RNS.Destination.SINGLE,
        APP_NAME,
        APP_ASPECT,
    )

    log(f"Establishing link to {RNS.prettyhexrep(destination_hash)}")
    link = RNS.Link(peer_destination)
    link.set_link_established_callback(outbound_link_established)
    link.set_link_closed_callback(link_closed)
    return link
|
|
|
|
|
|
def parse_args():
    """Parse and validate the command line.

    ``sys.argv`` is first passed through ``normalise_argv`` so that
    ``message_file=PATH`` style arguments are accepted. Values that would
    make the background loops misbehave (a chunk size or interval below 1)
    are rejected with a usage error.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="BLE Reticulum two-node echo proof")
    parser.add_argument("--config", default=None, help="Reticulum config directory")
    parser.add_argument("--identity", default=None, help="App identity file path")
    parser.add_argument("--name", default=None, help="Node name, default: hostname")
    parser.add_argument("--peer", default=None, help="Peer destination hash to connect to")
    parser.add_argument(
        "--announce-interval",
        type=int,
        default=30,
        help="Seconds between announces",
    )
    parser.add_argument(
        "--send-interval",
        type=int,
        default=10,
        help="Seconds between heartbeats",
    )
    parser.add_argument(
        "--path-timeout",
        type=int,
        default=120,
        help="Seconds to wait for a path to the peer",
    )
    parser.add_argument(
        "--message-file",
        default=None,
        help="Send this UTF-8 text file once per established link instead of periodic heartbeats",
    )
    parser.add_argument(
        "--message-chunk-size",
        type=int,
        default=DEFAULT_MESSAGE_CHUNK_SIZE,
        help="Requested maximum UTF-8 data bytes per file chunk packet",
    )
    parser.add_argument(
        "--announce-only-when-disconnected",
        action="store_true",
        help="Skip periodic announces while at least one Reticulum link is active",
    )

    parser.add_argument(
        "--ble-role",
        choices=("central", "peripheral", "both"),
        default=None,
        help="Runtime BLE mode override: central scans/connects, peripheral advertises, both does both",
    )
    parser.add_argument(
        "--enable-central",
        # Misspelled alias kept so existing invocations continue to work.
        "--enable-cental",
        type=str_to_yes_no,
        default=None,
        help="Runtime override for BLEInterface enable_central",
    )
    parser.add_argument(
        "--enable-peripheral",
        type=str_to_yes_no,
        default=None,
        help="Runtime override for BLEInterface enable_peripheral",
    )
    parser.add_argument(
        "--verbosity",
        choices=("critical", "error", "warning", "notice", "info", "verbose", "debug", "extreme"),
        default=None,
        help="Reticulum log verbosity for this run",
    )

    args = parser.parse_args(normalise_argv(sys.argv)[1:])
    if args.message_chunk_size < 1:
        parser.error("--message-chunk-size must be at least 1")
    # An interval of 0 makes the announce/heartbeat loops spin without
    # sleeping, and a negative heartbeat interval makes time.sleep() raise.
    if args.announce_interval < 1:
        parser.error("--announce-interval must be at least 1")
    if args.send_interval < 1:
        parser.error("--send-interval must be at least 1")
    return args
|
|
|
|
|
|
if __name__ == "__main__":
    # Both signals only clear the running flag; the main loop below then exits.
    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    args = parse_args()

    # RNS is imported only after argument parsing and bound to the
    # module-level name the functions above use.
    import RNS as rns
    RNS = rns

    NODE_NAME = args.name or socket.gethostname().split(".")[0]
    identity_path = args.identity or default_identity_path(args.config, NODE_NAME)
    reticulum_config = runtime_config_dir(args, NODE_NAME)
    message_chunk_size = args.message_chunk_size
    if args.message_file:
        message_file_path, message_file_text = load_message_file(args.message_file)

    log(f"Starting node {NODE_NAME}")
    log(f"Reticulum config: {args.config or '~/.reticulum'}")
    if reticulum_config != args.config:
        log(f"Runtime config: {reticulum_config}")
    # Applied before start-up so Reticulum's own initialisation is logged at
    # the requested level.
    configure_rns_loglevel(args.verbosity)

    reticulum = RNS.Reticulum(reticulum_config)
    # RNS.Reticulum() may reset RNS.loglevel from the [logging] section of
    # its config file when no explicit level is passed to it, so apply the
    # command-line verbosity again now that the config has been loaded.
    configure_rns_loglevel(args.verbosity)

    identity = load_or_create_identity(identity_path)

    destination = RNS.Destination(
        identity,
        RNS.Destination.IN,
        RNS.Destination.SINGLE,
        APP_NAME,
        APP_ASPECT,
    )
    destination.set_link_established_callback(inbound_link_established)
    destination.set_packet_callback(direct_packet_received)
    destination.set_proof_strategy(RNS.Destination.PROVE_ALL)

    log(f"Destination hash: {RNS.prettyhexrep(destination.hash)}")
    log("Use this hash as --peer on the other node.")
    if message_file_text is not None:
        log(f"Message file mode: {message_file_path} ({len(message_file_text.encode('utf-8'))} bytes)")
        log("Periodic heartbeats are disabled in message file mode.")

    # All workers are daemon threads, so they end with the main thread.
    threading.Thread(
        target=announce_loop,
        args=(destination, args.announce_interval, args.announce_only_when_disconnected),
        daemon=True,
    ).start()
    if message_file_text is None:
        threading.Thread(target=heartbeat_loop, args=(args.send_interval,), daemon=True).start()

    if args.peer:
        threading.Thread(target=connect_to_peer, args=(args.peer, args.path_timeout), daemon=True).start()

    while running:
        time.sleep(0.5)

    log("Stopping")
    sys.exit(0)
|