From 8bb21688e5f803805bac2ed1f21e65c77a8359b0 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sat, 22 Oct 2022 20:12:54 +0200 Subject: [PATCH] Added config parsing and announce jobs to lxmd --- LXMF/Utilities/lxmd.py | 175 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 164 insertions(+), 11 deletions(-) diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index b1267d7..010ba7a 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -22,39 +22,137 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +DEFFERED_JOBS_DELAY = 10 +JOBS_INTERVAL = 5 + import RNS import LXMF import argparse +import threading import time import os from LXMF._version import __version__ from RNS.vendor.configobj import ConfigObj + +configpath = None +ignoredpath = None +identitypath = None +storagedir = None +targetloglevel = None + identity = None lxmd_config = None message_router = None lxmf_destination = None active_configuration = {} +last_peer_announce = None +last_node_announce = None + def create_default_config(configpath): lxmd_config = ConfigObj(__default_lxmd_config__.splitlines()) lxmd_config.filename = configpath lxmd_config.write() def apply_config(): - # TODO: Apply configuration - active_configuration["display_name"] = "Anonymous Peer" - active_configuration["enable_propagation_node"] = True - active_configuration["message_storage_limit"] = 2000 - active_configuration["prioritised_lxmf_destinations"] = [] - active_configuration["ignored_lxmf_destinations"] = [] + global active_configuration, targetloglevel + try: + # Load peer settings + if "lxmf" in lxmd_config and "display_name" in lxmd_config["lxmf"]: + active_configuration["display_name"] = lxmd_config["lxmf"]["display_name"] + else: + active_configuration["display_name"] = "Anonymous Peer" + + if "lxmf" in lxmd_config and "announce_at_start" in lxmd_config["lxmf"]: + active_configuration["peer_announce_at_start"] = lxmd_config["lxmf"].as_bool("announce_at_start") + else: + active_configuration["peer_announce_at_start"] = False + + if "lxmf" in lxmd_config and "announce_interval" in lxmd_config["lxmf"]: + active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60 + else: + active_configuration["peer_announce_interval"] = None + + if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]: + active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"] + else: + active_configuration["on_inbound"] = None + + # Load propagation node settings + if "propagation" in lxmd_config and "enable_node" in lxmd_config["propagation"]: + active_configuration["enable_propagation_node"] = lxmd_config["propagation"].as_bool("enable_node") + else: + active_configuration["enable_propagation_node"] = False + + if "propagation" in lxmd_config and "announce_at_start" in lxmd_config["propagation"]: + active_configuration["node_announce_at_start"] = lxmd_config["propagation"].as_bool("announce_at_start") + else: + active_configuration["node_announce_at_start"] = False + + if "propagation" in lxmd_config and "announce_interval" in lxmd_config["propagation"]: + active_configuration["node_announce_interval"] = lxmd_config["propagation"].as_int("announce_interval")*60 + else: + active_configuration["node_announce_interval"] = None + + if "propagation" in lxmd_config and "message_storage_limit" in lxmd_config["propagation"]: + active_configuration["message_storage_limit"] = lxmd_config["propagation"].as_float("message_storage_limit") + if active_configuration["message_storage_limit"] < 0.005: + active_configuration["message_storage_limit"] = 0.005 + else: + active_configuration["message_storage_limit"] = 2000 + + if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: + active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") + else: + active_configuration["prioritised_lxmf_destinations"] = [] + + # Load various settings + if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]: + targetloglevel = lxmd_config["logging"].as_int("loglevel") + + active_configuration["ignored_lxmf_destinations"] = [] + if os.path.isfile(ignoredpath): + try: + fh = open(ignoredpath, "rb") + ignored_input = fh.read() + fh.close() + + ignored_hash_strs = ignored_input.splitlines() + + for hash_str in ignored_hash_strs: + if len(hash_str) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2: + try: + ignored_hash = bytes.fromhex(hash_str.decode("utf-8")) + active_configuration["ignored_lxmf_destinations"].append(ignored_hash) + + except Exception as e: + RNS.log("Could not decode RNS Identity hash from: "+str(hash_str), RNS.LOG_DEBUG) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + + except Exception as e: + RNS.log("Error while loading list of ignored destinations: "+str(e), RNS.LOG_ERROR) + + except Exception as e: + RNS.log("Could not apply LXM Daemon configuration. The contained exception was: "+str(e), RNS.LOG_ERROR) + raise e + exit(3) def lxmf_delivery(lxm): - # TODO: Implement delivery callback - pass + global active_configuration + RNS.log("Received "+str(lxm), RNS.LOG_DEBUG) + if active_configuration["on_inbound"]: + RNS.log("Calling external program to handle message", RNS.LOG_DEBUG) + else: + RNS.log("No action defined for inbound messages, ignoring", RNS.LOG_EXTREME) + + +def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbound = None, verbosity = 0, quietness = 0, service = False): + global configpath, ignoredpath, identitypath, storagedir + global lxmd_config, active_configuration, targetloglevel + global message_router, lxmf_destination -def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0, service = False): targetloglevel = 3+verbosity-quietness if service: @@ -73,6 +171,7 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0, configdir = RNS.Reticulum.userdir+"/.lxmd" configpath = configdir+"/config" + ignoredpath = configdir+"/ignored" identitypath = configdir+"/identity" storagedir = configdir+"/storage" @@ -107,7 +206,7 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0, RNS.log("Loaded Primary Identity %s" % (str(identity))) else: RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR) - nomadnet.panic() + exit(4) except Exception as e: RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR) RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR) @@ -141,7 +240,7 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0, RNS.log("LXMF Router ready to receive on "+RNS.prettyhexrep(lxmf_destination.hash)) - if active_configuration["enable_propagation_node"]: + if run_pn or active_configuration["enable_propagation_node"]: message_router.set_message_storage_limit(megabytes=active_configuration["message_storage_limit"]) for dest_str in active_configuration["prioritised_lxmf_destinations"]: try: @@ -158,9 +257,49 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0, RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE) + threading.Thread(target=deferred_start_jobs, daemon=True).start() + while True: time.sleep(1) +def jobs(): + global active_configuration, last_peer_announce, last_node_announce + global message_router, lxmf_destination + + while True: + try: + if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]: + RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME) + message_router.announce(lxmf_destination.hash) + last_peer_announce = time.time() + + if time.time() > last_node_announce + active_configuration["node_announce_interval"]: + RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME) + message_router.announce_propagation_node() + last_node_announce = time.time() + + except Exception as e: + RNS.log("An error occurred while running periodic jobs. The contained exception was: "+str(e), RNS.LOG_ERROR) + + time.sleep(JOBS_INTERVAL) + +def deferred_start_jobs(): + global active_configuration, last_peer_announce, last_node_announce + global message_router, lxmf_destination + time.sleep(DEFFERED_JOBS_DELAY) + RNS.log("Running deferred start jobs") + if active_configuration["peer_announce_at_start"]: + RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME) + message_router.announce(lxmf_destination.hash) + + if active_configuration["node_announce_at_start"]: + RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME) + message_router.announce_propagation_node() + + last_peer_announce = time.time() + last_node_announce = time.time() + threading.Thread(target=jobs, daemon=True).start() + def main(): try: parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon") @@ -184,6 +323,7 @@ def main(): configdir = args.config, rnsconfigdir=args.rnsconfig, run_pn=args.propagation_node, + on_inbound=args.on_inbound, verbosity=args.verbose, quietness=args.quiet, service=args.service @@ -245,6 +385,19 @@ display_name = Anonymous Peer # destination when the LXM Daemon starts up. announce_at_start = no +# You can also announce the delivery destination +# at a specified interval. This is not enabled by +# default. +# announce_interval = 360 + +# You can configure an external program to be run +# every time a message is received. The program +# will receive as an argument the full path to the +# message saved as a file. The example below will +# simply result in the message getting deleted as +# soon as it has been received. +# on_inbound = rm + [logging] # Valid log levels are 0 through 7: