Implemented deferred multiprocessor stamp generation in the background

This commit is contained in:
Mark Qvist 2024-09-07 22:40:13 +02:00
commit 40eb014c91
2 changed files with 200 additions and 91 deletions

View file

@ -95,6 +95,7 @@ class LXMRouter:
self.delivery_per_transfer_limit = delivery_limit
self.enforce_ratchets = enforce_ratchets
self._enforce_stamps = enforce_stamps
self.pending_deferred_stamps = {}
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
@ -110,6 +111,7 @@ class LXMRouter:
self.cost_file_lock = threading.Lock()
self.ticket_file_lock = threading.Lock()
self.stamp_gen_lock = threading.Lock()
if identity == None:
identity = RNS.Identity()
@ -237,8 +239,7 @@ class LXMRouter:
if display_name != None:
def get_app_data():
return self.get_announce_app_data(delivery_destination)
return self.get_announce_app_data(delivery_destination.hash)
delivery_destination.set_default_app_data(get_app_data)
self.delivery_destinations[delivery_destination.hash] = delivery_destination
@ -540,6 +541,7 @@ class LXMRouter:
#######################################################
JOB_OUTBOUND_INTERVAL = 1
JOB_STAMPS_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120
@ -550,6 +552,9 @@ class LXMRouter:
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound()
if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0:
threading.Thread(target=self.process_deferred_stamps, daemon=True).start()
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links()
@ -721,6 +726,14 @@ class LXMRouter:
return None
def get_outbound_ticket_expiry(self, destination_hash):
if destination_hash in self.available_tickets["outbound"]:
entry = self.available_tickets["outbound"][destination_hash]
if entry[0] > time.time():
return entry[0]
return None
def get_inbound_tickets(self, destination_hash):
now = time.time()
available_tickets = []
@ -916,6 +929,29 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not save available tickets to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def reload_available_tickets(self):
RNS.log("Reloading available tickets from storage", RNS.LOG_DEBUG)
try:
with self.ticket_file_lock:
with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file:
data = available_tickets_file.read()
self.available_tickets = msgpack.unpackb(data)
if not type(self.available_tickets) == dict:
RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
if not "outbound" in self.available_tickets:
RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["outbound"] = {}
if not "inbound" in self.available_tickets:
RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["inbound"] = {}
if not "last_deliveries" in self.available_tickets:
RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["last_deliveries"] = {}
except Exception as e:
RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)
def exit_handler(self):
if self.propagation_node:
try:
@ -1188,19 +1224,24 @@ class LXMRouter:
while self.processing_outbound:
time.sleep(0.1)
self.pending_outbound.append(lxmessage)
if lxmessage.defer_stamp and lxmessage.stamp_cost == None:
RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG)
lxmessage.defer_stamp = False
if not lxmessage.defer_stamp:
self.pending_outbound.append(lxmessage)
self.process_outbound()
else:
self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
def get_outbound_progress(self, lxm_hash):
for lxm in self.pending_outbound:
if lxm.hash == lxm_hash:
return lxm.progress
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].progress
return None
@ -1208,6 +1249,10 @@ class LXMRouter:
for lxm in self.pending_outbound:
if lxm.hash == lxm_hash:
return lxm.stamp_cost
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].stamp_cost
return None
@ -1616,13 +1661,51 @@ class LXMRouter:
def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
if lxmessage in self.pending_outbound:
self.pending_outbound.remove(lxmessage)
self.failed_outbound.append(lxmessage)
lxmessage.state = LXMessage.FAILED
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
def process_deferred_stamps(self):
if len(self.pending_deferred_stamps) > 0:
RNS.log(f"Processing deferred stamps...", RNS.LOG_DEBUG) # TODO: Remove
if self.stamp_gen_lock.locked():
RNS.log(f"A stamp is already generating, returning...", RNS.LOG_DEBUG) # TODO: Remove
return
else:
with self.stamp_gen_lock:
selected_lxm = None
selected_message_id = None
for message_id in self.pending_deferred_stamps:
lxmessage = self.pending_deferred_stamps[message_id]
if selected_lxm == None:
selected_lxm = lxmessage
selected_message_id = message_id
if selected_lxm != None:
RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
generated_stamp = selected_lxm.get_stamp()
if generated_stamp:
selected_lxm.stamp = generated_stamp
selected_lxm.defer_stamp = False
selected_lxm.packed = None
selected_lxm.pack()
self.pending_deferred_stamps.pop(selected_message_id)
self.pending_outbound.append(selected_lxm)
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
else:
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
self.fail_message(selected_lxm)
def process_outbound(self, sender = None):
if self.processing_outbound:
return
@ -1641,14 +1724,6 @@ class LXMRouter:
self.pending_outbound.remove(lxmessage)
else:
RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
# Handle potentially deferred stamp generation
if lxmessage.defer_stamp and lxmessage.stamp == None:
RNS.log(f"Generating deferred stamp for {lxmessage} now", RNS.LOG_DEBUG)
lxmessage.stamp = lxmessage.get_stamp()
lxmessage.defer_stamp = False
lxmessage.packed = None
lxmessage.pack()
if lxmessage.progress == None or lxmessage.progress < 0.01:
lxmessage.progress = 0.01