ble-reticulum/examples/ble_dual_node_echo.py
2026-05-16 08:31:39 -07:00

455 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Two-node BLE Reticulum echo proof.
Goal:
Run the same program on zerodev1 and zerodev2. Each node:
- derives its node name from hostname unless --name is supplied
- loads or creates a stable Reticulum app identity
- creates one inbound destination
- announces that destination over configured Reticulum interfaces
- optionally connects to a peer destination hash
- prints every message received over a Reticulum link
After cloning the SD card:
1. Change zerodev2 hostname.
2. Remove cloned Reticulum transport state on zerodev2.
3. Use a unique app identity file per host, or delete the cloned one.
4. Choose BLE mode at runtime, or configure BLEInterface in Reticulum:
--ble-role peripheral # advertise only, accepts incoming BLE connections
--ble-role central # scan/connect only
--ble-role both # scan/connect and advertise
5. Run this program on both nodes.
"""
import argparse
import atexit
import os
import re
import signal
import shutil
import socket
import sys
import tempfile
import threading
import time
# Application name and aspect passed to every RNS.Destination this program builds.
APP_NAME = "ble_reticulum_poc"
APP_ASPECT = "echo"
# Placeholder for the RNS module; the __main__ block imports it and rebinds this name.
RNS = None
# Run flag: stop() clears it, and the main loop and worker loops poll it.
running = True
# Established links keyed by link_key(link); access is guarded by active_links_lock.
active_links = {}
active_links_lock = threading.Lock()
# Set by runtime_config_dir() to the temporary config copy it creates, otherwise None.
temporary_config_dir = None
def log(msg):
    """Print ``msg`` to stdout with a local-time ``[HH:MM:SS.mmm]`` prefix.

    The line is flushed immediately so output appears promptly even when
    stdout is piped or redirected.
    """
    stamp = time.time()
    # Fractional part of the epoch timestamp, truncated to whole milliseconds.
    millis = int((stamp % 1) * 1000)
    clock = time.strftime("%H:%M:%S", time.localtime(stamp))
    print(f"[{clock}.{millis:03d}] {msg}", flush=True)
def stop(_signum=None, _frame=None):
    """Request shutdown by clearing the module-level ``running`` flag.

    The two optional parameters match the ``(signum, frame)`` signature that
    ``signal.signal`` handlers receive; both are ignored, so the function can
    also be called directly with no arguments.
    """
    global running
    running = False
def normalise_hex(h):
    """Return ``h`` reduced to a bare, lowercase hex string.

    All whitespace, ``:`` separators and ``<`` / ``>`` characters are removed.
    The angle brackets matter because this program logs destination hashes
    through ``RNS.prettyhexrep`` and asks the user to pass that hash as
    ``--peer``; pretty-printed hashes are wrapped in ``<...>``, which would
    otherwise make ``bytes.fromhex`` reject a copy-pasted value.

    Args:
        h: Hex text as typed or pasted by the user.

    Returns:
        The cleaned string. It is not validated as hex here; callers that
        convert it with ``bytes.fromhex`` still need to handle ``ValueError``.
    """
    return re.sub(r"[\s:<>]", "", h).lower()
def default_identity_path(configdir, node_name):
    """Build the default identity file path for ``node_name``.

    The file is named ``<APP_NAME>_<node_name>.identity`` and lives in
    ``configdir``, or in ``~/.reticulum`` (user-expanded) when ``configdir``
    is ``None``. A supplied ``configdir`` is used exactly as given.
    """
    directory = os.path.expanduser("~/.reticulum") if configdir is None else configdir
    filename = f"{APP_NAME}_{node_name}.identity"
    return os.path.join(directory, filename)
def base_config_dir(configdir):
    """Return the absolute, user-expanded Reticulum config directory.

    Any falsy ``configdir`` (``None`` or an empty string) selects the
    default ``~/.reticulum``.
    """
    chosen = configdir if configdir else "~/.reticulum"
    expanded = os.path.expanduser(chosen)
    return os.path.abspath(expanded)
def str_to_yes_no(value):
    """Normalise a boolean-like command-line value to ``"yes"`` or ``"no"``.

    ``None`` passes through unchanged. Any other value is converted with
    ``str()``, trimmed and lower-cased before being looked up.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised spelling.
    """
    if value is None:
        return None
    affirmative = ("yes", "true", "1", "on", "enable", "enabled")
    negative = ("no", "false", "0", "off", "disable", "disabled")
    token = str(value).strip().lower()
    for answer, spellings in (("yes", affirmative), ("no", negative)):
        if token in spellings:
            return answer
    raise argparse.ArgumentTypeError("expected yes/no, true/false, or 1/0")
def requested_ble_overrides(args):
    """Collect the BLE config overrides requested on the command line.

    ``args.ble_role`` of ``central``, ``peripheral`` or ``both`` fixes both
    settings and takes precedence over ``args.enable_central`` and
    ``args.enable_peripheral``. With any other role the two individual
    options are used as given.

    Returns:
        A dict containing ``enable_central`` and/or ``enable_peripheral`` for
        every setting that is not ``None``; empty when nothing was requested.
    """
    # (enable_central, enable_peripheral) for each recognised role.
    role_presets = {
        "central": ("yes", "no"),
        "peripheral": ("no", "yes"),
        "both": ("yes", "yes"),
    }
    explicit = (args.enable_central, args.enable_peripheral)
    central, peripheral = role_presets.get(args.ble_role, explicit)

    overrides = {}
    if central is not None:
        overrides["enable_central"] = central
    if peripheral is not None:
        overrides["enable_peripheral"] = peripheral
    return overrides
def find_ble_interface_block(lines):
    """Locate the ``[[BLE Interface]]`` subsection in a list of config lines.

    Returns:
        ``(start, end)`` where ``start`` is the index of the first
        ``[[BLE Interface]]`` header and ``end`` is the index of the next
        section or subsection header after it (``len(lines)`` if there is
        none). Returns ``(None, None)`` when no such header exists.
    """
    ble_header = re.compile(r"^\s*\[\[\s*BLE Interface\s*\]\]\s*$")
    subsection_header = re.compile(r"^\s*\[\[.*\]\]\s*$")
    section_header = re.compile(r"^\s*\[[^\[].*\]\s*$")

    start = next(
        (position for position, text in enumerate(lines) if ble_header.match(text)),
        None,
    )
    if start is None:
        return None, None

    # The block runs until the next header of either depth, or to end of file.
    for position in range(start + 1, len(lines)):
        text = lines[position]
        if subsection_header.match(text) or section_header.match(text):
            return start, position
    return start, len(lines)
def set_config_value(lines, start, end, key, value):
    """Set ``key = value`` inside the config block ``lines[start + 1:end]``.

    ``lines`` is modified in place. If a line in the block already assigns
    ``key`` it is replaced; otherwise a new assignment is inserted at index
    ``end``, i.e. as the last line of the block.

    Args:
        lines: Config file content as a list of lines.
        start: Index of the block's header line.
        end: Index one past the block's last line.
        key: Option name.
        value: Option value, written verbatim.

    Returns:
        The number of lines added to ``lines``: 0 when an existing assignment
        was replaced, 1 when a new line was inserted. Callers use this to
        keep their own ``end`` index in step.
    """
    assignment = f"{key} = {value}\n"
    existing = re.compile(rf"^\s*{re.escape(key)}\s*=")
    for index in range(start + 1, end):
        if existing.match(lines[index]):
            lines[index] = assignment
            return 0

    # A file without a final newline leaves the preceding line unterminated;
    # terminate it so the inserted assignment does not get glued onto it.
    if 0 < end <= len(lines) and not lines[end - 1].endswith("\n"):
        lines[end - 1] += "\n"
    lines.insert(end, assignment)
    return 1
def _insert_ble_interface_block(lines):
    """Insert a minimal ``[[BLE Interface]]`` subsection under ``[interfaces]``.

    The subsection is placed at the end of the existing ``[interfaces]``
    section; if the file has no such section one is appended first.
    ``lines`` is modified in place.

    Returns:
        ``(start, end)``: index of the inserted header and index one past the
        inserted block.
    """
    block = [
        "[[BLE Interface]]\n",
        "type = BLEInterface\n",
        "enabled = yes\n",
    ]
    interfaces_header = re.compile(r"^\s*\[\s*interfaces\s*\]\s*$")
    section_header = re.compile(r"^\s*\[[^\[].*\]\s*$")

    interfaces_at = next(
        (index for index, text in enumerate(lines) if interfaces_header.match(text)),
        None,
    )
    if interfaces_at is None:
        if lines and lines[-1].strip():
            lines.append("\n")
        lines.append("[interfaces]\n")
        insert_at = len(lines)
    else:
        # End of [interfaces] is the next top-level section, or end of file.
        insert_at = next(
            (
                index
                for index in range(interfaces_at + 1, len(lines))
                if section_header.match(lines[index])
            ),
            len(lines),
        )
    lines[insert_at:insert_at] = block
    return insert_at, insert_at + len(block)


def patch_ble_config(config_path, overrides):
    """Apply BLE option overrides to the Reticulum config file at ``config_path``.

    The file is read if it exists, otherwise a minimal config is generated.
    The ``[[BLE Interface]]`` subsection is located (or created inside the
    ``[interfaces]`` section, where Reticulum looks for interface
    definitions), each override is written into it, and the result is
    written back to ``config_path``.

    Args:
        config_path: Path of the config file to create or rewrite.
        overrides: Mapping of option name to value, e.g.
            ``{"enable_central": "yes"}``.
    """
    if os.path.isfile(config_path):
        with open(config_path, "r", encoding="utf-8") as config_file:
            lines = config_file.readlines()
        # Terminate a final line that lacks a newline so nothing appended or
        # inserted after it ends up on the same line.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
    else:
        lines = [
            "[reticulum]\n",
            "enable_transport = No\n",
            "share_instance = Yes\n",
            "\n",
            "[interfaces]\n",
            "[[BLE Interface]]\n",
            "type = BLEInterface\n",
            "enabled = yes\n",
        ]

    start, end = find_ble_interface_block(lines)
    if start is None:
        start, end = _insert_ble_interface_block(lines)

    # set_config_value reports how many lines it inserted; carry that forward
    # so ``end`` keeps pointing one past the (growing) BLE block.
    added = 0
    for key, value in overrides.items():
        added += set_config_value(lines, start, end + added, key, value)

    with open(config_path, "w", encoding="utf-8") as config_file:
        config_file.writelines(lines)
def runtime_config_dir(args, node_name):
    """Return the config directory Reticulum should be started with.

    With no BLE overrides requested, ``args.config`` is returned unchanged
    (possibly ``None``). Otherwise the base config directory is copied into a
    fresh temporary directory, the copy's ``config`` file is patched with the
    overrides, and the temporary directory is returned. The temporary
    directory is recorded in ``temporary_config_dir`` and removed at
    interpreter exit.

    Args:
        args: Parsed command-line arguments.
        node_name: Node name, used in the temporary directory's name.
    """
    global temporary_config_dir
    overrides = requested_ble_overrides(args)
    if not overrides:
        return args.config

    source_dir = base_config_dir(args.config)
    # The name may come from --name; keep path separators and other unusual
    # characters out of the mkdtemp prefix.
    safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", node_name)
    temporary_config_dir = tempfile.mkdtemp(prefix=f"{APP_NAME}_{safe_name}_")
    # Register cleanup before copying/patching so a failure in either step
    # does not leave the temporary directory behind.
    atexit.register(shutil.rmtree, temporary_config_dir, ignore_errors=True)

    # mkdtemp has already created the directory; only the copy is conditional.
    if os.path.isdir(source_dir):
        shutil.copytree(source_dir, temporary_config_dir, dirs_exist_ok=True, symlinks=True)
    patch_ble_config(os.path.join(temporary_config_dir, "config"), overrides)
    return temporary_config_dir
def configure_rns_loglevel(verbosity):
    """Apply the named log verbosity to the RNS module.

    Does nothing when ``verbosity`` is ``None``. Otherwise the name is mapped
    to the matching ``RNS.LOG_*`` constant. If ``RNS.loglevel`` is callable
    it is called with that constant; if not, the constant is assigned to
    ``RNS.loglevel``.
    """
    if verbosity is None:
        return

    constant_names = {
        "critical": "LOG_CRITICAL",
        "error": "LOG_ERROR",
        "warning": "LOG_WARNING",
        "notice": "LOG_NOTICE",
        "info": "LOG_INFO",
        "verbose": "LOG_VERBOSE",
        "debug": "LOG_DEBUG",
        "extreme": "LOG_EXTREME",
    }
    wanted = getattr(RNS, constant_names[verbosity])
    current = getattr(RNS, "loglevel", None)
    if not callable(current):
        RNS.loglevel = wanted
        return
    current(wanted)
def load_or_create_identity(path):
    """Load the RNS identity stored at ``path``, creating it if absent.

    The parent directory is created when missing. A newly created identity
    file has its mode set to ``0o600`` after it is written.

    Returns:
        The loaded or newly created ``RNS.Identity``.

    Raises:
        RuntimeError: if an existing file cannot be loaded, or a new identity
            cannot be saved.
    """
    parent = os.path.dirname(os.path.abspath(path))
    os.makedirs(parent, exist_ok=True)

    if not os.path.isfile(path):
        fresh = RNS.Identity()
        saved = fresh.to_file(path)
        if not saved:
            raise RuntimeError(f"Could not save new identity to {path}")
        # The identity file holds key material; restrict it to the owner.
        os.chmod(path, 0o600)
        log(f"Created new identity: {path}")
        return fresh

    loaded = RNS.Identity.from_file(path)
    if loaded is None:
        raise RuntimeError(f"Could not load identity from {path}")
    log(f"Loaded identity: {path}")
    return loaded
def link_key(link):
    """Return the pretty-printed hex form of ``link.link_id``.

    This string is the key used for ``active_links`` and in log messages.
    """
    link_id = link.link_id
    return RNS.prettyhexrep(link_id)
def send_link_packet(link, text):
    """Send ``text`` over ``link`` with a ``send_epoch=<seconds>`` suffix.

    The suffix carries the sender's ``time.time()`` at microsecond
    precision. The packet is sent without creating a receipt.
    """
    stamped = f"{text} send_epoch={time.time():.6f}"
    packet = RNS.Packet(link, stamped.encode("utf-8"), create_receipt=False)
    packet.send()
def link_packet_received(message, packet):
    """Packet callback for links: log the received payload.

    ``message`` is decoded as UTF-8 with replacement of invalid bytes; if it
    has no ``decode`` method its ``str()`` form is logged instead. The log
    line names the link via ``packet.link`` when that attribute is present
    and truthy, otherwise ``unknown-link``.
    """
    try:
        body = message.decode("utf-8", errors="replace")
    except AttributeError:
        body = str(message)

    source_link = getattr(packet, "link", None)
    if source_link:
        origin = link_key(source_link)
    else:
        origin = "unknown-link"
    log(f"RX link={origin}: {body}")
def link_closed(link):
    """Link-closed callback: forget the link and log the closure.

    Removing a link that was never registered is not an error.
    """
    closed_id = link_key(link)
    with active_links_lock:
        if closed_id in active_links:
            del active_links[closed_id]
    log(f"Link closed: {closed_id}")
def _register_established_link(link, direction, greeting):
    """Shared set-up for a newly established link.

    Installs the packet and link-closed callbacks, records the link in
    ``active_links`` under its ``link_key``, logs the event and sends
    ``greeting`` over the link.

    Args:
        link: The established link.
        direction: ``"Outbound"`` or ``"Inbound"``; used only in the log line.
        greeting: First message to send to the peer.
    """
    key = link_key(link)
    link.set_packet_callback(link_packet_received)
    link.set_link_closed_callback(link_closed)
    with active_links_lock:
        active_links[key] = link
    log(f"{direction} link established: {key}")
    send_link_packet(link, greeting)


def outbound_link_established(link):
    """Link-established callback for links this node initiated."""
    _register_established_link(link, "Outbound", f"hello from {NODE_NAME}")


def inbound_link_established(link):
    """Link-established callback for links opened to this node's destination."""
    _register_established_link(link, "Inbound", f"hello back from {NODE_NAME}")
def direct_packet_received(data, packet):
    """Packet callback for the inbound destination: log the payload.

    ``data`` is decoded as UTF-8 with invalid bytes replaced; ``packet`` is
    accepted to satisfy the callback signature and is not used.
    """
    decoded = data.decode("utf-8", errors="replace")
    log(f"RX direct packet: {decoded}")
def announce_loop(destination, interval):
    """Announce ``destination`` repeatedly until ``running`` is cleared.

    Each announce carries ``NODE_NAME`` (UTF-8) as app data. Between
    announces the loop sleeps in one-second steps so that a stop request is
    noticed within about a second.

    Args:
        destination: The destination to announce.
        interval: Whole seconds between announces. Values below 1 are
            treated as 1 so the loop can never announce without pausing.
    """
    pause_seconds = interval if interval > 0 else 1
    app_data = NODE_NAME.encode("utf-8")
    pretty_hash = RNS.prettyhexrep(destination.hash)
    while running:
        destination.announce(app_data=app_data)
        log(f"Announced {pretty_hash} as {NODE_NAME}")
        for _ in range(pause_seconds):
            if not running:
                break
            time.sleep(1)
def heartbeat_loop(interval):
    """Send a numbered heartbeat over every active link until stopped.

    Every ``interval`` seconds the sequence number is incremented and
    ``heartbeat <seq> from <NODE_NAME>`` is sent on each registered link
    whose status is ``RNS.Link.ACTIVE``.

    Args:
        interval: Seconds between heartbeats. Non-positive values are
            treated as 1 second, which avoids both a busy loop (0) and the
            ``ValueError`` that ``time.sleep`` raises for negative values.
    """
    pause = interval if interval > 0 else 1
    seq = 0
    while running:
        time.sleep(pause)
        # A stop request may have arrived during the sleep; do not send then.
        if not running:
            break
        seq += 1
        # Snapshot under the lock, send outside it, so link callbacks that
        # take the same lock are never blocked by a send.
        with active_links_lock:
            links = list(active_links.values())
        for link in links:
            if link.status == RNS.Link.ACTIVE:
                send_link_packet(link, f"heartbeat {seq} from {NODE_NAME}")
def wait_for_path(destination_hash, timeout):
    """Wait until Reticulum knows a path to ``destination_hash``.

    While no path is known, a path request is issued and the path table is
    polled every 0.25 s; the request is repeated roughly every 5 s. Waiting
    ends when a path appears, ``timeout`` seconds have elapsed, or
    ``running`` is cleared.

    Elapsed time is measured with ``time.monotonic()`` so that wall-clock
    adjustments (for example a time sync shortly after boot) can neither cut
    the wait short nor extend it, and polling never runs past the deadline
    by more than one poll step.

    Args:
        destination_hash: Destination hash as bytes.
        timeout: Maximum number of seconds to wait.

    Returns:
        ``True`` if a path is known on return, otherwise ``False``.
    """
    retry_seconds = 5.0
    poll_seconds = 0.25
    deadline = time.monotonic() + timeout

    while running and not RNS.Transport.has_path(destination_hash):
        if time.monotonic() >= deadline:
            return False
        log(f"Requesting path to {RNS.prettyhexrep(destination_hash)}")
        RNS.Transport.request_path(destination_hash)

        # Poll until the next retry is due or the overall deadline passes.
        retry_at = min(deadline, time.monotonic() + retry_seconds)
        while running and time.monotonic() < retry_at:
            if RNS.Transport.has_path(destination_hash):
                return True
            time.sleep(poll_seconds)
    return RNS.Transport.has_path(destination_hash)
