Prevent LXM persist race in write_to_directory when messages change state within very short time spans
This commit is contained in:
parent
2ac2b100ae
commit
575fb7d77d
1 changed files with 18 additions and 15 deletions
|
|
@ -8,6 +8,7 @@ import multiprocessing
|
||||||
|
|
||||||
import LXMF.LXStamper as LXStamper
|
import LXMF.LXStamper as LXStamper
|
||||||
from .LXMF import APP_NAME, compression_support_from_app_data
|
from .LXMF import APP_NAME, compression_support_from_app_data
|
||||||
|
from threading import Lock
|
||||||
|
|
||||||
|
|
||||||
class LXMessage:
|
class LXMessage:
|
||||||
|
|
@ -184,6 +185,7 @@ class LXMessage:
|
||||||
self.__delivery_destination = None
|
self.__delivery_destination = None
|
||||||
self.__delivery_callback = None
|
self.__delivery_callback = None
|
||||||
self.__pn_encrypted_data = None
|
self.__pn_encrypted_data = None
|
||||||
|
self.__persist_lock = Lock()
|
||||||
self.failed_callback = None
|
self.failed_callback = None
|
||||||
|
|
||||||
self.deferred_stamp_generating = False
|
self.deferred_stamp_generating = False
|
||||||
|
|
@ -674,23 +676,24 @@ class LXMessage:
|
||||||
file_path = directory_path+"/"+file_name
|
file_path = directory_path+"/"+file_name
|
||||||
tmp_path = file_path+".tmp."+str(os.getpid() or time.time())
|
tmp_path = file_path+".tmp."+str(os.getpid() or time.time())
|
||||||
|
|
||||||
try:
|
with self.__persist_lock:
|
||||||
with open(tmp_path, "wb") as file:
|
|
||||||
file.write(self.packed_container())
|
|
||||||
file.flush()
|
|
||||||
try: os.fsync(file.fileno())
|
|
||||||
except OSError as e: RNS.log(f"Error while waiting for persist fsync for {self}: {e}", RNS.LOG_WARNING)
|
|
||||||
|
|
||||||
os.replace(tmp_path, file_path)
|
|
||||||
return file_path
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
try:
|
try:
|
||||||
if os.path.exists(tmp_path): os.unlink(tmp_path)
|
with open(tmp_path, "wb") as file:
|
||||||
except Exception as e: RNS.log(f"Error while cleaning temporary file {tmp_path} for {self}: {e}", RNS.LOG_ERROR)
|
file.write(self.packed_container())
|
||||||
|
file.flush()
|
||||||
|
try: os.fsync(file.fileno())
|
||||||
|
except OSError as e: RNS.log(f"Error while waiting for persist fsync for {self}: {e}", RNS.LOG_WARNING)
|
||||||
|
|
||||||
RNS.log(f"Error while writing LXMF message to file \"{file_path}\". The contained exception was: {e}", RNS.LOG_ERROR)
|
os.replace(tmp_path, file_path)
|
||||||
return None
|
return file_path
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
if os.path.exists(tmp_path): os.unlink(tmp_path)
|
||||||
|
except Exception as e: RNS.log(f"Error while cleaning temporary file {tmp_path} for {self}: {e}", RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
RNS.log(f"Error while writing LXMF message to file \"{file_path}\". The contained exception was: {e}", RNS.LOG_ERROR)
|
||||||
|
return None
|
||||||
|
|
||||||
def as_uri(self, finalise=True):
|
def as_uri(self, finalise=True):
|
||||||
if not self.packed:
|
if not self.packed:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue