revised to sent reticulum enable_peripheral and enable_central, also parameter to reduce debugging output

This commit is contained in:
John Poole 2026-05-15 18:56:09 -07:00
commit d7e03271a4

View file

@ -15,11 +15,19 @@ After cloning the SD card:
1. Change zerodev2 hostname.
2. Remove cloned Reticulum transport state on zerodev2.
3. Use a unique app identity file per host, or delete the cloned one.
<<<<<<< Updated upstream
4. Configure BLEInterface on both Pis with central+peripheral enabled.
=======
4. Choose BLE mode at runtime, or configure BLEInterface in Reticulum:
--ble-role peripheral # advertise only, accepts incoming BLE connections
--ble-role central # scan/connect only
--ble-role both # scan/connect and advertise
>>>>>>> Stashed changes
5. Run this program on both nodes.
"""
import argparse
<<<<<<< Updated upstream
import os
import signal
import socket
@ -35,6 +43,27 @@ APP_ASPECT = "echo"
running = True
active_links = {}
active_links_lock = threading.Lock()
=======
import atexit
import os
import re
import signal
import shutil
import socket
import sys
import tempfile
import threading
import time
APP_NAME = "ble_reticulum_poc"
APP_ASPECT = "echo"
RNS = None
running = True
active_links = {}
active_links_lock = threading.Lock()
temporary_config_dir = None
>>>>>>> Stashed changes
def log(msg):
@ -56,6 +85,152 @@ def default_identity_path(configdir, node_name):
return os.path.join(configdir, f"{APP_NAME}_{node_name}.identity")
<<<<<<< Updated upstream
=======
def base_config_dir(configdir):
return os.path.abspath(os.path.expanduser(configdir or "~/.reticulum"))
def str_to_yes_no(value):
if value is None:
return None
value = str(value).strip().lower()
if value in ("yes", "true", "1", "on", "enable", "enabled"):
return "yes"
if value in ("no", "false", "0", "off", "disable", "disabled"):
return "no"
raise argparse.ArgumentTypeError("expected yes/no, true/false, or 1/0")
def requested_ble_overrides(args):
enable_central = args.enable_central
enable_peripheral = args.enable_peripheral
if args.ble_role == "central":
enable_central = "yes"
enable_peripheral = "no"
elif args.ble_role == "peripheral":
enable_central = "no"
enable_peripheral = "yes"
elif args.ble_role == "both":
enable_central = "yes"
enable_peripheral = "yes"
return {
key: value for key, value in {
"enable_central": enable_central,
"enable_peripheral": enable_peripheral,
}.items() if value is not None
}
def find_ble_interface_block(lines):
start = None
for index, line in enumerate(lines):
if re.match(r"^\s*\[\[\s*BLE Interface\s*\]\]\s*$", line):
start = index
break
if start is None:
return None, None
end = len(lines)
for index in range(start + 1, len(lines)):
if re.match(r"^\s*\[\[.*\]\]\s*$", lines[index]) or re.match(r"^\s*\[[^\[].*\]\s*$", lines[index]):
end = index
break
return start, end
def set_config_value(lines, start, end, key, value):
pattern = re.compile(rf"^(\s*{re.escape(key)}\s*=\s*).*$")
for index in range(start + 1, end):
if pattern.match(lines[index]):
lines[index] = f"{key} = {value}\n"
return 0
insert_at = end
lines.insert(insert_at, f"{key} = {value}\n")
return 1
def patch_ble_config(config_path, overrides):
if os.path.isfile(config_path):
with open(config_path, "r", encoding="utf-8") as config_file:
lines = config_file.readlines()
else:
lines = [
"[reticulum]\n",
"enable_transport = No\n",
"share_instance = Yes\n",
"\n",
"[[BLE Interface]]\n",
"type = BLEInterface\n",
"enabled = yes\n",
]
start, end = find_ble_interface_block(lines)
if start is None:
if lines and lines[-1].strip():
lines.append("\n")
start = len(lines)
lines.extend([
"[[BLE Interface]]\n",
"type = BLEInterface\n",
"enabled = yes\n",
])
end = len(lines)
added = 0
for key, value in overrides.items():
added += set_config_value(lines, start, end + added, key, value)
with open(config_path, "w", encoding="utf-8") as config_file:
config_file.writelines(lines)
def runtime_config_dir(args, node_name):
global temporary_config_dir
overrides = requested_ble_overrides(args)
if not overrides:
return args.config
source_dir = base_config_dir(args.config)
temporary_config_dir = tempfile.mkdtemp(prefix=f"{APP_NAME}_{node_name}_")
if os.path.isdir(source_dir):
shutil.copytree(source_dir, temporary_config_dir, dirs_exist_ok=True, symlinks=True)
else:
os.makedirs(temporary_config_dir, exist_ok=True)
patch_ble_config(os.path.join(temporary_config_dir, "config"), overrides)
atexit.register(shutil.rmtree, temporary_config_dir, ignore_errors=True)
return temporary_config_dir
def configure_rns_loglevel(verbosity):
if verbosity is None:
return
levels = {
"critical": "LOG_CRITICAL",
"error": "LOG_ERROR",
"warning": "LOG_WARNING",
"notice": "LOG_NOTICE",
"info": "LOG_INFO",
"verbose": "LOG_VERBOSE",
"debug": "LOG_DEBUG",
"extreme": "LOG_EXTREME",
}
RNS.loglevel(getattr(RNS, levels[verbosity]))
>>>>>>> Stashed changes
def load_or_create_identity(path):
os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
@ -210,6 +385,34 @@ def parse_args():
parser.add_argument("--announce-interval", type=int, default=30)
parser.add_argument("--send-interval", type=int, default=10)
parser.add_argument("--path-timeout", type=int, default=120)
<<<<<<< Updated upstream
=======
parser.add_argument(
"--ble-role",
choices=("central", "peripheral", "both"),
default=None,
help="Runtime BLE mode override: central scans/connects, peripheral advertises, both does both",
)
parser.add_argument(
"--enable-central",
"--enable-cental",
type=str_to_yes_no,
default=None,
help="Runtime override for BLEInterface enable_central",
)
parser.add_argument(
"--enable-peripheral",
type=str_to_yes_no,
default=None,
help="Runtime override for BLEInterface enable_peripheral",
)
parser.add_argument(
"--verbosity",
choices=("critical", "error", "warning", "notice", "info", "verbose", "debug", "extreme"),
default=None,
help="Reticulum log verbosity for this run",
)
>>>>>>> Stashed changes
return parser.parse_args()
@ -218,6 +421,7 @@ if __name__ == "__main__":
signal.signal(signal.SIGTERM, stop)
args = parse_args()
<<<<<<< Updated upstream
NODE_NAME = args.name or socket.gethostname().split(".")[0]
identity_path = args.identity or default_identity_path(args.config, NODE_NAME)
@ -225,6 +429,22 @@ if __name__ == "__main__":
log(f"Reticulum config: {args.config or '~/.reticulum'}")
reticulum = RNS.Reticulum(args.config)
=======
import RNS as rns
RNS = rns
NODE_NAME = args.name or socket.gethostname().split(".")[0]
identity_path = args.identity or default_identity_path(args.config, NODE_NAME)
reticulum_config = runtime_config_dir(args, NODE_NAME)
log(f"Starting node {NODE_NAME}")
log(f"Reticulum config: {args.config or '~/.reticulum'}")
if reticulum_config != args.config:
log(f"Runtime config: {reticulum_config}")
configure_rns_loglevel(args.verbosity)
reticulum = RNS.Reticulum(reticulum_config)
>>>>>>> Stashed changes
identity = load_or_create_identity(identity_path)
destination = RNS.Destination(