def connect_to_peer(peer_hexhash, timeout):
    """Open a link to the peer destination given by ``peer_hexhash``.

    Steps: parse the hash, wait for a path, recall the peer's identity,
    build the matching outbound destination and create the link. The
    link-established and link-closed callbacks are installed on the new link.

    Args:
        peer_hexhash: Peer destination hash as hex text (separators allowed).
        timeout: Seconds to wait for a path to the peer.

    Returns:
        The new ``RNS.Link``, or ``None`` if the hash is not valid hex, no
        path was found in time, or the peer identity could not be recalled.
        Each failure is logged.
    """
    # This runs in a background thread, so a bad --peer value is reported
    # through the log instead of surfacing as an uncaught thread exception.
    try:
        destination_hash = bytes.fromhex(normalise_hex(peer_hexhash))
    except ValueError:
        destination_hash = b""
    if not destination_hash:
        log(f"Invalid peer destination hash: {peer_hexhash!r}")
        return None

    if not wait_for_path(destination_hash, timeout):
        log(f"No path to peer {RNS.prettyhexrep(destination_hash)} after {timeout}s")
        return None

    peer_identity = RNS.Identity.recall(destination_hash)
    if peer_identity is None:
        log(f"Path exists but identity recall failed for {RNS.prettyhexrep(destination_hash)}")
        return None

    peer_destination = RNS.Destination(
        peer_identity,
        RNS.Destination.OUT,
        RNS.Destination.SINGLE,
        APP_NAME,
        APP_ASPECT,
    )
    log(f"Establishing link to {RNS.prettyhexrep(destination_hash)}")
    link = RNS.Link(peer_destination)
    link.set_link_established_callback(outbound_link_established)
    link.set_link_closed_callback(link_closed)
    return link
