Implemented ticket exchanges. Implemented stamp generation and validation by tickets.
This commit is contained in:
parent
74cbd11473
commit
62f5a9eead
4 changed files with 251 additions and 35 deletions
|
|
@ -9,6 +9,7 @@ import RNS
|
|||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
from .LXMF import APP_NAME
|
||||
from .LXMF import FIELD_TICKET
|
||||
|
||||
from .LXMPeer import LXMPeer
|
||||
from .LXMessage import LXMessage
|
||||
|
|
@ -105,8 +106,10 @@ class LXMRouter:
|
|||
self.locally_delivered_transient_ids = {}
|
||||
self.locally_processed_transient_ids = {}
|
||||
self.outbound_stamp_costs = {}
|
||||
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
|
||||
|
||||
self.cost_file_lock = threading.Lock()
|
||||
self.ticket_file_lock = threading.Lock()
|
||||
|
||||
if identity == None:
|
||||
identity = RNS.Identity()
|
||||
|
|
@ -164,6 +167,31 @@ class LXMRouter:
|
|||
except Exception as e:
|
||||
RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
try:
|
||||
if os.path.isfile(self.storagepath+"/available_tickets"):
|
||||
with self.ticket_file_lock:
|
||||
with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file:
|
||||
data = available_tickets_file.read()
|
||||
self.available_tickets = msgpack.unpackb(data)
|
||||
if not type(self.available_tickets) == dict:
|
||||
RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR)
|
||||
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
|
||||
if not "outbound" in self.available_tickets:
|
||||
RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
|
||||
self.available_tickets["outbound"] = {}
|
||||
if not "inbound" in self.available_tickets:
|
||||
RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
|
||||
self.available_tickets["inbound"] = {}
|
||||
if not "last_deliveries" in self.available_tickets:
|
||||
RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
|
||||
self.available_tickets["last_deliveries"] = {}
|
||||
|
||||
self.clean_available_tickets()
|
||||
self.save_available_tickets()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
atexit.register(self.exit_handler)
|
||||
|
||||
job_thread = threading.Thread(target=self.jobloop)
|
||||
|
|
@ -191,6 +219,10 @@ class LXMRouter:
|
|||
da_thread.start()
|
||||
|
||||
def register_delivery_identity(self, identity, display_name = None, stamp_cost = None):
|
||||
if len(self.delivery_destinations) != 0:
|
||||
RNS.log("Currently only one delivery identity is supported per LXMF router instance", RNS.LOG_ERROR)
|
||||
return None
|
||||
|
||||
if not os.path.isdir(self.ratchetpath):
|
||||
os.makedirs(self.ratchetpath)
|
||||
|
||||
|
|
@ -647,6 +679,61 @@ class LXMRouter:
|
|||
|
||||
return weight
|
||||
|
||||
def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY):
|
||||
now = time.time()
|
||||
ticket = None
|
||||
if destination_hash in self.available_tickets["last_deliveries"]:
|
||||
last_delivery = self.available_tickets["last_deliveries"][destination_hash]
|
||||
elapsed = now - last_delivery
|
||||
if elapsed < LXMessage.TICKET_INTERVAL:
|
||||
RNS.log(f"A ticket for {RNS.prettyhexrep(destination_hash)} was already delivered {RNS.prettytime(elapsed)} ago, not including another ticket yet", RNS.LOG_DEBUG)
|
||||
return None
|
||||
|
||||
if destination_hash in self.available_tickets["inbound"]:
|
||||
for ticket in self.available_tickets["inbound"][destination_hash]:
|
||||
ticket_entry = self.available_tickets["inbound"][destination_hash][ticket]
|
||||
expires = ticket_entry[0]; validity_left = expires - now
|
||||
if validity_left > LXMessage.TICKET_RENEW:
|
||||
RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG)
|
||||
return [expires, ticket]
|
||||
|
||||
else:
|
||||
self.available_tickets["inbound"][destination_hash] = {}
|
||||
|
||||
RNS.log(f"No generated tickets for {RNS.prettyhexrep(destination_hash)} with enough validity found, generating a new one", RNS.LOG_DEBUG)
|
||||
expires = now+expiry
|
||||
ticket = os.urandom(LXMessage.TICKET_LENGTH)
|
||||
self.available_tickets["inbound"][destination_hash][ticket] = [expires]
|
||||
self.save_available_tickets()
|
||||
|
||||
return [expires, ticket]
|
||||
|
||||
def remember_ticket(self, destination_hash, ticket_entry):
|
||||
expires = ticket_entry[0]-time.time()
|
||||
RNS.log(f"Remembering ticket for {RNS.prettyhexrep(destination_hash)}, expires in {RNS.prettytime(expires)}", RNS.LOG_DEBUG)
|
||||
self.available_tickets["outbound"][destination_hash] = [ticket_entry[0], ticket_entry[1]]
|
||||
|
||||
def get_outbound_ticket(self, destination_hash):
|
||||
if destination_hash in self.available_tickets["outbound"]:
|
||||
entry = self.available_tickets["outbound"][destination_hash]
|
||||
if entry[0] > time.time():
|
||||
return entry[1]
|
||||
|
||||
return None
|
||||
|
||||
def get_inbound_tickets(self, destination_hash):
|
||||
now = time.time()
|
||||
available_tickets = []
|
||||
if destination_hash in self.available_tickets["inbound"]:
|
||||
for inbound_ticket in self.available_tickets["inbound"][destination_hash]:
|
||||
if now < self.available_tickets["inbound"][destination_hash][inbound_ticket][0]:
|
||||
available_tickets.append(inbound_ticket)
|
||||
|
||||
if len(available_tickets) == 0:
|
||||
return None
|
||||
else:
|
||||
return available_tickets
|
||||
|
||||
def get_size(self, transient_id):
|
||||
lxm_size = self.propagation_entries[transient_id][3]
|
||||
return lxm_size
|
||||
|
|
@ -778,13 +865,57 @@ class LXMRouter:
|
|||
if not os.path.isdir(self.storagepath):
|
||||
os.makedirs(self.storagepath)
|
||||
|
||||
locally_processed_file = open(self.storagepath+"/outbound_stamp_costs", "wb")
|
||||
locally_processed_file.write(msgpack.packb(self.outbound_stamp_costs))
|
||||
locally_processed_file.close()
|
||||
outbound_stamp_costs_file = open(self.storagepath+"/outbound_stamp_costs", "wb")
|
||||
outbound_stamp_costs_file.write(msgpack.packb(self.outbound_stamp_costs))
|
||||
outbound_stamp_costs_file.close()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def clean_available_tickets(self):
|
||||
try:
|
||||
# Clean outbound tickets
|
||||
expired_outbound = []
|
||||
for destination_hash in self.available_tickets["outbound"]:
|
||||
entry = self.available_tickets["outbound"][destination_hash]
|
||||
if time.time() > entry[0]:
|
||||
expired_outbound.append(destination_hash)
|
||||
|
||||
for destination_hash in expired_outbound:
|
||||
RNS.log(f"Cleaning expired outbound ticket for {destination_hash}") # TODO: Remove
|
||||
self.available_tickets["outbound"].pop(destination_hash)
|
||||
|
||||
# Clean inbound tickets
|
||||
for destination_hash in self.available_tickets["inbound"]:
|
||||
expired_inbound = []
|
||||
for inbound_ticket in self.available_tickets["inbound"][destination_hash]:
|
||||
entry = self.available_tickets["inbound"][destination_hash][inbound_ticket]
|
||||
ticket_expiry = entry[0]
|
||||
if time.time() > ticket_expiry+LXMessage.TICKET_GRACE:
|
||||
expired_inbound.append(inbound_ticket)
|
||||
|
||||
for inbound_ticket in expired_inbound:
|
||||
RNS.log(f"Cleaning expired inbound ticket for {destination_hash}") # TODO: Remove
|
||||
self.available_tickets["inbound"][destination_hash].pop(destination_hash)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
def save_available_tickets(self):
|
||||
with self.ticket_file_lock:
|
||||
try:
|
||||
RNS.log("Saving available tickets...", RNS.LOG_DEBUG) # TODO: Remove
|
||||
if not os.path.isdir(self.storagepath):
|
||||
os.makedirs(self.storagepath)
|
||||
|
||||
available_tickets_file = open(self.storagepath+"/available_tickets", "wb")
|
||||
available_tickets_file.write(msgpack.packb(self.available_tickets))
|
||||
available_tickets_file.close()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Could not save available tickets to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def exit_handler(self):
|
||||
if self.propagation_node:
|
||||
try:
|
||||
|
|
@ -1026,14 +1157,29 @@ class LXMRouter:
|
|||
return False
|
||||
|
||||
def handle_outbound(self, lxmessage):
|
||||
destination_hash = lxmessage.get_destination().hash
|
||||
if lxmessage.stamp_cost == None:
|
||||
destination_hash = lxmessage.get_destination().hash
|
||||
if destination_hash in self.outbound_stamp_costs:
|
||||
stamp_cost = self.outbound_stamp_costs[destination_hash][1]
|
||||
lxmessage.stamp_cost = stamp_cost
|
||||
RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG)
|
||||
|
||||
lxmessage.state = LXMessage.OUTBOUND
|
||||
|
||||
# If an outbound ticket is available for this
|
||||
# destination, attach it to the message.
|
||||
lxmessage.outbound_ticket = self.get_outbound_ticket(destination_hash)
|
||||
if lxmessage.outbound_ticket != None and lxmessage.defer_stamp:
|
||||
RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but outbound ticket was applied, processing immediately", RNS.LOG_DEBUG)
|
||||
lxmessage.defer_stamp = False
|
||||
|
||||
# If requested, include a ticket to allow the
|
||||
# destination to reply without generating a stamp.
|
||||
if lxmessage.include_ticket:
|
||||
ticket = self.generate_ticket(lxmessage.destination_hash)
|
||||
if ticket:
|
||||
lxmessage.fields[FIELD_TICKET] = ticket
|
||||
|
||||
if not lxmessage.packed:
|
||||
lxmessage.pack()
|
||||
|
||||
|
|
@ -1073,9 +1219,23 @@ class LXMRouter:
|
|||
try:
|
||||
message = LXMessage.unpack_from_bytes(lxmf_data)
|
||||
|
||||
if message.signature_validated and FIELD_TICKET in message.fields:
|
||||
ticket_entry = message.fields[FIELD_TICKET]
|
||||
if type(ticket_entry) == list and len(ticket_entry) > 1:
|
||||
expires = ticket_entry[0]
|
||||
ticket = ticket_entry[1]
|
||||
|
||||
if time.time() < expires:
|
||||
if type(ticket) == bytes and len(ticket) == LXMessage.TICKET_LENGTH:
|
||||
self.remember_ticket(message.source_hash, ticket_entry)
|
||||
def save_job():
|
||||
self.save_available_tickets()
|
||||
threading.Thread(target=save_job, daemon=True).start()
|
||||
|
||||
required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost
|
||||
if required_stamp_cost != None:
|
||||
if message.validate_stamp(required_stamp_cost):
|
||||
destination_tickets = self.get_inbound_tickets(message.source_hash)
|
||||
if message.validate_stamp(required_stamp_cost, tickets=destination_tickets):
|
||||
message.stamp_valid = True
|
||||
else:
|
||||
message.stamp_valid = False
|
||||
|
|
@ -1471,6 +1631,11 @@ class LXMRouter:
|
|||
if lxmessage.state == LXMessage.DELIVERED:
|
||||
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
||||
self.pending_outbound.remove(lxmessage)
|
||||
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
|
||||
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
|
||||
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
|
||||
self.save_available_tickets()
|
||||
|
||||
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
|
||||
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
|
||||
self.pending_outbound.remove(lxmessage)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue