Implemented client-side propagation stamp generation and inclusion in outbound propagation messages
This commit is contained in:
parent
606a723e31
commit
704b37dc16
7 changed files with 228 additions and 90 deletions
|
|
@ -159,6 +159,7 @@ class LXMRouter:
|
|||
|
||||
self.identity = identity
|
||||
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
self.propagation_destination.set_default_app_data(self.get_propagation_node_app_data)
|
||||
self.control_destination = None
|
||||
self.client_propagation_messages_received = 0
|
||||
self.client_propagation_messages_served = 0
|
||||
|
|
@ -286,22 +287,24 @@ class LXMRouter:
|
|||
if destination_hash in self.delivery_destinations:
|
||||
self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface)
|
||||
|
||||
def get_propagation_node_app_data(self):
|
||||
node_state = self.propagation_node and not self.from_static_only
|
||||
stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost]
|
||||
metadata = {}
|
||||
announce_data = [ False, # 0: Legacy LXMF PN support
|
||||
int(time.time()), # 1: Current node timebase
|
||||
node_state, # 2: Boolean flag signalling propagation node state
|
||||
self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes
|
||||
self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs
|
||||
stamp_cost, # 5: Propagation stamp cost for this node
|
||||
metadata ] # 6: Node metadata
|
||||
|
||||
return msgpack.packb(announce_data)
|
||||
|
||||
def announce_propagation_node(self):
|
||||
def delayed_announce():
|
||||
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
|
||||
node_state = self.propagation_node and not self.from_static_only
|
||||
stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost]
|
||||
metadata = {}
|
||||
announce_data = [ False, # 0: Legacy LXMF PN support
|
||||
int(time.time()), # 1: Current node timebase
|
||||
node_state, # 2: Boolean flag signalling propagation node state
|
||||
self.propagation_per_transfer_limit, # 3: Per-transfer limit for message propagation in kilobytes
|
||||
self.propagation_per_sync_limit, # 4: Limit for incoming propagation node syncs
|
||||
stamp_cost, # 5: Propagation stamp cost for this node
|
||||
metadata ] # 6: Node metadata
|
||||
|
||||
data = msgpack.packb(announce_data)
|
||||
self.propagation_destination.announce(app_data=data)
|
||||
self.propagation_destination.announce(app_data=self.get_propagation_node_app_data())
|
||||
|
||||
da_thread = threading.Thread(target=delayed_announce)
|
||||
da_thread.setDaemon(True)
|
||||
|
|
@ -380,6 +383,29 @@ class LXMRouter:
|
|||
def get_outbound_propagation_node(self):
|
||||
return self.outbound_propagation_node
|
||||
|
||||
def get_outbound_propagation_cost(self):
|
||||
target_propagation_cost = None
|
||||
pn_destination_hash = self.get_outbound_propagation_node()
|
||||
pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash)
|
||||
if pn_announce_data_is_valid(pn_app_data):
|
||||
pn_config = msgpack.unpackb(pn_app_data)
|
||||
target_propagation_cost = pn_config[5][0]
|
||||
|
||||
if not target_propagation_cost:
|
||||
RNS.log(f"Could not retrieve cached propagation node config. Requesting path to propagation node to get target propagation cost...", RNS.LOG_DEBUG)
|
||||
RNS.Transport.request_path(pn_destination_hash)
|
||||
timeout = time.time() + LXMRouter.PATH_REQUEST_WAIT
|
||||
while not RNS.Identity.recall_app_data(pn_destination_hash) and time.time() < timeout:
|
||||
time.sleep(0.5)
|
||||
|
||||
pn_app_data = RNS.Identity.recall_app_data(pn_destination_hash)
|
||||
if pn_announce_data_is_valid(pn_app_data):
|
||||
pn_config = msgpack.unpackb(pn_app_data)
|
||||
target_propagation_cost = pn_config[5][0]
|
||||
|
||||
if not target_propagation_cost: RNS.log("Propagation node stamp cost still unavailable after path request", RNS.LOG_ERROR)
|
||||
return target_propagation_cost
|
||||
|
||||
def set_inbound_propagation_node(self, destination_hash):
|
||||
# TODO: Implement
|
||||
raise NotImplementedError("Inbound/outbound propagation node differentiation is currently not implemented")
|
||||
|
|
@ -1525,12 +1551,12 @@ class LXMRouter:
|
|||
else:
|
||||
return False
|
||||
|
||||
def cancel_outbound(self, message_id):
|
||||
def cancel_outbound(self, message_id, cancel_state=LXMessage.CANCELLED):
|
||||
try:
|
||||
if message_id in self.pending_deferred_stamps:
|
||||
lxm = self.pending_deferred_stamps[message_id]
|
||||
RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG)
|
||||
lxm.state = LXMessage.CANCELLED
|
||||
lxm.state = cancel_state
|
||||
LXStamper.cancel_work(message_id)
|
||||
|
||||
lxmessage = None
|
||||
|
|
@ -1539,7 +1565,7 @@ class LXMRouter:
|
|||
lxmessage = lxm
|
||||
|
||||
if lxmessage != None:
|
||||
lxmessage.state = LXMessage.CANCELLED
|
||||
lxmessage.state = cancel_state
|
||||
if lxmessage in self.pending_outbound:
|
||||
RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG)
|
||||
if lxmessage.representation == LXMessage.RESOURCE:
|
||||
|
|
@ -1574,11 +1600,9 @@ class LXMRouter:
|
|||
# destination to reply without generating a stamp.
|
||||
if lxmessage.include_ticket:
|
||||
ticket = self.generate_ticket(lxmessage.destination_hash)
|
||||
if ticket:
|
||||
lxmessage.fields[FIELD_TICKET] = ticket
|
||||
if ticket: lxmessage.fields[FIELD_TICKET] = ticket
|
||||
|
||||
if not lxmessage.packed:
|
||||
lxmessage.pack()
|
||||
if not lxmessage.packed: lxmessage.pack()
|
||||
|
||||
unknown_path_requested = False
|
||||
if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC:
|
||||
|
|
@ -1593,16 +1617,13 @@ class LXMRouter:
|
|||
RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG)
|
||||
lxmessage.defer_stamp = False
|
||||
|
||||
if not lxmessage.defer_stamp:
|
||||
while not unknown_path_requested and self.processing_outbound:
|
||||
time.sleep(0.05)
|
||||
if not lxmessage.defer_stamp and not (lxmessage.desired_method == LXMessage.PROPAGATED and lxmessage.defer_propagation_stamp):
|
||||
while not unknown_path_requested and self.processing_outbound: time.sleep(0.05)
|
||||
|
||||
self.pending_outbound.append(lxmessage)
|
||||
if not unknown_path_requested:
|
||||
self.process_outbound()
|
||||
if not unknown_path_requested: self.process_outbound()
|
||||
|
||||
else:
|
||||
self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
|
||||
else: self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
|
||||
|
||||
def get_outbound_progress(self, lxm_hash):
|
||||
for lxm in self.pending_outbound:
|
||||
|
|
@ -1626,6 +1647,17 @@ class LXMRouter:
|
|||
|
||||
return None
|
||||
|
||||
def get_outbound_lxm_propagation_stamp_cost(self, lxm_hash):
|
||||
for lxm in self.pending_outbound:
|
||||
if lxm.hash == lxm_hash:
|
||||
return lxm.propagation_target_cost
|
||||
|
||||
for lxm_id in self.pending_deferred_stamps:
|
||||
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
|
||||
return self.pending_deferred_stamps[lxm_id].stamp_cost
|
||||
|
||||
return None
|
||||
|
||||
|
||||
### Message Routing & Delivery ########################
|
||||
#######################################################
|
||||
|
|
@ -2022,7 +2054,12 @@ class LXMRouter:
|
|||
self.lxmf_propagation(lxmf_data, stamp_value=stamp_value)
|
||||
self.client_propagation_messages_received += 1
|
||||
|
||||
if stamps_valid: packet.prove()
|
||||
if len(validated_messages) == len(messages): packet.prove()
|
||||
else:
|
||||
RNS.log("Propagation transfer from client contained messages with invalid stamps", RNS.LOG_NOTICE)
|
||||
reject_data = msgpack.packb([LXMPeer.ERROR_INVALID_STAMP])
|
||||
RNS.Packet(packet.link, reject_data).send()
|
||||
packet.link.teardown()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
|
||||
|
|
@ -2281,29 +2318,87 @@ class LXMRouter:
|
|||
|
||||
return
|
||||
|
||||
RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
|
||||
generated_stamp = selected_lxm.get_stamp()
|
||||
if generated_stamp:
|
||||
selected_lxm.stamp = generated_stamp
|
||||
selected_lxm.defer_stamp = False
|
||||
selected_lxm.packed = None
|
||||
selected_lxm.pack()
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
self.pending_outbound.append(selected_lxm)
|
||||
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
|
||||
else:
|
||||
if selected_lxm.state == LXMessage.CANCELLED:
|
||||
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
|
||||
selected_lxm.stamp_generation_failed = True
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
|
||||
selected_lxm.failed_callback(lxmessage)
|
||||
if selected_lxm.stamp == None: stamp_generation_success = False
|
||||
else: stamp_generation_success = True
|
||||
|
||||
if selected_lxm.desired_method == LXMessage.PROPAGATED:
|
||||
if selected_lxm.propagation_stamp == None: propagation_stamp_generation_success = False
|
||||
else: propagation_stamp_generation_success = True
|
||||
else: propagation_stamp_generation_success = True
|
||||
|
||||
if stamp_generation_success == False:
|
||||
RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
|
||||
generated_stamp = selected_lxm.get_stamp()
|
||||
if generated_stamp:
|
||||
selected_lxm.stamp = generated_stamp
|
||||
selected_lxm.defer_stamp = False
|
||||
selected_lxm.packed = None
|
||||
selected_lxm.pack()
|
||||
stamp_generation_success = True
|
||||
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
|
||||
else:
|
||||
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
|
||||
if selected_lxm.state == LXMessage.CANCELLED:
|
||||
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
|
||||
selected_lxm.stamp_generation_failed = True
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
|
||||
selected_lxm.failed_callback(lxmessage)
|
||||
else:
|
||||
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
|
||||
selected_lxm.stamp_generation_failed = True
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
self.fail_message(selected_lxm)
|
||||
|
||||
if propagation_stamp_generation_success == False:
|
||||
RNS.log(f"Starting propagation stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
|
||||
pn_target_cost = self.get_outbound_propagation_cost()
|
||||
if pn_target_cost == None:
|
||||
RNS.log("Failed to get propagation node stamp cost, cannot generate propagation stamp", RNS.LOG_ERROR)
|
||||
selected_lxm.stamp_generation_failed = True
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
self.fail_message(selected_lxm)
|
||||
|
||||
else:
|
||||
propagation_stamp = selected_lxm.get_propagation_stamp(target_cost=pn_target_cost)
|
||||
RNS.log(f"Generated propagation stamp: {RNS.hexrep(propagation_stamp)}")
|
||||
if propagation_stamp:
|
||||
selected_lxm.propagation_stamp = propagation_stamp
|
||||
selected_lxm.defer_propagation_stamp = False
|
||||
selected_lxm.packed = None
|
||||
selected_lxm.pack()
|
||||
propagation_stamp_generation_success = True
|
||||
RNS.log(f"Propagation stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
|
||||
else:
|
||||
if selected_lxm.state == LXMessage.CANCELLED:
|
||||
RNS.log(f"Message cancelled during deferred propagation stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
|
||||
selected_lxm.stamp_generation_failed = True
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
|
||||
selected_lxm.failed_callback(lxmessage)
|
||||
else:
|
||||
RNS.log(f"Deferred propagation stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
|
||||
selected_lxm.stamp_generation_failed = True
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
self.fail_message(selected_lxm)
|
||||
|
||||
if stamp_generation_success and propagation_stamp_generation_success:
|
||||
self.pending_deferred_stamps.pop(selected_message_id)
|
||||
self.pending_outbound.append(selected_lxm)
|
||||
|
||||
def propagation_transfer_signalling_packet(self, data, packet):
|
||||
try:
|
||||
unpacked = msgpack.unpackb(data)
|
||||
if type(unpacked) == list and len(unpacked) >= 1:
|
||||
signal = unpacked[0]
|
||||
if signal == LXMPeer.ERROR_INVALID_STAMP:
|
||||
RNS.log("Message rejected by propagation node", RNS.LOG_ERROR)
|
||||
if hasattr(packet, "link") and hasattr(packet.link, "for_lxmessage"):
|
||||
lxm = packet.link.for_lxmessage
|
||||
RNS.log(f"Invalid propagation stamp on {lxm}", RNS.LOG_ERROR)
|
||||
self.cancel_outbound(lxm.message_id, cancel_state=LXMessage.REJECTED)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"An error occurred while processing propagation transfer signalling. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
|
||||
def process_outbound(self, sender = None):
|
||||
if self.processing_outbound:
|
||||
|
|
@ -2347,7 +2442,7 @@ class LXMRouter:
|
|||
|
||||
elif lxmessage.state == LXMessage.REJECTED:
|
||||
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
||||
self.pending_outbound.remove(lxmessage)
|
||||
if lxmessage in self.pending_outbound: self.pending_outbound.remove(lxmessage)
|
||||
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
|
||||
lxmessage.failed_callback(lxmessage)
|
||||
|
||||
|
|
@ -2512,6 +2607,8 @@ class LXMRouter:
|
|||
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
|
||||
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
|
||||
self.outbound_propagation_link.set_packet_callback(self.propagation_transfer_signalling_packet)
|
||||
self.outbound_propagation_link.for_lxmessage = lxmessage
|
||||
else:
|
||||
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
|
||||
RNS.Transport.request_path(self.outbound_propagation_node)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue