#!/usr/bin/env python3 """ Two-node BLE Reticulum echo proof. Goal: Run the same program on zerodev1 and zerodev2. Each node: - derives its node name from hostname unless --name is supplied - loads or creates a stable Reticulum app identity - creates one inbound destination - announces that destination over configured Reticulum interfaces - optionally connects to a peer destination hash - prints every message received over a Reticulum link After cloning the SD card: 1. Change zerodev2 hostname. 2. Remove cloned Reticulum transport state on zerodev2. 3. Use a unique app identity file per host, or delete the cloned one. 4. Choose BLE mode at runtime, or configure BLEInterface in Reticulum: --ble-role peripheral # advertise only, accepts incoming BLE connections --ble-role central # scan/connect only --ble-role both # scan/connect and advertise 5. Run this program on both nodes. """ import argparse import atexit import os import re import signal import shutil import socket import sys import tempfile import threading import time APP_NAME = "ble_reticulum_poc" APP_ASPECT = "echo" RNS = None running = True active_links = {} active_links_lock = threading.Lock() temporary_config_dir = None def log(msg): print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) def stop(_signum=None, _frame=None): global running running = False def normalise_hex(h): return h.replace(":", "").replace(" ", "").strip().lower() def default_identity_path(configdir, node_name): if configdir is None: configdir = os.path.expanduser("~/.reticulum") return os.path.join(configdir, f"{APP_NAME}_{node_name}.identity") def base_config_dir(configdir): return os.path.abspath(os.path.expanduser(configdir or "~/.reticulum")) def str_to_yes_no(value): if value is None: return None value = str(value).strip().lower() if value in ("yes", "true", "1", "on", "enable", "enabled"): return "yes" if value in ("no", "false", "0", "off", "disable", "disabled"): return "no" raise argparse.ArgumentTypeError("expected yes/no, true/false, or 1/0") def requested_ble_overrides(args): enable_central = args.enable_central enable_peripheral = args.enable_peripheral if args.ble_role == "central": enable_central = "yes" enable_peripheral = "no" elif args.ble_role == "peripheral": enable_central = "no" enable_peripheral = "yes" elif args.ble_role == "both": enable_central = "yes" enable_peripheral = "yes" return { key: value for key, value in { "enable_central": enable_central, "enable_peripheral": enable_peripheral, }.items() if value is not None } def find_ble_interface_block(lines): start = None for index, line in enumerate(lines): if re.match(r"^\s*\[\[\s*BLE Interface\s*\]\]\s*$", line): start = index break if start is None: return None, None end = len(lines) for index in range(start + 1, len(lines)): if re.match(r"^\s*\[\[.*\]\]\s*$", lines[index]) or re.match(r"^\s*\[[^\[].*\]\s*$", lines[index]): end = index break return start, end def set_config_value(lines, start, end, key, value): pattern = re.compile(rf"^(\s*{re.escape(key)}\s*=\s*).*$") for index in range(start + 1, end): if pattern.match(lines[index]): lines[index] = f"{key} = {value}\n" return 0 insert_at = end lines.insert(insert_at, f"{key} = {value}\n") return 1 def patch_ble_config(config_path, overrides): if os.path.isfile(config_path): with open(config_path, "r", encoding="utf-8") as config_file: lines = config_file.readlines() else: lines = [ "[reticulum]\n", "enable_transport = No\n", "share_instance = Yes\n", "\n", "[[BLE Interface]]\n", "type = BLEInterface\n", "enabled = yes\n", ] start, end = find_ble_interface_block(lines) if start is None: if lines and lines[-1].strip(): lines.append("\n") start = len(lines) lines.extend([ "[[BLE Interface]]\n", "type = BLEInterface\n", "enabled = yes\n", ]) end = len(lines) added = 0 for key, value in overrides.items(): added += set_config_value(lines, start, end + added, key, value) with open(config_path, "w", encoding="utf-8") as config_file: config_file.writelines(lines) def runtime_config_dir(args, node_name): global temporary_config_dir overrides = requested_ble_overrides(args) if not overrides: return args.config source_dir = base_config_dir(args.config) temporary_config_dir = tempfile.mkdtemp(prefix=f"{APP_NAME}_{node_name}_") if os.path.isdir(source_dir): shutil.copytree(source_dir, temporary_config_dir, dirs_exist_ok=True, symlinks=True) else: os.makedirs(temporary_config_dir, exist_ok=True) patch_ble_config(os.path.join(temporary_config_dir, "config"), overrides) atexit.register(shutil.rmtree, temporary_config_dir, ignore_errors=True) return temporary_config_dir def configure_rns_loglevel(verbosity): if verbosity is None: return levels = { "critical": "LOG_CRITICAL", "error": "LOG_ERROR", "warning": "LOG_WARNING", "notice": "LOG_NOTICE", "info": "LOG_INFO", "verbose": "LOG_VERBOSE", "debug": "LOG_DEBUG", "extreme": "LOG_EXTREME", } RNS.loglevel(getattr(RNS, levels[verbosity])) def load_or_create_identity(path): os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True) if os.path.isfile(path): identity = RNS.Identity.from_file(path) if identity is None: raise RuntimeError(f"Could not load identity from {path}") log(f"Loaded identity: {path}") return identity identity = RNS.Identity() if not identity.to_file(path): raise RuntimeError(f"Could not save new identity to {path}") os.chmod(path, 0o600) log(f"Created new identity: {path}") return identity def link_key(link): return RNS.prettyhexrep(link.link_id) def send_link_packet(link, text): payload = text.encode("utf-8") RNS.Packet(link, payload, create_receipt=False).send() def link_packet_received(message, packet): try: text = message.decode("utf-8", errors="replace") except AttributeError: text = str(message) link = getattr(packet, "link", None) peer = link_key(link) if link else "unknown-link" log(f"RX link={peer}: {text}") def link_closed(link): key = link_key(link) with active_links_lock: active_links.pop(key, None) log(f"Link closed: {key}") def outbound_link_established(link): key = link_key(link) link.set_packet_callback(link_packet_received) link.set_link_closed_callback(link_closed) with active_links_lock: active_links[key] = link log(f"Outbound link established: {key}") send_link_packet(link, f"hello from {NODE_NAME}") def inbound_link_established(link): key = link_key(link) link.set_packet_callback(link_packet_received) link.set_link_closed_callback(link_closed) with active_links_lock: active_links[key] = link log(f"Inbound link established: {key}") send_link_packet(link, f"hello back from {NODE_NAME}") def direct_packet_received(data, packet): text = data.decode("utf-8", errors="replace") log(f"RX direct packet: {text}") def announce_loop(destination, interval): while running: destination.announce(app_data=NODE_NAME.encode("utf-8")) log(f"Announced {RNS.prettyhexrep(destination.hash)} as {NODE_NAME}") for _ in range(interval): if not running: break time.sleep(1) def heartbeat_loop(interval): seq = 0 while running: time.sleep(interval) seq += 1 with active_links_lock: links = list(active_links.values()) for link in links: if link.status == RNS.Link.ACTIVE: send_link_packet(link, f"heartbeat {seq} from {NODE_NAME}") def wait_for_path(destination_hash, timeout): started = time.time() while running and not RNS.Transport.has_path(destination_hash): remaining = timeout - (time.time() - started) if remaining <= 0: return False log(f"Requesting path to {RNS.prettyhexrep(destination_hash)}") RNS.Transport.request_path(destination_hash) for _ in range(20): if RNS.Transport.has_path(destination_hash) or not running: return RNS.Transport.has_path(destination_hash) time.sleep(0.25) return RNS.Transport.has_path(destination_hash) def connect_to_peer(peer_hexhash, timeout): destination_hash = bytes.fromhex(normalise_hex(peer_hexhash)) if not wait_for_path(destination_hash, timeout): log(f"No path to peer {RNS.prettyhexrep(destination_hash)} after {timeout}s") return None peer_identity = RNS.Identity.recall(destination_hash) if peer_identity is None: log(f"Path exists but identity recall failed for {RNS.prettyhexrep(destination_hash)}") return None peer_destination = RNS.Destination( peer_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, APP_ASPECT, ) log(f"Establishing link to {RNS.prettyhexrep(destination_hash)}") link = RNS.Link(peer_destination) link.set_link_established_callback(outbound_link_established) link.set_link_closed_callback(link_closed) return link def parse_args(): parser = argparse.ArgumentParser(description="BLE Reticulum two-node echo proof") parser.add_argument("--config", default=None, help="Reticulum config directory") parser.add_argument("--identity", default=None, help="App identity file path") parser.add_argument("--name", default=None, help="Node name, default: hostname") parser.add_argument("--peer", default=None, help="Peer destination hash to connect to") parser.add_argument("--announce-interval", type=int, default=30) parser.add_argument("--send-interval", type=int, default=10) parser.add_argument("--path-timeout", type=int, default=120) parser.add_argument( "--ble-role", choices=("central", "peripheral", "both"), default=None, help="Runtime BLE mode override: central scans/connects, peripheral advertises, both does both", ) parser.add_argument( "--enable-central", "--enable-cental", type=str_to_yes_no, default=None, help="Runtime override for BLEInterface enable_central", ) parser.add_argument( "--enable-peripheral", type=str_to_yes_no, default=None, help="Runtime override for BLEInterface enable_peripheral", ) parser.add_argument( "--verbosity", choices=("critical", "error", "warning", "notice", "info", "verbose", "debug", "extreme"), default=None, help="Reticulum log verbosity for this run", ) return parser.parse_args() if __name__ == "__main__": signal.signal(signal.SIGINT, stop) signal.signal(signal.SIGTERM, stop) args = parse_args() import RNS as rns RNS = rns NODE_NAME = args.name or socket.gethostname().split(".")[0] identity_path = args.identity or default_identity_path(args.config, NODE_NAME) reticulum_config = runtime_config_dir(args, NODE_NAME) log(f"Starting node {NODE_NAME}") log(f"Reticulum config: {args.config or '~/.reticulum'}") if reticulum_config != args.config: log(f"Runtime config: {reticulum_config}") configure_rns_loglevel(args.verbosity) reticulum = RNS.Reticulum(reticulum_config) identity = load_or_create_identity(identity_path) destination = RNS.Destination( identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, APP_ASPECT, ) destination.set_link_established_callback(inbound_link_established) destination.set_packet_callback(direct_packet_received) destination.set_proof_strategy(RNS.Destination.PROVE_ALL) log(f"Destination hash: {RNS.prettyhexrep(destination.hash)}") log("Use this hash as --peer on the other node.") threading.Thread(target=announce_loop, args=(destination, args.announce_interval), daemon=True).start() threading.Thread(target=heartbeat_loop, args=(args.send_interval,), daemon=True).start() if args.peer: threading.Thread(target=connect_to_peer, args=(args.peer, args.path_timeout), daemon=True).start() while running: time.sleep(0.5) log("Stopping") sys.exit(0)