2022-06-17 08:54:04 +02:00
import os
import time
import RNS
import RNS . vendor . umsgpack as msgpack
2025-01-21 20:20:39 +01:00
from collections import deque
2022-06-17 08:54:04 +02:00
from . LXMF import APP_NAME
class LXMPeer :
OFFER_REQUEST_PATH = " /offer "
MESSAGE_GET_PATH = " /get "
IDLE = 0x00
LINK_ESTABLISHING = 0x01
LINK_READY = 0x02
REQUEST_SENT = 0x03
RESPONSE_RECEIVED = 0x04
RESOURCE_TRANSFERRING = 0x05
ERROR_NO_IDENTITY = 0xf0
2022-06-17 13:42:44 +02:00
ERROR_NO_ACCESS = 0xf1
2025-01-24 14:05:12 +01:00
ERROR_TIMEOUT = 0xfe
2022-06-17 08:54:04 +02:00
# Maximum amount of time a peer can
# be unreachable before it is removed
2023-10-16 01:50:01 +02:00
MAX_UNREACHABLE = 14 * 24 * 60 * 60
2022-06-17 08:54:04 +02:00
2022-12-20 23:58:09 +01:00
# Everytime consecutive time a sync
# link fails to establish, add this
# amount off time to wait before the
# next sync is attempted.
SYNC_BACKOFF_STEP = 12 * 60
# How long to wait for an answer to
# peer path requests before deferring
# sync to later.
PATH_REQUEST_GRACE = 7.5
2022-06-17 08:54:04 +02:00
@staticmethod
def from_bytes ( peer_bytes , router ) :
dictionary = msgpack . unpackb ( peer_bytes )
2025-01-21 16:33:39 +01:00
peer_destination_hash = dictionary [ " destination_hash " ]
peer_peering_timebase = dictionary [ " peering_timebase " ]
peer_alive = dictionary [ " alive " ]
peer_last_heard = dictionary [ " last_heard " ]
peer = LXMPeer ( router , peer_destination_hash )
peer . peering_timebase = peer_peering_timebase
peer . alive = peer_alive
peer . last_heard = peer_last_heard
2022-06-17 08:54:04 +02:00
2023-02-17 12:29:00 +01:00
if " link_establishment_rate " in dictionary :
peer . link_establishment_rate = dictionary [ " link_establishment_rate " ]
else :
peer . link_establishment_rate = 0
2025-01-13 14:35:14 +01:00
if " sync_transfer_rate " in dictionary :
peer . sync_transfer_rate = dictionary [ " sync_transfer_rate " ]
else :
peer . sync_transfer_rate = 0
2022-06-17 08:54:04 +02:00
2024-03-01 22:37:54 +01:00
if " propagation_transfer_limit " in dictionary :
try :
peer . propagation_transfer_limit = float ( dictionary [ " propagation_transfer_limit " ] )
except Exception as e :
peer . propagation_transfer_limit = None
else :
peer . propagation_transfer_limit = None
2025-01-22 01:37:09 +01:00
if " offered " in dictionary :
peer . offered = dictionary [ " offered " ]
else :
peer . offered = 0
if " outgoing " in dictionary :
peer . outgoing = dictionary [ " outgoing " ]
else :
peer . outgoing = 0
if " incoming " in dictionary :
peer . incoming = dictionary [ " incoming " ]
else :
peer . incoming = 0
if " rx_bytes " in dictionary :
peer . rx_bytes = dictionary [ " rx_bytes " ]
else :
peer . rx_bytes = 0
if " tx_bytes " in dictionary :
peer . tx_bytes = dictionary [ " tx_bytes " ]
else :
peer . tx_bytes = 0
2025-01-23 16:27:01 +01:00
if " last_sync_attempt " in dictionary :
peer . last_sync_attempt = dictionary [ " last_sync_attempt " ]
else :
peer . last_sync_attempt = 0
2024-03-01 22:37:54 +01:00
2025-01-21 16:33:39 +01:00
hm_count = 0
2022-06-17 08:54:04 +02:00
for transient_id in dictionary [ " handled_ids " ] :
if transient_id in router . propagation_entries :
2025-01-21 16:33:39 +01:00
peer . add_handled_message ( transient_id )
hm_count + = 1
2022-06-17 08:54:04 +02:00
2025-01-21 16:33:39 +01:00
um_count = 0
2022-06-17 08:54:04 +02:00
for transient_id in dictionary [ " unhandled_ids " ] :
if transient_id in router . propagation_entries :
2025-01-21 16:33:39 +01:00
peer . add_unhandled_message ( transient_id )
um_count + = 1
peer . _hm_count = hm_count
peer . _um_count = um_count
peer . _hm_counts_synced = True
peer . _um_counts_synced = True
2022-06-17 08:54:04 +02:00
2025-01-21 10:46:59 +01:00
del dictionary
2022-06-17 08:54:04 +02:00
return peer
def to_bytes ( self ) :
dictionary = { }
dictionary [ " peering_timebase " ] = self . peering_timebase
dictionary [ " alive " ] = self . alive
dictionary [ " last_heard " ] = self . last_heard
dictionary [ " destination_hash " ] = self . destination_hash
2023-02-17 12:29:00 +01:00
dictionary [ " link_establishment_rate " ] = self . link_establishment_rate
2025-01-13 14:35:14 +01:00
dictionary [ " sync_transfer_rate " ] = self . sync_transfer_rate
2024-03-01 22:37:54 +01:00
dictionary [ " propagation_transfer_limit " ] = self . propagation_transfer_limit
2025-01-23 16:27:01 +01:00
dictionary [ " last_sync_attempt " ] = self . last_sync_attempt
2025-01-22 01:37:09 +01:00
dictionary [ " offered " ] = self . offered
dictionary [ " outgoing " ] = self . outgoing
dictionary [ " incoming " ] = self . incoming
dictionary [ " rx_bytes " ] = self . rx_bytes
dictionary [ " tx_bytes " ] = self . tx_bytes
2022-06-17 08:54:04 +02:00
handled_ids = [ ]
for transient_id in self . handled_messages :
handled_ids . append ( transient_id )
unhandled_ids = [ ]
for transient_id in self . unhandled_messages :
unhandled_ids . append ( transient_id )
dictionary [ " handled_ids " ] = handled_ids
dictionary [ " unhandled_ids " ] = unhandled_ids
2025-01-21 16:33:39 +01:00
peer_bytes = msgpack . packb ( dictionary )
del dictionary
return peer_bytes
2022-06-17 08:54:04 +02:00
def __init__ ( self , router , destination_hash ) :
self . alive = False
self . last_heard = 0
2022-12-20 23:58:09 +01:00
self . next_sync_attempt = 0
self . last_sync_attempt = 0
self . sync_backoff = 0
2022-06-17 08:54:04 +02:00
self . peering_timebase = 0
2023-02-17 12:29:00 +01:00
self . link_establishment_rate = 0
2025-01-13 14:35:14 +01:00
self . sync_transfer_rate = 0
2024-03-01 22:37:54 +01:00
self . propagation_transfer_limit = None
2025-01-21 20:20:39 +01:00
self . handled_messages_queue = deque ( )
self . unhandled_messages_queue = deque ( )
2022-06-17 08:54:04 +02:00
2025-01-22 01:37:09 +01:00
self . offered = 0 # Messages offered to this peer
self . outgoing = 0 # Messages transferred to this peer
self . incoming = 0 # Messages received from this peer
self . rx_bytes = 0 # Bytes received from this peer
self . tx_bytes = 0 # Bytes sent to this peer
2025-01-21 16:33:39 +01:00
self . _hm_count = 0
self . _um_count = 0
self . _hm_counts_synced = False
self . _um_counts_synced = False
2022-06-17 08:54:04 +02:00
self . link = None
self . state = LXMPeer . IDLE
2024-03-01 22:37:54 +01:00
self . last_offer = [ ]
2022-06-17 08:54:04 +02:00
self . router = router
self . destination_hash = destination_hash
self . identity = RNS . Identity . recall ( destination_hash )
2024-10-04 11:22:39 +02:00
if self . identity != None :
self . destination = RNS . Destination ( self . identity , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " propagation " )
else :
2025-01-21 10:46:59 +01:00
self . destination = None
2024-10-04 11:22:39 +02:00
RNS . log ( f " Could not recall identity for LXMF propagation peer { RNS . prettyhexrep ( self . destination_hash ) } , will retry identity resolution on next sync " , RNS . LOG_WARNING )
2022-06-17 08:54:04 +02:00
def sync ( self ) :
RNS . log ( " Initiating LXMF Propagation Node sync with peer " + RNS . prettyhexrep ( self . destination_hash ) , RNS . LOG_DEBUG )
2022-12-20 23:58:09 +01:00
self . last_sync_attempt = time . time ( )
2022-06-17 08:54:04 +02:00
2022-12-20 23:58:09 +01:00
if time . time ( ) > self . next_sync_attempt :
if not RNS . Transport . has_path ( self . destination_hash ) :
RNS . log ( " No path to peer " + RNS . prettyhexrep ( self . destination_hash ) + " exists, requesting... " , RNS . LOG_DEBUG )
RNS . Transport . request_path ( self . destination_hash )
time . sleep ( LXMPeer . PATH_REQUEST_GRACE )
if not RNS . Transport . has_path ( self . destination_hash ) :
RNS . log ( " Path request was not answered, retrying sync with peer " + RNS . prettyhexrep ( self . destination_hash ) + " later " , RNS . LOG_DEBUG )
2022-06-17 08:54:04 +02:00
else :
2022-12-20 23:58:09 +01:00
if self . identity == None :
self . identity = RNS . Identity . recall ( destination_hash )
2024-10-04 11:22:39 +02:00
if self . identity != None :
self . destination = RNS . Destination ( self . identity , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " propagation " )
2022-12-20 23:58:09 +01:00
2024-10-04 11:22:39 +02:00
if self . destination != None :
2022-12-20 23:58:09 +01:00
if len ( self . unhandled_messages ) > 0 :
if self . state == LXMPeer . IDLE :
RNS . log ( " Establishing link for sync to peer " + RNS . prettyhexrep ( self . destination_hash ) + " ... " , RNS . LOG_DEBUG )
self . sync_backoff + = LXMPeer . SYNC_BACKOFF_STEP
self . next_sync_attempt = time . time ( ) + self . sync_backoff
self . link = RNS . Link ( self . destination , established_callback = self . link_established , closed_callback = self . link_closed )
self . state = LXMPeer . LINK_ESTABLISHING
else :
if self . state == LXMPeer . LINK_READY :
self . alive = True
self . last_heard = time . time ( )
self . sync_backoff = 0
RNS . log ( " Synchronisation link to peer " + RNS . prettyhexrep ( self . destination_hash ) + " established, preparing request... " , RNS . LOG_DEBUG )
2024-03-01 22:37:54 +01:00
unhandled_entries = [ ]
2022-12-20 23:58:09 +01:00
unhandled_ids = [ ]
purged_ids = [ ]
for transient_id in self . unhandled_messages :
if transient_id in self . router . propagation_entries :
2024-03-01 22:37:54 +01:00
unhandled_entry = [
transient_id ,
self . router . get_weight ( transient_id ) ,
self . router . get_size ( transient_id ) ,
]
unhandled_entries . append ( unhandled_entry )
2022-12-20 23:58:09 +01:00
else :
purged_ids . append ( transient_id )
for transient_id in purged_ids :
RNS . log ( " Dropping unhandled message " + RNS . prettyhexrep ( transient_id ) + " for peer " + RNS . prettyhexrep ( self . destination_hash ) + " since it no longer exists in the message store. " , RNS . LOG_DEBUG )
2025-01-21 16:33:39 +01:00
self . remove_unhandled_message ( transient_id )
2022-12-20 23:58:09 +01:00
2024-03-01 22:37:54 +01:00
unhandled_entries . sort ( key = lambda e : e [ 1 ] , reverse = False )
2024-03-01 23:48:12 +01:00
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
2024-03-01 22:37:54 +01:00
for unhandled_entry in unhandled_entries :
transient_id = unhandled_entry [ 0 ]
weight = unhandled_entry [ 1 ]
lxm_size = unhandled_entry [ 2 ]
2024-03-02 09:09:51 +01:00
next_size = cumulative_size + ( lxm_size + per_message_overhead )
if self . propagation_transfer_limit != None and next_size > ( self . propagation_transfer_limit * 1000 ) :
2025-01-30 11:30:45 +01:00
if lxm_size + per_message_overhead > ( self . propagation_transfer_limit * 1000 ) :
2025-01-30 11:23:18 +01:00
self . remove_unhandled_message ( transient_id )
self . add_handled_message ( transient_id )
2025-01-30 11:36:11 +01:00
RNS . log ( f " Message { RNS . prettyhexrep ( transient_id ) } exceeds transfer limit for { self } , considering handled " , RNS . LOG_DEBUG )
2024-03-01 22:37:54 +01:00
else :
2024-03-01 23:48:12 +01:00
cumulative_size + = ( lxm_size + per_message_overhead )
2024-03-01 22:37:54 +01:00
unhandled_ids . append ( transient_id )
2025-01-22 01:37:09 +01:00
RNS . log ( f " Offering { len ( unhandled_ids ) } messages to peer { RNS . prettyhexrep ( self . destination . hash ) } " , RNS . LOG_VERBOSE )
2024-03-01 22:37:54 +01:00
self . last_offer = unhandled_ids
2025-01-21 10:46:59 +01:00
self . link . request ( LXMPeer . OFFER_REQUEST_PATH , unhandled_ids , response_callback = self . offer_response , failed_callback = self . request_failed )
2022-12-20 23:58:09 +01:00
self . state = LXMPeer . REQUEST_SENT
2024-02-29 23:02:16 +01:00
2022-12-20 23:58:09 +01:00
else :
RNS . log ( " Could not request sync to peer " + RNS . prettyhexrep ( self . destination_hash ) + " since its identity could not be recalled. " , RNS . LOG_ERROR )
2024-02-29 23:02:16 +01:00
2022-12-20 23:58:09 +01:00
else :
RNS . log ( " Postponing sync with peer " + RNS . prettyhexrep ( self . destination_hash ) + " for " + RNS . prettytime ( self . next_sync_attempt - time . time ( ) ) + " due to previous failures " , RNS . LOG_DEBUG )
if self . last_sync_attempt > self . last_heard :
self . alive = False
2022-06-17 08:54:04 +02:00
def request_failed ( self , request_receipt ) :
RNS . log ( " Sync request to peer " + str ( self . destination ) + " failed " , RNS . LOG_DEBUG )
if self . link != None :
self . link . teardown ( )
2024-02-29 23:02:16 +01:00
self . state = LXMPeer . IDLE
2022-06-17 08:54:04 +02:00
def offer_response ( self , request_receipt ) :
try :
self . state = LXMPeer . RESPONSE_RECEIVED
response = request_receipt . response
wanted_messages = [ ]
wanted_message_ids = [ ]
if response == LXMPeer . ERROR_NO_IDENTITY :
if self . link != None :
2025-01-22 01:37:09 +01:00
RNS . log ( " Remote peer indicated that no identification was received, retrying... " , RNS . LOG_VERBOSE )
2024-03-01 22:37:54 +01:00
self . link . identify ( )
2022-06-17 08:54:04 +02:00
self . state = LXMPeer . LINK_READY
self . sync ( )
2025-01-22 01:37:09 +01:00
return
elif response == LXMPeer . ERROR_NO_ACCESS :
RNS . log ( " Remote indicated that access was denied, breaking peering " , RNS . LOG_VERBOSE )
self . router . unpeer ( self . destination_hash )
return
2022-06-17 08:54:04 +02:00
elif response == False :
# Peer already has all advertised messages
2024-03-01 22:37:54 +01:00
for transient_id in self . last_offer :
if transient_id in self . unhandled_messages :
2025-01-21 16:33:39 +01:00
self . add_handled_message ( transient_id )
self . remove_unhandled_message ( transient_id )
2024-03-01 22:37:54 +01:00
2022-06-17 08:54:04 +02:00
elif response == True :
# Peer wants all advertised messages
2024-03-01 22:37:54 +01:00
for transient_id in self . last_offer :
2025-01-21 10:46:59 +01:00
wanted_messages . append ( self . router . propagation_entries [ transient_id ] )
2022-06-17 08:54:04 +02:00
wanted_message_ids . append ( transient_id )
else :
# Peer wants some advertised messages
2024-03-01 22:37:54 +01:00
for transient_id in self . last_offer . copy ( ) :
2022-06-17 08:54:04 +02:00
# If the peer did not want the message, it has
# already received it from another peer.
if not transient_id in response :
2025-01-21 16:33:39 +01:00
self . add_handled_message ( transient_id )
self . remove_unhandled_message ( transient_id )
2022-06-17 08:54:04 +02:00
for transient_id in response :
2025-01-21 10:46:59 +01:00
wanted_messages . append ( self . router . propagation_entries [ transient_id ] )
2022-06-17 08:54:04 +02:00
wanted_message_ids . append ( transient_id )
if len ( wanted_messages ) > 0 :
2025-01-22 01:37:09 +01:00
RNS . log ( " Peer wanted " + str ( len ( wanted_messages ) ) + " of the available messages " , RNS . LOG_VERBOSE )
2022-06-17 08:54:04 +02:00
lxm_list = [ ]
for message_entry in wanted_messages :
file_path = message_entry [ 1 ]
if os . path . isfile ( file_path ) :
file = open ( file_path , " rb " )
lxmf_data = file . read ( )
file . close ( )
lxm_list . append ( lxmf_data )
data = msgpack . packb ( [ time . time ( ) , lxm_list ] )
resource = RNS . Resource ( data , self . link , callback = self . resource_concluded )
resource . transferred_messages = wanted_message_ids
2025-01-13 14:35:14 +01:00
resource . sync_transfer_started = time . time ( )
2022-06-17 08:54:04 +02:00
self . state = LXMPeer . RESOURCE_TRANSFERRING
2024-02-29 23:02:16 +01:00
2022-06-17 08:54:04 +02:00
else :
2025-01-22 01:37:09 +01:00
RNS . log ( " Peer " + RNS . prettyhexrep ( self . destination_hash ) + " did not request any of the available messages, sync completed " , RNS . LOG_VERBOSE )
self . offered + = len ( self . last_offer )
2024-02-29 20:12:54 +01:00
if self . link != None :
self . link . teardown ( )
self . link = None
2022-06-17 08:54:04 +02:00
self . state = LXMPeer . IDLE
except Exception as e :
RNS . log ( " Error while handling offer response from peer " + str ( self . destination ) , RNS . LOG_ERROR )
RNS . log ( " The contained exception was: " + str ( e ) , RNS . LOG_ERROR )
if self . link != None :
self . link . teardown ( )
self . link = None
self . state = LXMPeer . IDLE
def resource_concluded ( self , resource ) :
if resource . status == RNS . Resource . COMPLETE :
for transient_id in resource . transferred_messages :
2025-01-21 16:33:39 +01:00
self . add_handled_message ( transient_id )
self . remove_unhandled_message ( transient_id )
2024-02-29 20:12:54 +01:00
if self . link != None :
self . link . teardown ( )
self . link = None
2022-06-17 08:54:04 +02:00
self . state = LXMPeer . IDLE
2024-02-29 20:12:54 +01:00
2025-01-13 14:35:14 +01:00
rate_str = " "
if hasattr ( resource , " sync_transfer_started " ) and resource . sync_transfer_started :
self . sync_transfer_rate = ( resource . get_transfer_size ( ) * 8 ) / ( time . time ( ) - resource . sync_transfer_started )
rate_str = f " at { RNS . prettyspeed ( self . sync_transfer_rate ) } "
2025-01-22 01:37:09 +01:00
RNS . log ( f " Syncing { len ( resource . transferred_messages ) } messages to peer { RNS . prettyhexrep ( self . destination_hash ) } completed { rate_str } " , RNS . LOG_VERBOSE )
2022-06-17 08:54:04 +02:00
self . alive = True
self . last_heard = time . time ( )
2025-01-22 01:37:09 +01:00
self . offered + = len ( self . last_offer )
self . outgoing + = len ( resource . transferred_messages )
self . tx_bytes + = resource . get_data_size ( )
2024-02-29 20:12:54 +01:00
2022-06-17 08:54:04 +02:00
else :
2025-01-22 01:37:09 +01:00
RNS . log ( " Resource transfer for LXMF peer sync failed to " + str ( self . destination ) , RNS . LOG_VERBOSE )
2022-06-17 08:54:04 +02:00
if self . link != None :
self . link . teardown ( )
2024-02-29 23:02:16 +01:00
self . link = None
2024-02-29 20:12:54 +01:00
self . state = LXMPeer . IDLE
2022-06-17 08:54:04 +02:00
def link_established ( self , link ) :
self . link . identify ( self . router . identity )
2023-02-17 12:29:00 +01:00
link_establishment_rate = link . get_establishment_rate ( )
if link_establishment_rate != None :
self . link_establishment_rate = link_establishment_rate
2022-06-17 08:54:04 +02:00
self . state = LXMPeer . LINK_READY
2022-12-20 23:58:09 +01:00
self . next_sync_attempt = 0
2022-06-17 08:54:04 +02:00
self . sync ( )
def link_closed ( self , link ) :
self . link = None
self . state = LXMPeer . IDLE
2025-01-21 20:20:39 +01:00
def queued_items ( self ) :
return len ( self . handled_messages_queue ) > 0 or len ( self . unhandled_messages_queue ) > 0
def queue_unhandled_message ( self , transient_id ) :
self . unhandled_messages_queue . append ( transient_id )
def queue_handled_message ( self , transient_id ) :
self . handled_messages_queue . append ( transient_id )
def process_queues ( self ) :
if len ( self . unhandled_messages_queue ) > 0 or len ( self . handled_messages_queue ) > 0 :
# TODO: Remove debug
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
handled_messages = self . handled_messages
unhandled_messages = self . unhandled_messages
while len ( self . handled_messages_queue ) > 0 :
transient_id = self . handled_messages_queue . pop ( )
if not transient_id in handled_messages :
self . add_handled_message ( transient_id )
if transient_id in unhandled_messages :
self . remove_unhandled_message ( transient_id )
while len ( self . unhandled_messages_queue ) > 0 :
transient_id = self . unhandled_messages_queue . pop ( )
if not transient_id in handled_messages and not transient_id in unhandled_messages :
self . add_unhandled_message ( transient_id )
del handled_messages , unhandled_messages
# TODO: Remove debug
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
2025-01-21 16:33:39 +01:00
@property
def handled_messages ( self ) :
pes = self . router . propagation_entries . copy ( )
2025-01-21 16:44:24 +01:00
hm = list ( filter ( lambda tid : self . destination_hash in pes [ tid ] [ 4 ] , pes ) )
2025-01-21 16:33:39 +01:00
self . _hm_count = len ( hm ) ; del pes
2025-01-21 16:51:25 +01:00
self . _hm_counts_synced = True
2025-01-21 16:33:39 +01:00
return hm
@property
def unhandled_messages ( self ) :
pes = self . router . propagation_entries . copy ( )
2025-01-21 16:44:24 +01:00
um = list ( filter ( lambda tid : self . destination_hash in pes [ tid ] [ 5 ] , pes ) )
2025-01-21 16:33:39 +01:00
self . _um_count = len ( um ) ; del pes
2025-01-21 16:51:25 +01:00
self . _um_counts_synced = True
2025-01-21 16:33:39 +01:00
return um
@property
def handled_message_count ( self ) :
if not self . _hm_counts_synced :
self . _update_counts ( )
return self . _hm_count
@property
def unhandled_message_count ( self ) :
if not self . _um_counts_synced :
self . _update_counts ( )
return self . _um_count
2025-01-29 01:26:36 +01:00
@property
def acceptance_rate ( self ) :
return 0 if self . offered == 0 else ( self . outgoing / self . offered )
2025-01-21 16:33:39 +01:00
def _update_counts ( self ) :
if not self . _hm_counts_synced :
hm = self . handled_messages ; del hm
if not self . _um_counts_synced :
um = self . unhandled_messages ; del um
def add_handled_message ( self , transient_id ) :
if transient_id in self . router . propagation_entries :
if not self . destination_hash in self . router . propagation_entries [ transient_id ] [ 4 ] :
self . router . propagation_entries [ transient_id ] [ 4 ] . append ( self . destination_hash )
self . _hm_counts_synced = False
def add_unhandled_message ( self , transient_id ) :
if transient_id in self . router . propagation_entries :
if not self . destination_hash in self . router . propagation_entries [ transient_id ] [ 5 ] :
self . router . propagation_entries [ transient_id ] [ 5 ] . append ( self . destination_hash )
self . _um_count + = 1
def remove_handled_message ( self , transient_id ) :
if transient_id in self . router . propagation_entries :
if self . destination_hash in self . router . propagation_entries [ transient_id ] [ 4 ] :
self . router . propagation_entries [ transient_id ] [ 4 ] . remove ( self . destination_hash )
self . _hm_counts_synced = False
def remove_unhandled_message ( self , transient_id ) :
if transient_id in self . router . propagation_entries :
if self . destination_hash in self . router . propagation_entries [ transient_id ] [ 5 ] :
self . router . propagation_entries [ transient_id ] [ 5 ] . remove ( self . destination_hash )
self . _um_counts_synced = False
2022-06-17 08:54:04 +02:00
def __str__ ( self ) :
if self . destination_hash :
return RNS . prettyhexrep ( self . destination_hash )
else :
return " <Unknown> "