Implemented delivery as single packets over links.

This commit is contained in:
Mark Qvist 2021-08-28 15:30:47 +02:00
commit 1e8ef437b9
2 changed files with 26 additions and 10 deletions

View file

@ -228,7 +228,7 @@ class LXMessage:
if self.desired_method == LXMessage.OPPORTUNISTIC:
if self.__destination.type == RNS.Destination.SINGLE:
single_packet_content_limit = LXMessage.RSA_PACKET_MAX_CONTENT
single_packet_content_limit = LXMessage.ENCRYPTED_PACKET_MAX_CONTENT
elif self.__destination.type == RNS.Destination.PLAIN:
single_packet_content_limit = LXMessage.PLAIN_PACKET_MAX_CONTENT
@ -256,9 +256,18 @@ class LXMessage:
if self.method == LXMessage.OPPORTUNISTIC:
self.__as_packet().send().set_delivery_callback(self.__mark_delivered)
self.state = LXMessage.SENT
elif self.method == LXMessage.DIRECT:
self.state = LXMessage.SENDING
self.resource_representation = self.__as_resource()
if self.representation == LXMessage.PACKET:
receipt = self.__as_packet().send()
receipt.set_delivery_callback(self.__mark_delivered)
receipt.set_timeout_callback(self.__link_packet_timed_out)
elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource()
elif self.method == LXMessage.PROPAGATED:
# TODO: Implement propagation
pass
@ -309,6 +318,10 @@ class LXMessage:
resource.link.teardown()
self.state = LXMessage.OUTBOUND
def __link_packet_timed_out(self, packet_receipt):
packet_receipt.destination.teardown()
self.state = LXMessage.OUTBOUND
def __update_transfer_progress(self, resource):
self.progress = resource.progress()
@ -543,14 +556,14 @@ class LXMRouter:
def delivery_packet(self, data, packet):
try:
if packet.destination.type != RNS.Destination.LINK:
if packet.destination_type != RNS.Destination.LINK:
lxmf_data = b""
lxmf_data += packet.destination.hash
lxmf_data += data
else:
lxmf_data = data
if self.lxmf_delivery(lxmf_data, packet.destination.type):
if self.lxmf_delivery(lxmf_data, packet.destination_type):
packet.prove()
except Exception as e:
@ -617,7 +630,7 @@ class LXMRouter:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
else:
else:
RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
@ -647,7 +660,10 @@ class LXMRouter:
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif direct_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed", RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(None)