Implemented propagation node peering key generation and peering cost signalling

This commit is contained in:
Mark Qvist 2025-10-31 13:53:59 +01:00
commit 434267784d
6 changed files with 257 additions and 96 deletions

View file

@ -51,6 +51,7 @@ class LXMFPropagationAnnounceHandler:
propagation_sync_limit = int(data[4]) propagation_sync_limit = int(data[4])
propagation_stamp_cost = int(data[5][0]) propagation_stamp_cost = int(data[5][0])
propagation_stamp_cost_flexibility = int(data[5][1]) propagation_stamp_cost_flexibility = int(data[5][1])
peering_cost = int(data[5][2])
metadata = data[6] metadata = data[6]
if destination_hash in self.lxmrouter.static_peers: if destination_hash in self.lxmrouter.static_peers:
@ -60,6 +61,7 @@ class LXMFPropagationAnnounceHandler:
propagation_sync_limit=propagation_sync_limit, propagation_sync_limit=propagation_sync_limit,
propagation_stamp_cost=propagation_stamp_cost, propagation_stamp_cost=propagation_stamp_cost,
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility, propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
peering_cost=peering_cost,
metadata=metadata) metadata=metadata)
else: else:
@ -72,6 +74,7 @@ class LXMFPropagationAnnounceHandler:
propagation_sync_limit=propagation_sync_limit, propagation_sync_limit=propagation_sync_limit,
propagation_stamp_cost=propagation_stamp_cost, propagation_stamp_cost=propagation_stamp_cost,
propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility, propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
peering_cost=peering_cost,
metadata=metadata) metadata=metadata)
else: else:

View file

@ -154,18 +154,20 @@ def pn_announce_data_is_valid(data):
if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data") if len(data) < 7: raise ValueError("Invalid announce data: Insufficient peer data")
else: else:
try: int(data[1]) try: int(data[1])
except: raise ValueError("Invalid announce data: Could not decode peer timebase") except: raise ValueError("Invalid announce data: Could not decode timebase")
if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status") if data[2] != True and data[2] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status")
try: int(data[3]) try: int(data[3])
except: raise ValueError("Invalid announce data: Could not decode peer propagation transfer limit") except: raise ValueError("Invalid announce data: Could not decode propagation transfer limit")
try: int(data[4]) try: int(data[4])
except: raise ValueError("Invalid announce data: Could not decode peer propagation sync limit") except: raise ValueError("Invalid announce data: Could not decode propagation sync limit")
if type(data[4]) != list: raise ValueError("Invalid announce data: Could not decode peer stamp costs") if type(data[4]) != list: raise ValueError("Invalid announce data: Could not decode stamp costs")
try: int(data[5][0]) try: int(data[5][0])
except: raise ValueError("Invalid announce data: Could not decode peer target stamp cost") except: raise ValueError("Invalid announce data: Could not decode target stamp cost")
try: int(data[5][1]) try: int(data[5][1])
except: raise ValueError("Invalid announce data: Could not decode peer stamp cost flexibility") except: raise ValueError("Invalid announce data: Could not decode stamp cost flexibility")
if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode peer metadata") try: int(data[5][2])
except: raise ValueError("Invalid announce data: Could not decode peering cost")
if type(data[6]) != dict: raise ValueError("Invalid announce data: Could not decode metadata")
except Exception as e: except Exception as e:
RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG) RNS.log(f"Could not validate propagation node announce data: {e}", RNS.LOG_DEBUG)

View file

@ -1,8 +1,10 @@
import os import os
import time import time
import threading
import RNS import RNS
import RNS.vendor.umsgpack as msgpack import RNS.vendor.umsgpack as msgpack
import LXMF.LXStamper as LXStamper
from collections import deque from collections import deque
from .LXMF import APP_NAME from .LXMF import APP_NAME
@ -20,6 +22,7 @@ class LXMPeer:
ERROR_NO_IDENTITY = 0xf0 ERROR_NO_IDENTITY = 0xf0
ERROR_NO_ACCESS = 0xf1 ERROR_NO_ACCESS = 0xf1
ERROR_THROTTLED = 0xf2
ERROR_TIMEOUT = 0xfe ERROR_TIMEOUT = 0xfe
STRATEGY_LAZY = 0x01 STRATEGY_LAZY = 0x01
@ -80,6 +83,11 @@ class LXMPeer:
except: peer.propagation_stamp_cost_flexibility = None except: peer.propagation_stamp_cost_flexibility = None
else: peer.propagation_stamp_cost_flexibility = None else: peer.propagation_stamp_cost_flexibility = None
if "peering_cost" in dictionary:
try: peer.peering_cost = int(dictionary["peering_cost"])
except: peer.peering_cost = None
else: peer.peering_cost = None
if "sync_strategy" in dictionary: if "sync_strategy" in dictionary:
try: peer.sync_strategy = int(dictionary["sync_strategy"]) try: peer.sync_strategy = int(dictionary["sync_strategy"])
except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY except: peer.sync_strategy = LXMPeer.DEFAULT_SYNC_STRATEGY
@ -97,6 +105,8 @@ class LXMPeer:
else: peer.tx_bytes = 0 else: peer.tx_bytes = 0
if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"] if "last_sync_attempt" in dictionary: peer.last_sync_attempt = dictionary["last_sync_attempt"]
else: peer.last_sync_attempt = 0 else: peer.last_sync_attempt = 0
if "peering_key" in dictionary: peer.peering_key = dictionary["peering_key"]
else: peer.peering_key = None
hm_count = 0 hm_count = 0
for transient_id in dictionary["handled_ids"]: for transient_id in dictionary["handled_ids"]:
@ -123,6 +133,8 @@ class LXMPeer:
dictionary["peering_timebase"] = self.peering_timebase dictionary["peering_timebase"] = self.peering_timebase
dictionary["alive"] = self.alive dictionary["alive"] = self.alive
dictionary["last_heard"] = self.last_heard dictionary["last_heard"] = self.last_heard
dictionary["sync_strategy"] = self.sync_strategy
dictionary["peering_key"] = self.peering_key
dictionary["destination_hash"] = self.destination_hash dictionary["destination_hash"] = self.destination_hash
dictionary["link_establishment_rate"] = self.link_establishment_rate dictionary["link_establishment_rate"] = self.link_establishment_rate
dictionary["sync_transfer_rate"] = self.sync_transfer_rate dictionary["sync_transfer_rate"] = self.sync_transfer_rate
@ -130,7 +142,7 @@ class LXMPeer:
dictionary["propagation_sync_limit"] = self.propagation_sync_limit dictionary["propagation_sync_limit"] = self.propagation_sync_limit
dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost dictionary["propagation_stamp_cost"] = self.propagation_stamp_cost
dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility dictionary["propagation_stamp_cost_flexibility"] = self.propagation_stamp_cost_flexibility
dictionary["sync_strategy"] = self.sync_strategy dictionary["peering_cost"] = self.peering_cost
dictionary["last_sync_attempt"] = self.last_sync_attempt dictionary["last_sync_attempt"] = self.last_sync_attempt
dictionary["offered"] = self.offered dictionary["offered"] = self.offered
dictionary["outgoing"] = self.outgoing dictionary["outgoing"] = self.outgoing
@ -155,16 +167,18 @@ class LXMPeer:
return peer_bytes return peer_bytes
def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY): def __init__(self, router, destination_hash, sync_strategy=DEFAULT_SYNC_STRATEGY):
self.alive = False self.alive = False
self.last_heard = 0 self.last_heard = 0
self.sync_strategy = sync_strategy self.sync_strategy = sync_strategy
self.peering_key = None
self.peering_cost = None
self.next_sync_attempt = 0 self.next_sync_attempt = 0
self.last_sync_attempt = 0 self.last_sync_attempt = 0
self.sync_backoff = 0 self.sync_backoff = 0
self.peering_timebase = 0 self.peering_timebase = 0
self.link_establishment_rate = 0 self.link_establishment_rate = 0
self.sync_transfer_rate = 0 self.sync_transfer_rate = 0
self.propagation_transfer_limit = None self.propagation_transfer_limit = None
self.propagation_sync_limit = None self.propagation_sync_limit = None
@ -185,6 +199,8 @@ class LXMPeer:
self._hm_counts_synced = False self._hm_counts_synced = False
self._um_counts_synced = False self._um_counts_synced = False
self._peering_key_lock = threading.Lock()
self.link = None self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
@ -199,11 +215,74 @@ class LXMPeer:
self.destination = None self.destination = None
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING) RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
def peering_key_ready(self):
if not self.peering_cost: return False
if type(self.peering_key) == list and len(self.peering_key) == 2:
value = self.peering_key[1]
if value >= self.peering_cost: return True
else:
RNS.log(f"Peering key value mismatch for {self}. Current value is {value}, but peer requires {self.peering_cost}. Scheduling regeneration...", RNS.LOG_WARNING)
self.peering_key = None
return False
def peering_key_value(self):
if type(self.peering_key) == list and len(self.peering_key) == 2: return self.peering_key[1]
else: return None
def generate_peering_key(self):
if self.peering_cost == None: return False
with self._peering_key_lock:
if self.peering_key != None: return True
else:
RNS.log(f"Generating peering key for {self}", RNS.LOG_NOTICE)
if self.router.identity == None:
RNS.log(f"Could not update peering key for {self} since the local LXMF router identity is not configured", RNS.LOG_ERROR)
return False
if self.identity == None:
self.identity = RNS.Identity.recall(destination_hash)
if self.identity == None:
RNS.log(f"Could not update peering key for {self} since its identity could not be recalled", RNS.LOG_ERROR)
return False
key_material = self.identity.hash+self.router.identity.hash
peering_key, value = LXStamper.generate_stamp(key_material, self.peering_cost, expand_rounds=LXStamper.WORKBLOCK_EXPAND_ROUNDS_PEERING)
if value >= self.peering_cost:
self.peering_key = [peering_key, value]
RNS.log(f"Peering key successfully generated for {self}", RNS.LOG_NOTICE)
return True
return False
def sync(self): def sync(self):
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG) RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
self.last_sync_attempt = time.time() self.last_sync_attempt = time.time()
if time.time() > self.next_sync_attempt: sync_time_reached = time.time() > self.next_sync_attempt
stamp_costs_known = self.propagation_stamp_cost != None and self.propagation_stamp_cost_flexibility != None and self.peering_cost != None
peering_key_ready = self.peering_key_ready()
sync_checks = sync_time_reached and stamp_costs_known and peering_key_ready
if not sync_checks:
try:
if not sync_time_reached:
postpone_reason = " due to previous failures"
if self.last_sync_attempt > self.last_heard: self.alive = False
elif not stamp_costs_known:
postpone_reason = " since its required stamp costs are not yet known"
elif not peering_key_ready:
postpone_reason = " since a peering key has not been generated yet"
def job(): self.generate_peering_key()
threading.Thread(target=job, daemon=True).start()
delay = self.next_sync_attempt-time.time()
postpone_delay = " for {RNS.prettytime({delay})}" if delay > 0 else ""
RNS.log(f"Postponing sync with peer {RNS.prettyhexrep(self.destination_hash)}{postpone_delay}{postpone_reason}", RNS.LOG_DEBUG)
except Exception as e:
RNS.trace_exception(e)
else:
if not RNS.Transport.has_path(self.destination_hash): if not RNS.Transport.has_path(self.destination_hash):
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG) RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash) RNS.Transport.request_path(self.destination_hash)
@ -219,6 +298,10 @@ class LXMPeer:
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
if self.destination != None: if self.destination != None:
if len(self.unhandled_messages) == 0:
RNS.log(f"Sync requested for {self}, but no unhandled messages exist for peer. Sync complete.", RNS.LOG_DEBUG)
return
if len(self.unhandled_messages) > 0: if len(self.unhandled_messages) > 0:
if self.currently_transferring_messages != None: if self.currently_transferring_messages != None:
RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR) RNS.log(f"Sync requested for {self}, but current message transfer index was not clear. Aborting.", RNS.LOG_ERROR)
@ -236,23 +319,31 @@ class LXMPeer:
self.alive = True self.alive = True
self.last_heard = time.time() self.last_heard = time.time()
self.sync_backoff = 0 self.sync_backoff = 0
min_accepted_cost = min(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG) RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing sync offer...", RNS.LOG_DEBUG)
unhandled_entries = [] unhandled_entries = []
unhandled_ids = [] unhandled_ids = []
purged_ids = [] purged_ids = []
low_value_ids = []
for transient_id in self.unhandled_messages: for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries: if transient_id in self.router.propagation_entries:
unhandled_entry = [ transient_id, if self.router.get_stamp_value(transient_id) < min_accepted_cost: low_value_ids.append(transient_id)
self.router.get_weight(transient_id), else:
self.router.get_size(transient_id) ] unhandled_entry = [ transient_id,
self.router.get_weight(transient_id),
unhandled_entries.append(unhandled_entry) self.router.get_size(transient_id) ]
unhandled_entries.append(unhandled_entry)
else: purged_ids.append(transient_id) else: purged_ids.append(transient_id)
for transient_id in purged_ids: for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.remove_unhandled_message(transient_id)
for transient_id in low_value_ids:
RNS.log(f"Dropping unhandled message {RNS.prettyhexrep(transient_id)} for peer {RNS.prettyhexrep(self.destination_hash)} since its stamp value is lower than peer requirement of {min_accepted_cost}.", RNS.LOG_DEBUG)
self.remove_unhandled_message(transient_id) self.remove_unhandled_message(transient_id)
unhandled_entries.sort(key=lambda e: e[1], reverse=False) unhandled_entries.sort(key=lambda e: e[1], reverse=False)
@ -284,11 +375,7 @@ class LXMPeer:
self.state = LXMPeer.REQUEST_SENT self.state = LXMPeer.REQUEST_SENT
else: else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) RNS.log(f"Could not request sync to peer {RNS.prettyhexrep(self.destination_hash)} since its identity could not be recalled.", RNS.LOG_ERROR)
else:
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
if self.last_sync_attempt > self.last_heard: self.alive = False
def request_failed(self, request_receipt): def request_failed(self, request_receipt):
RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG) RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG)

View file

@ -45,9 +45,11 @@ class LXMRouter:
ROTATION_HEADROOM_PCT = 10 ROTATION_HEADROOM_PCT = 10
ROTATION_AR_MAX = 0.5 ROTATION_AR_MAX = 0.5
PROPAGATION_COST = 12 PEERING_COST = 10
PROPAGATION_COST_MIN = 10 MAX_PEERING_COST = 12
PROPAGATION_COST_MIN = 13
PROPAGATION_COST_FLEX = 3 PROPAGATION_COST_FLEX = 3
PROPAGATION_COST = 16
PROPAGATION_LIMIT = 256 PROPAGATION_LIMIT = 256
SYNC_LIMIT = PROPAGATION_LIMIT*40 SYNC_LIMIT = PROPAGATION_LIMIT*40
DELIVERY_LIMIT = 1000 DELIVERY_LIMIT = 1000
@ -81,7 +83,8 @@ class LXMRouter:
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT, propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT,
enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None, enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None,
from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT, from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT,
propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX): propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX,
peering_cost=PEERING_COST):
random.seed(os.urandom(10)) random.seed(os.urandom(10))
@ -115,17 +118,20 @@ class LXMRouter:
self.outbound_propagation_link = None self.outbound_propagation_link = None
if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT
if propagation_cost < LXMRouter.PROPAGATION_COST_MIN: propagation_cost = LXMRouter.PROPAGATION_COST_MIN
self.message_storage_limit = None self.message_storage_limit = None
self.information_storage_limit = None self.information_storage_limit = None
self.propagation_per_transfer_limit = propagation_limit self.propagation_per_transfer_limit = propagation_limit
self.propagation_per_sync_limit = sync_limit self.propagation_per_sync_limit = sync_limit
self.delivery_per_transfer_limit = delivery_limit self.delivery_per_transfer_limit = delivery_limit
self.propagation_stamp_cost = propagation_cost self.propagation_stamp_cost = propagation_cost
self.propagation_stamp_cost_flexibility = propagation_cost_flexibility self.propagation_stamp_cost_flexibility = propagation_cost_flexibility
self.enforce_ratchets = enforce_ratchets self.peering_cost = peering_cost
self._enforce_stamps = enforce_stamps self.max_peering_cost = LXMRouter.MAX_PEERING_COST
self.pending_deferred_stamps = {} self.enforce_ratchets = enforce_ratchets
self._enforce_stamps = enforce_stamps
self.pending_deferred_stamps = {}
if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit: if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit:
self.propagation_per_sync_limit = self.propagation_per_transfer_limit self.propagation_per_sync_limit = self.propagation_per_transfer_limit
@ -284,7 +290,7 @@ class LXMRouter:
def delayed_announce(): def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY) time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
node_state = self.propagation_node and not self.from_static_only node_state = self.propagation_node and not self.from_static_only
stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility] stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility, self.peering_cost]
metadata = {} metadata = {}
announce_data = [ False, # 0: Legacy LXMF PN support announce_data = [ False, # 0: Legacy LXMF PN support
int(time.time()), # 1: Current node timebase int(time.time()), # 1: Current node timebase
@ -719,6 +725,8 @@ class LXMRouter:
"sync_limit": peer.propagation_sync_limit, "sync_limit": peer.propagation_sync_limit,
"target_stamp_cost": peer.propagation_stamp_cost, "target_stamp_cost": peer.propagation_stamp_cost,
"stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility, "stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility,
"peering_cost": peer.peering_cost,
"peering_key": peer.peering_key_value(),
"network_distance": RNS.Transport.hops_to(peer_id), "network_distance": RNS.Transport.hops_to(peer_id),
"rx_bytes": peer.rx_bytes, "rx_bytes": peer.rx_bytes,
"tx_bytes": peer.tx_bytes, "tx_bytes": peer.tx_bytes,
@ -739,6 +747,8 @@ class LXMRouter:
"sync_limit": self.propagation_per_sync_limit, "sync_limit": self.propagation_per_sync_limit,
"target_stamp_cost": self.propagation_stamp_cost, "target_stamp_cost": self.propagation_stamp_cost,
"stamp_cost_flexibility": self.propagation_stamp_cost_flexibility, "stamp_cost_flexibility": self.propagation_stamp_cost_flexibility,
"peering_cost": self.peering_cost,
"max_peering_cost": self.max_peering_cost,
"autopeer_maxdepth": self.autopeer_maxdepth, "autopeer_maxdepth": self.autopeer_maxdepth,
"from_static_only": self.from_static_only, "from_static_only": self.from_static_only,
"messagestore": { "messagestore": {
@ -1782,39 +1792,48 @@ class LXMRouter:
### Peer Sync & Propagation ########################### ### Peer Sync & Propagation ###########################
####################################################### #######################################################
def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility): def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility, peering_cost, metadata):
if destination_hash in self.peers: if peering_cost > self.max_peering_cost:
peer = self.peers[destination_hash] if destination_hash in self.peers:
if timestamp > peer.peering_timebase: RNS.log(f"Peer {RNS.prettyhexrep(destination_hash)} increased peering cost beyond local accepted maximum, breaking peering...", RNS.LOG_NOTICE)
peer.alive = True self.unpeer(destination_hash, timestamp)
peer.sync_backoff = 0
peer.next_sync_attempt = 0
peer.peering_timebase = timestamp
peer.last_heard = time.time()
peer.propagation_stamp_cost = propagation_stamp_cost
peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
peer.propagation_transfer_limit = propagation_transfer_limit
if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
else: peer.propagation_sync_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
else:
if len(self.peers) < self.max_peers:
peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_stamp_cost = propagation_stamp_cost
peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
peer.propagation_transfer_limit = propagation_transfer_limit
if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
else: peer.propagation_sync_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
else: else:
RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG) RNS.log(f"Not peering with {RNS.prettyhexrep(destination_hash)}, since its peering cost of {peering_cost} exceeds local maximum of {self.max_peering_cost}", RNS.LOG_NOTICE)
else:
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp > peer.peering_timebase:
peer.alive = True
peer.sync_backoff = 0
peer.next_sync_attempt = 0
peer.peering_timebase = timestamp
peer.last_heard = time.time()
peer.propagation_stamp_cost = propagation_stamp_cost
peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
peer.peering_cost = peering_cost
peer.propagation_transfer_limit = propagation_transfer_limit
if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
else: peer.propagation_sync_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
else:
if len(self.peers) >= self.max_peers: RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
else:
peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_stamp_cost = propagation_stamp_cost
peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
peer.peering_cost = peering_cost
peer.propagation_transfer_limit = propagation_transfer_limit
if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
else: peer.propagation_sync_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
def unpeer(self, destination_hash, timestamp = None): def unpeer(self, destination_hash, timestamp = None):
if timestamp == None: if timestamp == None:
@ -2000,8 +2019,8 @@ class LXMRouter:
####################################### #######################################
# TODO: Check propagation stamps here # # TODO: Check propagation stamps here #
####################################### #######################################
target_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
validated_messages = LXStamper.validate_pn_stamps(messages, target_cost) validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
for validated_entry in validated_messages: for validated_entry in validated_messages:
lxmf_data = validated_entry[1] lxmf_data = validated_entry[1]
@ -2077,7 +2096,7 @@ class LXMRouter:
# 2: Boolean flag signalling propagation node state # 2: Boolean flag signalling propagation node state
# 3: Per-transfer limit for message propagation in kilobytes # 3: Per-transfer limit for message propagation in kilobytes
# 4: Limit for incoming propagation node syncs # 4: Limit for incoming propagation node syncs
# 5: Propagation stamp cost for this node # 5: Propagation stamp costs for this node
# 6: Node metadata # 6: Node metadata
if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth: if remote_app_data[2] and self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
remote_timebase = remote_app_data[1] remote_timebase = remote_app_data[1]
@ -2085,10 +2104,11 @@ class LXMRouter:
remote_sync_limit = remote_app_data[4] remote_sync_limit = remote_app_data[4]
remote_stamp_cost = remote_app_data[5][0] remote_stamp_cost = remote_app_data[5][0]
remote_stamp_flex = remote_app_data[5][1] remote_stamp_flex = remote_app_data[5][1]
remote_peering_cost = remote_app_data[5][2]
remote_metadata = remote_app_data[6] remote_metadata = remote_app_data[6]
RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_metadata) self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
ms = "" if len(messages) == 1 else "s" ms = "" if len(messages) == 1 else "s"
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE) RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
@ -2096,8 +2116,8 @@ class LXMRouter:
####################################### #######################################
# TODO: Check propagation stamps here # # TODO: Check propagation stamps here #
####################################### #######################################
target_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility) min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
validated_messages = LXStamper.validate_pn_stamps(messages, target_cost) validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
for validated_entry in validated_messages: for validated_entry in validated_messages:
transient_id = validated_entry[0] transient_id = validated_entry[0]
@ -2177,13 +2197,13 @@ class LXMRouter:
msg_file = open(file_path, "wb") msg_file = open(file_path, "wb")
msg_file.write(lxmf_data); msg_file.close() msg_file.write(lxmf_data); msg_file.close()
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME) RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)} with stamp value {stamp_value}, adding to peer distribution queues...", RNS.LOG_EXTREME)
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value] self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], [], stamp_value]
self.enqueue_peer_distribution(transient_id, from_peer) self.enqueue_peer_distribution(transient_id, from_peer)
else: else:
# TODO: Add message to sneakernet queues when implemented # TODO: Add message to sneakernet queues when implemented
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG) RNS.log(f"Received propagated LXMF message {RNS.prettyhexrep(transient_id)}, but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG)
return True return True

View file

@ -7,23 +7,24 @@ import math
import itertools import itertools
import multiprocessing import multiprocessing
WORKBLOCK_EXPAND_ROUNDS = 3000 WORKBLOCK_EXPAND_ROUNDS = 3000
WORKBLOCK_EXPAND_ROUNDS_PN = 1000 WORKBLOCK_EXPAND_ROUNDS_PEERING = 20000
STAMP_SIZE = RNS.Identity.HASHLENGTH WORKBLOCK_EXPAND_ROUNDS_PN = 1000
PN_VALIDATION_POOL_MIN_SIZE = 256 STAMP_SIZE = RNS.Identity.HASHLENGTH
PN_VALIDATION_POOL_MIN_SIZE = 256
active_jobs = {} active_jobs = {}
def stamp_workblock(message_id, expand_rounds=WORKBLOCK_EXPAND_ROUNDS): def stamp_workblock(material, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
wb_st = time.time() wb_st = time.time()
workblock = b"" workblock = b""
for n in range(expand_rounds): for n in range(expand_rounds):
workblock += RNS.Cryptography.hkdf(length=256, workblock += RNS.Cryptography.hkdf(length=256,
derive_from=message_id, derive_from=material,
salt=RNS.Identity.full_hash(message_id+msgpack.packb(n)), salt=RNS.Identity.full_hash(material+msgpack.packb(n)),
context=None) context=None)
wb_time = time.time() - wb_st wb_time = time.time() - wb_st
# RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG) RNS.log(f"Stamp workblock size {RNS.prettysize(len(workblock))}, generated in {round(wb_time*1000,2)}ms", RNS.LOG_DEBUG)
return workblock return workblock
@ -81,9 +82,9 @@ def validate_pn_stamps(transient_list, target_cost):
if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost) if len(transient_list) <= PN_VALIDATION_POOL_MIN_SIZE or non_mp_platform: return validate_pn_stamps_job_simple(transient_list, target_cost)
else: return validate_pn_stamps_job_multip(transient_list, target_cost) else: return validate_pn_stamps_job_multip(transient_list, target_cost)
def generate_stamp(message_id, stamp_cost): def generate_stamp(message_id, stamp_cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS):
RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG) RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG)
workblock = stamp_workblock(message_id) workblock = stamp_workblock(message_id, expand_rounds=expand_rounds)
start_time = time.time() start_time = time.time()
stamp = None stamp = None
@ -362,4 +363,12 @@ if __name__ == "__main__":
RNS.loglevel = RNS.LOG_DEBUG RNS.loglevel = RNS.LOG_DEBUG
RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG) RNS.log("Testing LXMF stamp generation", RNS.LOG_DEBUG)
message_id = os.urandom(32) message_id = os.urandom(32)
generate_stamp(message_id, cost) generate_stamp(message_id, cost)
RNS.log("Testing propagation stamp generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PN)
RNS.log("Testing peering key generation", RNS.LOG_DEBUG)
message_id = os.urandom(32)
generate_stamp(message_id, cost, expand_rounds=WORKBLOCK_EXPAND_ROUNDS_PEERING)

View file

@ -164,6 +164,20 @@ def apply_config():
else: else:
active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX active_configuration["propagation_stamp_cost_flexibility"] = LXMF.LXMRouter.PROPAGATION_COST_FLEX
if "propagation" in lxmd_config and "peering_cost" in lxmd_config["propagation"]:
active_configuration["peering_cost"] = lxmd_config["propagation"].as_int("peering_cost")
if active_configuration["peering_cost"] < 0:
active_configuration["peering_cost"] = 0
else:
active_configuration["peering_cost"] = LXMF.LXMRouter.PEERING_COST
if "propagation" in lxmd_config and "remote_peering_cost_max" in lxmd_config["propagation"]:
active_configuration["remote_peering_cost_max"] = lxmd_config["propagation"].as_int("remote_peering_cost_max")
if active_configuration["remote_peering_cost_max"] < 0:
active_configuration["remote_peering_cost_max"] = 0
else:
active_configuration["remote_peering_cost_max"] = LXMF.LXMRouter.MAX_PEERING_COST
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else: else:
@ -579,9 +593,11 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness =
ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"] ssp = s["static_peers"]; cprr = s["clients"]["client_propagation_messages_received"]
cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"] cprs = s["clients"]["client_propagation_messages_served"]; upi = s["unpeered_propagation_incoming"]
psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"] psc = s["target_stamp_cost"]; scf = s["stamp_cost_flexibility"]
pc = s["peering_cost"]; pcm = s["max_peering_cost"]
print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})") print(f"Messagestore contains {mscnt} messages, {msb} ({ms_util} utilised of {msl})")
print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit") print(f"Accepting propagated messages from {who_str}, {ptl} per-transfer limit")
print(f"Required propagation stamp cost is {psc}, flexibility is {scf}") print(f"Required propagation stamp cost is {psc}, flexibility is {scf}")
print(f"Peering cost is {pc}, max remote peering cost is {pcm}")
print(f"") print(f"")
print(f"Peers : {stp} total (peer limit is {smp})") print(f"Peers : {stp} total (peer limit is {smp})")
print(f" {sdp} discovered, {ssp} static") print(f" {sdp} discovered, {ssp} static")
@ -613,7 +629,13 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness =
h = max(time.time()-p["last_heard"], 0) h = max(time.time()-p["last_heard"], 0)
hops = p["network_distance"] hops = p["network_distance"]
hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away" hs = "hops unknown" if hops == RNS.Transport.PATHFINDER_M else f"{hops} hop away" if hops == 1 else f"{hops} hops away"
pm = p["messages"] pm = p["messages"]; pk = p["peering_key"]
pc = p["peering_cost"]; psc = p["target_stamp_cost"]; psf = p["stamp_cost_flexibility"]
if pc == None: pc = "unknown"
if psc == None: psc = "unknown"
if psf == None: psf = "unknown"
if pk == None: pk = "Not generated"
else: pk = f"Generated, value is {pk}"
if p["last_sync_attempt"] != 0: if p["last_sync_attempt"] != 0:
lsa = p["last_sync_attempt"] lsa = p["last_sync_attempt"]
ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago" ls = f"last synced {RNS.prettytime(max(time.time()-lsa, 0))} ago"
@ -622,9 +644,11 @@ def get_status(configdir = None, rnsconfigdir = None, verbosity = 0, quietness =
sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000) sstr = RNS.prettyspeed(p["str"]); sler = RNS.prettyspeed(p["ler"]); stl = RNS.prettysize(p["transfer_limit"]*1000)
srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"] srxb = RNS.prettysize(p["rx_bytes"]); stxb = RNS.prettysize(p["tx_bytes"]); pmo = pm["offered"]; pmout = pm["outgoing"]
pmi = pm["incoming"]; pmuh = pm["unhandled"] pmi = pm["incoming"]; pmuh = pm["unhandled"]
print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}") print(f"{ind}{t}{RNS.prettyhexrep(peer_id)}")
print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago") print(f"{ind*2}Status : {a}, {hs}, last heard {RNS.prettytime(h)} ago")
print(f"{ind*2}Costs : Propagation {psc} (flex {psf}), peering {pc}")
print(f"{ind*2}Sync key : {pk}")
print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit") print(f"{ind*2}Speeds : {sstr} STR, {sler} LER, {stl} transfer limit")
print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming") print(f"{ind*2}Messages : {pmo} offered, {pmout} outgoing, {pmi} incoming")
print(f"{ind*2}Traffic : {srxb} received, {stxb} sent") print(f"{ind*2}Traffic : {srxb} received, {stxb} sent")
@ -752,6 +776,22 @@ autopeer_maxdepth = 4
# propagation_stamp_cost_flexibility = 3 # propagation_stamp_cost_flexibility = 3
# The peering_cost option configures the target
# value required for a remote node to peer with
# and deliver messages to this node.
# peering_cost = 10
# You can configure the maximum peering cost
# of remote nodes that this node will peer with.
# Setting this to a higher number will allow
# this node to peer with other nodes requiring
# a high peering key value, but will require
# more computation time during initial peering
# when generating the peering key.
# remote_peering_cost_max = 12
# You can tell the LXMF message router to # You can tell the LXMF message router to
# prioritise storage for one or more # prioritise storage for one or more
# destinations. If the message store reaches # destinations. If the message store reaches