def parse_args():
    """Parse this program's command-line options from ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with ``config``, ``identity``, ``name``,
        ``peer``, ``announce_interval``, ``send_interval``, ``path_timeout``,
        ``ble_role``, ``enable_central``, ``enable_peripheral`` and
        ``verbosity``.
    """
    ble_roles = ("central", "peripheral", "both")
    log_levels = ("critical", "error", "warning", "notice", "info", "verbose", "debug", "extreme")

    cli = argparse.ArgumentParser(description="BLE Reticulum two-node echo proof")

    # Plain optional string options.
    for flag, description in (
        ("--config", "Reticulum config directory"),
        ("--identity", "App identity file path"),
        ("--name", "Node name, default: hostname"),
        ("--peer", "Peer destination hash to connect to"),
    ):
        cli.add_argument(flag, default=None, help=description)

    # Integer timing options, in seconds.
    for flag, seconds in (
        ("--announce-interval", 30),
        ("--send-interval", 10),
        ("--path-timeout", 120),
    ):
        cli.add_argument(flag, type=int, default=seconds)

    cli.add_argument(
        "--ble-role",
        choices=ble_roles,
        default=None,
        help="Runtime BLE mode override: central scans/connects, peripheral advertises, both does both",
    )
    # "--enable-cental" is registered as a second spelling of the same option.
    cli.add_argument(
        "--enable-central",
        "--enable-cental",
        type=str_to_yes_no,
        default=None,
        help="Runtime override for BLEInterface enable_central",
    )
    cli.add_argument(
        "--enable-peripheral",
        type=str_to_yes_no,
        default=None,
        help="Runtime override for BLEInterface enable_peripheral",
    )
    cli.add_argument(
        "--verbosity",
        choices=log_levels,
        default=None,
        help="Reticulum log verbosity for this run",
    )
    return cli.parse_args()
if __name__ == "__main__":
    # The handlers only clear ``running``; the loops below and in the worker
    # threads notice the flag and wind down.
    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)
    args = parse_args()

    # RNS is imported only after argument parsing and is bound to the
    # module-level ``RNS`` name that every function in this file uses.
    import RNS as rns
    RNS = rns

    # NODE_NAME is a module-level global read by the link and announce code.
    NODE_NAME = args.name or socket.gethostname().split(".")[0]
    # The identity path is derived from the user's config dir, not from the
    # temporary runtime copy, so the identity survives between runs.
    identity_path = args.identity or default_identity_path(args.config, NODE_NAME)
    reticulum_config = runtime_config_dir(args, NODE_NAME)

    log(f"Starting node {NODE_NAME}")
    log(f"Reticulum config: {args.config or '~/.reticulum'}")
    if reticulum_config != args.config:
        log(f"Runtime config: {reticulum_config}")

    configure_rns_loglevel(args.verbosity)
    reticulum = RNS.Reticulum(reticulum_config)
    # NOTE(review): Reticulum appears to apply the [logging] loglevel from its
    # config file while starting, which would replace the level set above.
    # Applying it again afterwards makes --verbosity win either way.
    configure_rns_loglevel(args.verbosity)

    identity = load_or_create_identity(identity_path)
    destination = RNS.Destination(
        identity,
        RNS.Destination.IN,
        RNS.Destination.SINGLE,
        APP_NAME,
        APP_ASPECT,
    )
    destination.set_link_established_callback(inbound_link_established)
    destination.set_packet_callback(direct_packet_received)
    destination.set_proof_strategy(RNS.Destination.PROVE_ALL)
    log(f"Destination hash: {RNS.prettyhexrep(destination.hash)}")
    log("Use this hash as --peer on the other node.")

    # Workers are daemon threads, so they never keep the process alive.
    threading.Thread(target=announce_loop, args=(destination, args.announce_interval), daemon=True).start()
    threading.Thread(target=heartbeat_loop, args=(args.send_interval,), daemon=True).start()
    if args.peer:
        threading.Thread(target=connect_to_peer, args=(args.peer, args.path_timeout), daemon=True).start()

    while running:
        time.sleep(0.5)
    log("Stopping")

    # Close active links explicitly so the peer is told about the shutdown.
    # The snapshot is taken under the lock and the teardown done outside it,
    # because link_closed() takes the same lock.
    with active_links_lock:
        open_links = list(active_links.values())
    for link in open_links:
        if link.status == RNS.Link.ACTIVE:
            link.teardown()
    sys.exit(0)