diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index 0718478..0cfdfa7 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -41,6 +41,10 @@ class LXMRouter: PR_ALL_MESSAGES = 0x00 + + ### Developer-facing API ############################## + ####################################################### + def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH): random.seed(os.urandom(10)) @@ -110,34 +114,20 @@ class LXMRouter: job_thread.setDaemon(True) job_thread.start() - def exit_handler(self): - if self.propagation_node: - try: - serialised_peers = [] - for peer_id in self.peers: - peer = self.peers[peer_id] - serialised_peers.append(peer.to_bytes()) + def announce(self, destination_hash): + if destination_hash in self.delivery_destinations: + delivery_destination = self.delivery_destinations[destination_hash] + delivery_destination.announce(delivery_destination.display_name.encode("utf-8")) - peers_file = open(self.storagepath+"/peers", "wb") - peers_file.write(msgpack.packb(serialised_peers)) - peers_file.close() - - RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG) - - except Exception as e: - RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) - - try: - if not os.path.isdir(self.storagepath): - os.makedirs(self.storagepath) - - locally_delivered_file = open(self.storagepath+"/local_deliveries", "wb") - locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) - locally_delivered_file.close() - - except Exception as e: - RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def announce_propagation_node(self): + def delayed_announce(): + time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) + data = msgpack.packb([self.propagation_node, int(time.time())]) + self.propagation_destination.announce(app_data=data) + da_thread = threading.Thread(target=delayed_announce) + da_thread.setDaemon(True) + da_thread.start() def register_delivery_identity(self, identity, display_name = None): delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery") @@ -167,20 +157,6 @@ class LXMRouter: def get_outbound_propagation_node(self): return self.outbound_propagation_node - def cancel_propagation_node_requests(self): - if self.outbound_propagation_link != None: - self.outbound_propagation_link.teardown() - self.outbound_propagation_link = None - - self.acknowledge_sync_completion() - - def acknowledge_sync_completion(self): - self.propagation_transfer_state = LXMRouter.PR_IDLE - self.propagation_transfer_progress = 0.0 - self.propagation_transfer_last_result = None - self.wants_download_on_path_available_from = None - self.wants_download_on_path_available_to = None - def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES): if max_messages == None: max_messages = LXMRouter.PR_ALL_MESSAGES @@ -224,181 +200,13 @@ class LXMRouter: else: RNS.log("Cannot request LXMF propagation node sync, no default propagation node configured", RNS.LOG_WARNING) - - def request_messages_path_job(self): - job_thread = threading.Thread(target=self.__request_messages_path_job) - job_thread.setDaemon(True) - job_thread.start() - - def __request_messages_path_job(self): - while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < self.wants_download_on_path_available_timeout: - time.sleep(0.1) - - if RNS.Transport.has_path(self.wants_download_on_path_available_from): - self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages) - else: - RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG) - self.acknowledge_sync_completion() - - - def has_message(self, transient_id): - if transient_id in self.locally_delivered_transient_ids: - return True - else: - return False - - def message_get_failed(self, request_receipt): - RNS.log("Message list/get request failed", RNS.LOG_DEBUG) + def cancel_propagation_node_requests(self): if self.outbound_propagation_link != None: self.outbound_propagation_link.teardown() + self.outbound_propagation_link = None - def message_list_response(self, request_receipt): - if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: - RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG) - if self.outbound_propagation_link != None: - self.outbound_propagation_link.teardown() - else: - if request_receipt.response != None: - haves = [] - wants = [] - if len(request_receipt.response) > 0: - for transient_id in request_receipt.response: - if self.has_message(transient_id): - haves.append(transient_id) - else: - if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: - wants.append(transient_id) + self.acknowledge_sync_completion() - request_receipt.link.request( - LXMPeer.MESSAGE_GET_PATH, - [wants, haves], - response_callback=self.message_get_response, - failed_callback=self.message_get_failed, - progress_callback=self.message_get_progress - ) - else: - self.propagation_transfer_state = LXMRouter.PR_COMPLETE - self.propagation_transfer_progress = 1.0 - self.propagation_transfer_last_result = 0 - - def message_get_progress(self, request_receipt): - self.propagation_transfer_state = LXMRouter.PR_RECEIVING - self.propagation_transfer_progress = request_receipt.get_progress() - - def message_get_response(self, request_receipt): - if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: - RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG) - if self.outbound_propagation_link != None: - self.outbound_propagation_link.teardown() - else: - if request_receipt.response != None and len(request_receipt.response) > 0: - haves = [] - for lxmf_data in request_receipt.response: - self.lxmf_propagation(lxmf_data) - haves.append(RNS.Identity.full_hash(lxmf_data)) - - # Return a list of successfully received messages to the node. - # This deletes the messages on the propagation node. - # TODO: Add option to keep messages on node. - request_receipt.link.request( - LXMPeer.MESSAGE_GET_PATH, - [None, haves], - # response_callback=self.message_syncfinal_response, - failed_callback=self.message_get_failed, - # progress_callback=self.message_get_progress - ) - - self.propagation_transfer_state = LXMRouter.PR_COMPLETE - self.propagation_transfer_progress = 1.0 - self.propagation_transfer_last_result = len(request_receipt.response) - - def announce(self, destination_hash): - if destination_hash in self.delivery_destinations: - delivery_destination = self.delivery_destinations[destination_hash] - delivery_destination.announce(delivery_destination.display_name.encode("utf-8")) - - def handle_outbound(self, lxmessage): - lxmessage.state = LXMessage.OUTBOUND - if not lxmessage.packed: - lxmessage.pack() - - lxmessage.determine_transport_encryption() - - while self.processing_outbound: - time.sleep(0.1) - - self.pending_outbound.append(lxmessage) - self.process_outbound() - - def lxmf_delivery(self, lxmf_data, destination_type = None): - try: - message = LXMessage.unpack_from_bytes(lxmf_data) - - if destination_type == RNS.Destination.SINGLE: - message.transport_encrypted = True - message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC - elif destination_type == RNS.Destination.GROUP: - message.transport_encrypted = True - message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES - elif destination_type == RNS.Destination.LINK: - message.transport_encrypted = True - message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC - else: - message.transport_encrypted = False - message.transport_encryption = None - - if message.source_hash in self.ignored_list: - RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) - return False - - if self.__delivery_callback != None and callable(self.__delivery_callback): - try: - self.__delivery_callback(message) - except Exception as e: - RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR) - - return True - - except Exception as e: - RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE) - RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) - return False - - - def delivery_packet(self, data, packet): - try: - if packet.destination_type != RNS.Destination.LINK: - lxmf_data = b"" - lxmf_data += packet.destination.hash - lxmf_data += data - else: - lxmf_data = data - - if self.lxmf_delivery(lxmf_data, packet.destination_type): - packet.prove() - - except Exception as e: - RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - - def delivery_link_established(self, link): - link.set_packet_callback(self.delivery_packet) - link.set_resource_strategy(RNS.Link.ACCEPT_ALL) - link.set_resource_started_callback(self.resource_transfer_began) - link.set_resource_concluded_callback(self.delivery_resource_concluded) - - def delivery_link_closed(self, link): - pass - - def resource_transfer_began(self, resource): - RNS.log("Transfer began for resource "+str(resource), RNS.LOG_DEBUG) - - def delivery_resource_concluded(self, resource): - RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG) - if resource.status == RNS.Resource.COMPLETE: - self.lxmf_delivery(resource.data.read(), resource.link.type) - - def enable_propagation(self): try: self.messagepath = self.storagepath+"/messagestore" @@ -459,41 +267,160 @@ class LXMRouter: self.propagation_node = False self.announce_propagation_node() - def announce_propagation_node(self): - def delayed_announce(): - time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) - data = msgpack.packb([self.propagation_node, int(time.time())]) - self.propagation_destination.announce(app_data=data) + def ignore_destination(self, destination_hash): + if not destination_hash in self.ignored_list: + self.ignored_list.append(destination_hash) - da_thread = threading.Thread(target=delayed_announce) - da_thread.setDaemon(True) - da_thread.start() + def unignore_destination(self, destination_hash): + if destination_hash in self.ignored_list: + self.ignored_list.remove(destination_hash) - def offer_request(self, path, data, request_id, remote_identity, requested_at): - if remote_identity == None: - return LXMPeer.ERROR_NO_IDENTITY - else: + + ### Utility & Maintenance ############################# + ####################################################### + + def jobloop(self): + while (True): + # TODO: Improve this to scheduling, so manual + # triggers can delay next run + self.jobs() + time.sleep(LXMRouter.PROCESSING_INTERVAL) + + JOB_OUTBOUND_INTERVAL = 1 + JOB_LINKS_INTERVAL = 1 + JOB_TRANSIENT_INTERVAL = 60 + JOB_STORE_INTERVAL = 120 + JOB_PEERSYNC_INTERVAL = 12 + def jobs(self): + self.processing_count += 1 + + if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: + self.process_outbound() + + if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: + self.clean_links() + + if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: + self.clean_transient_id_cache() + + if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: + self.clean_message_store() + + if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: + self.sync_peers() + + def clean_links(self): + closed_links = [] + for link_hash in self.direct_links: + link = self.direct_links[link_hash] + inactive_time = link.inactive_for() + + if inactive_time > LXMRouter.LINK_MAX_INACTIVITY: + link.teardown() + closed_links.append(link_hash) + + for link_hash in closed_links: + cleaned_link = self.direct_links.pop(link_hash) + RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) + + if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED: + self.outbound_propagation_link = None + self.acknowledge_sync_completion() + RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG) + + def clean_transient_id_cache(self): + now = time.time() + removed_entries = [] + for transient_id in self.locally_delivered_transient_ids: + timestamp = self.locally_delivered_transient_ids[transient_id] + if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.1: + removed_entries.append(transient_id) + + for transient_id in removed_entries: + self.locally_delivered_transient_ids.pop(transient_id) + RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG) + + def clean_message_store(self): + now = time.time() + removed_entries = {} + for transient_id in self.propagation_entries: + entry = self.propagation_entries[transient_id] + filepath = entry[1] + components = filepath.split("_") + + if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2: + timestamp = float(components[1]) + if now > timestamp+LXMRouter.MESSAGE_EXPIRY: + RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG) + removed_entries[transient_id] = filepath + else: + RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) + removed_entries[transient_id] = filepath + + removed_count = 0 + for transient_id in removed_entries: try: - transient_ids = data - wanted_ids = [] + filepath = removed_entries[transient_id] + self.propagation_entries.pop(transient_id) + if os.path.isfile(filepath): + os.unlink(filepath) + removed_count += 1 + except Exception as e: + RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) - for transient_id in transient_ids: - if not transient_id in self.propagation_entries: - wanted_ids.append(transient_id) + if removed_count > 0: + RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG) - if len(wanted_ids) == 0: - return False + def exit_handler(self): + if self.propagation_node: + try: + serialised_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + serialised_peers.append(peer.to_bytes()) - elif len(wanted_ids) == len(transient_ids): - return True + peers_file = open(self.storagepath+"/peers", "wb") + peers_file.write(msgpack.packb(serialised_peers)) + peers_file.close() - else: - return wanted_ids + RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG) except Exception as e: - RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) - return None + RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + try: + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + locally_delivered_file = open(self.storagepath+"/local_deliveries", "wb") + locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids)) + locally_delivered_file.close() + + except Exception as e: + RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + + def __str__(self): + return "" + + + ### Message Download ################################## + ####################################################### + + def request_messages_path_job(self): + job_thread = threading.Thread(target=self.__request_messages_path_job) + job_thread.setDaemon(True) + job_thread.start() + + def __request_messages_path_job(self): + while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < self.wants_download_on_path_available_timeout: + time.sleep(0.1) + + if RNS.Transport.has_path(self.wants_download_on_path_available_from): + self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages) + else: + RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG) + self.acknowledge_sync_completion() + def message_get_request(self, path, data, request_id, remote_identity, requested_at): if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY @@ -559,13 +486,228 @@ class LXMRouter: RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG) return None + def message_list_response(self, request_receipt): + if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: + RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + else: + if request_receipt.response != None: + haves = [] + wants = [] + if len(request_receipt.response) > 0: + for transient_id in request_receipt.response: + if self.has_message(transient_id): + haves.append(transient_id) + else: + if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages: + wants.append(transient_id) + + request_receipt.link.request( + LXMPeer.MESSAGE_GET_PATH, + [wants, haves], + response_callback=self.message_get_response, + failed_callback=self.message_get_failed, + progress_callback=self.message_get_progress + ) + else: + self.propagation_transfer_state = LXMRouter.PR_COMPLETE + self.propagation_transfer_progress = 1.0 + self.propagation_transfer_last_result = 0 + + def message_get_response(self, request_receipt): + if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY: + RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + else: + if request_receipt.response != None and len(request_receipt.response) > 0: + haves = [] + for lxmf_data in request_receipt.response: + self.lxmf_propagation(lxmf_data) + haves.append(RNS.Identity.full_hash(lxmf_data)) + + # Return a list of successfully received messages to the node. + # This deletes the messages on the propagation node. + # TODO: Add option to keep messages on node. + request_receipt.link.request( + LXMPeer.MESSAGE_GET_PATH, + [None, haves], + # response_callback=self.message_syncfinal_response, + failed_callback=self.message_get_failed, + # progress_callback=self.message_get_progress + ) + + self.propagation_transfer_state = LXMRouter.PR_COMPLETE + self.propagation_transfer_progress = 1.0 + self.propagation_transfer_last_result = len(request_receipt.response) + + def message_get_progress(self, request_receipt): + self.propagation_transfer_state = LXMRouter.PR_RECEIVING + self.propagation_transfer_progress = request_receipt.get_progress() + + def message_get_failed(self, request_receipt): + RNS.log("Message list/get request failed", RNS.LOG_DEBUG) + if self.outbound_propagation_link != None: + self.outbound_propagation_link.teardown() + + def acknowledge_sync_completion(self): + self.propagation_transfer_state = LXMRouter.PR_IDLE + self.propagation_transfer_progress = 0.0 + self.propagation_transfer_last_result = None + self.wants_download_on_path_available_from = None + self.wants_download_on_path_available_to = None + + def has_message(self, transient_id): + if transient_id in self.locally_delivered_transient_ids: + return True + else: + return False + + + ### Message Routing & Delivery ######################## + ####################################################### + + def handle_outbound(self, lxmessage): + lxmessage.state = LXMessage.OUTBOUND + if not lxmessage.packed: + lxmessage.pack() + + lxmessage.determine_transport_encryption() + + while self.processing_outbound: + time.sleep(0.1) + + self.pending_outbound.append(lxmessage) + self.process_outbound() + + def lxmf_delivery(self, lxmf_data, destination_type = None): + try: + message = LXMessage.unpack_from_bytes(lxmf_data) + + if destination_type == RNS.Destination.SINGLE: + message.transport_encrypted = True + message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC + elif destination_type == RNS.Destination.GROUP: + message.transport_encrypted = True + message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES + elif destination_type == RNS.Destination.LINK: + message.transport_encrypted = True + message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC + else: + message.transport_encrypted = False + message.transport_encryption = None + + if message.source_hash in self.ignored_list: + RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG) + return False + + if self.__delivery_callback != None and callable(self.__delivery_callback): + try: + self.__delivery_callback(message) + except Exception as e: + RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR) + + return True + + except Exception as e: + RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE) + RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) + return False + + def delivery_packet(self, data, packet): + try: + if packet.destination_type != RNS.Destination.LINK: + lxmf_data = b"" + lxmf_data += packet.destination.hash + lxmf_data += data + else: + lxmf_data = data + + if self.lxmf_delivery(lxmf_data, packet.destination_type): + packet.prove() + + except Exception as e: + RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + + def delivery_link_established(self, link): + link.set_packet_callback(self.delivery_packet) + link.set_resource_strategy(RNS.Link.ACCEPT_ALL) + link.set_resource_started_callback(self.resource_transfer_began) + link.set_resource_concluded_callback(self.delivery_resource_concluded) + + def delivery_link_closed(self, link): + pass + + def resource_transfer_began(self, resource): + RNS.log("Transfer began for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG) + + def delivery_resource_concluded(self, resource): + RNS.log("Transfer concluded for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG) + if resource.status == RNS.Resource.COMPLETE: + self.lxmf_delivery(resource.data.read(), resource.link.type) + + + ### Peer Sync & Propagation ########################### + ####################################################### + + def peer(self, destination_hash, timestamp): + if destination_hash in self.peers: + peer = self.peers[destination_hash] + peer.alive = True + peer.peering_timebase = timestamp + peer.last_heard = time.time() + else: + peer = LXMPeer(self, destination_hash) + peer.alive = True + peer.last_heard = time.time() + self.peers[destination_hash] = peer + RNS.log("Peered with "+str(peer.destination)) + + def unpeer(self, destination_hash, timestamp = None): + if timestamp == None: + timestamp = int(time.time()) + + if destination_hash in self.peers: + peer = self.peers[destination_hash] + + if timestamp >= peer.peering_timebase: + self.peers.pop(destination_hash) + RNS.log("Broke peering with "+str(peer.destination)) + + def sync_peers(self): + culled_peers = [] + waiting_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: + culled_peers.append(peer_id) + else: + if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: + waiting_peers.append(peer) + + if len(waiting_peers) > 0: + RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) + selected_index = random.randint(0,len(waiting_peers)-1) + selected_peer = waiting_peers[selected_index] + RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG) + selected_peer.sync() + + for peer_id in culled_peers: + RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING) + try: + if peer_id in self.peers: + self.peers.pop(peer_id) + except Exception as e: + RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR) + def propagation_link_established(self, link): link.set_packet_callback(self.propagation_packet) link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_started_callback(self.resource_transfer_began) link.set_resource_concluded_callback(self.propagation_resource_concluded) - def propagation_packet(self, data, packet): try: if packet.destination_type != RNS.Destination.LINK: @@ -584,6 +726,30 @@ class LXMRouter: RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + def offer_request(self, path, data, request_id, remote_identity, requested_at): + if remote_identity == None: + return LXMPeer.ERROR_NO_IDENTITY + else: + try: + transient_ids = data + wanted_ids = [] + + for transient_id in transient_ids: + if not transient_id in self.propagation_entries: + wanted_ids.append(transient_id) + + if len(wanted_ids) == 0: + return False + + elif len(wanted_ids) == len(transient_ids): + return True + + else: + return wanted_ids + + except Exception as e: + RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG) + return None def propagation_resource_concluded(self, resource): RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG) @@ -621,7 +787,6 @@ class LXMRouter: except Exception as e: RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) - def lxmf_propagation(self, lxmf_data): try: if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD: @@ -662,162 +827,6 @@ class LXMRouter: RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) return False - - def peer(self, destination_hash, timestamp): - if destination_hash in self.peers: - peer = self.peers[destination_hash] - peer.alive = True - peer.peering_timebase = timestamp - peer.last_heard = time.time() - else: - peer = LXMPeer(self, destination_hash) - peer.alive = True - peer.last_heard = time.time() - self.peers[destination_hash] = peer - RNS.log("Peered with "+str(peer.destination)) - - def unpeer(self, destination_hash, timestamp = None): - if timestamp == None: - timestamp = int(time.time()) - - if destination_hash in self.peers: - peer = self.peers[destination_hash] - - if timestamp >= peer.peering_timebase: - self.peers.pop(destination_hash) - RNS.log("Broke peering with "+str(peer.destination)) - - - def jobloop(self): - while (True): - # TODO: Improve this to scheduling, so manual - # triggers can delay next run - self.jobs() - time.sleep(LXMRouter.PROCESSING_INTERVAL) - - JOB_OUTBOUND_INTERVAL = 1 - JOB_LINKS_INTERVAL = 1 - JOB_TRANSIENT_INTERVAL = 60 - JOB_STORE_INTERVAL = 120 - JOB_PEERSYNC_INTERVAL = 12 - def jobs(self): - self.processing_count += 1 - - if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0: - self.process_outbound() - - if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0: - self.clean_links() - - if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0: - self.clean_transient_id_cache() - - if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0: - self.clean_message_store() - - if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0: - self.sync_peers() - - - def clean_links(self): - closed_links = [] - for link_hash in self.direct_links: - link = self.direct_links[link_hash] - inactive_time = link.inactive_for() - - if inactive_time > LXMRouter.LINK_MAX_INACTIVITY: - link.teardown() - closed_links.append(link_hash) - - for link_hash in closed_links: - cleaned_link = self.direct_links.pop(link_hash) - RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) - - if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED: - self.outbound_propagation_link = None - self.acknowledge_sync_completion() - RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG) - - def clean_transient_id_cache(self): - now = time.time() - removed_entries = [] - for transient_id in self.locally_delivered_transient_ids: - timestamp = self.locally_delivered_transient_ids[transient_id] - if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.1: - removed_entries.append(transient_id) - - for transient_id in removed_entries: - self.locally_delivered_transient_ids.pop(transient_id) - RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG) - - - def clean_message_store(self): - now = time.time() - removed_entries = {} - for transient_id in self.propagation_entries: - entry = self.propagation_entries[transient_id] - filepath = entry[1] - components = filepath.split("_") - - if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2: - timestamp = float(components[1]) - if now > timestamp+LXMRouter.MESSAGE_EXPIRY: - RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG) - removed_entries[transient_id] = filepath - else: - RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING) - removed_entries[transient_id] = filepath - - removed_count = 0 - for transient_id in removed_entries: - try: - filepath = removed_entries[transient_id] - self.propagation_entries.pop(transient_id) - if os.path.isfile(filepath): - os.unlink(filepath) - removed_count += 1 - except Exception as e: - RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR) - - if removed_count > 0: - RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG) - - - def sync_peers(self): - culled_peers = [] - waiting_peers = [] - for peer_id in self.peers: - peer = self.peers[peer_id] - if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE: - culled_peers.append(peer_id) - else: - if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0: - waiting_peers.append(peer) - - if len(waiting_peers) > 0: - RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG) - selected_index = random.randint(0,len(waiting_peers)-1) - selected_peer = waiting_peers[selected_index] - RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG) - selected_peer.sync() - - for peer_id in culled_peers: - RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING) - try: - if peer_id in self.peers: - self.peers.pop(peer_id) - except Exception as e: - RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR) - - - def ignore_destination(self, destination_hash): - if not destination_hash in self.ignored_list: - self.ignored_list.append(destination_hash) - - def unignore_destination(self, destination_hash): - if destination_hash in self.ignored_list: - self.ignored_list.remove(destination_hash) - def fail_message(self, lxmessage): RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG) @@ -828,9 +837,6 @@ class LXMRouter: if lxmessage.failed_callback != None and callable(lxmessage.failed_callback): lxmessage.failed_callback(lxmessage) - def __str__(self): - return "" - def process_outbound(self, sender = None): if self.processing_outbound: return