backend.workers.cleanup_tempfiles
Delete old datasets
1""" 2Delete old datasets 3""" 4import shutil 5import re 6import json 7from datetime import datetime 8from pathlib import Path 9 10from backend.lib.worker import BasicWorker 11from common.lib.dataset import DataSet 12from common.lib.exceptions import WorkerInterruptedException, DataSetException 13 14 15class TempFileCleaner(BasicWorker): 16 """ 17 Clean up discarded temporary files 18 19 If 4CAT crashes while processing something, it may result in staging 20 folders that are never cleaned up. This worker checks for finished 21 datasets with staging area folders and cleans them up. 22 23 Also cleans up orphaned result files for datasets that no longer exist. 24 """ 25 type = "clean-temp-files" 26 max_workers = 1 27 28 # Use tracking file to delay deletion of files that may still be in use 29 days_to_keep = 7 30 31 @classmethod 32 def ensure_job(cls, config=None): 33 """ 34 Ensure that the temp file cleaner is always running 35 36 This is used to ensure that the temp file cleaner is always running, and 37 if it is not, it will be started by the WorkerManager. 38 39 :return: Job parameters for the worker 40 """ 41 return {"remote_id": "localhost", "interval": 10800} 42 43 def work(self): 44 """ 45 Go through result files, and for each one check if it should still 46 exist 47 :return: 48 """ 49 # Load tracking file 50 tracking_file = self.config.get('PATH_DATA').joinpath(".temp_file_cleaner") 51 if not tracking_file.exists(): 52 tracked_files = {} 53 else: 54 tracked_files = json.loads(tracking_file.read_text()) 55 56 result_files = Path(self.config.get('PATH_DATA')).glob("*") 57 for file in result_files: 58 if file.stem.startswith("."): 59 # skip hidden files 60 continue 61 62 if self.interrupted: 63 tracking_file.write_text(json.dumps(tracked_files)) 64 raise WorkerInterruptedException("Interrupted while cleaning up orphaned result files") 65 66 # the key of the dataset files belong to can be extracted from the 67 # file name in a predictable way. 
68 possible_keys = re.findall(r"[abcdef0-9]{32}", file.stem) 69 if not possible_keys: 70 self.log.warning("File %s does not seem to be a result file - clean up manually" % file) 71 continue 72 73 # if for whatever reason there are multiple hashes in the filename, 74 # the key would always be the last one 75 key = possible_keys.pop() 76 77 try: 78 dataset = DataSet(key=key, db=self.db, modules=self.modules) 79 except DataSetException: 80 # the dataset has been deleted since, but the result file still 81 # exists - should be safe to clean up 82 if file.name not in tracked_files: 83 self.log.info(f"No matching dataset with key {key} for file {file}; marking for deletion") 84 tracked_files[file.name] = datetime.now().timestamp() + (self.days_to_keep * 86400) 85 elif tracked_files[file.name] < datetime.now().timestamp(): 86 self.log.info(f"File {file} marked for deletion since {datetime.fromtimestamp(tracked_files[file.name]).strftime('%Y-%m-%d %H:%M:%S')}, deleting file") 87 if file.is_dir(): 88 try: 89 shutil.rmtree(file) 90 except PermissionError: 91 self.log.info(f"Folder {file} does not belong to a dataset but cannot be deleted (no " 92 f"permissions), skipping") 93 94 else: 95 try: 96 file.unlink() 97 except FileNotFoundError: 98 # the file has been deleted since 99 pass 100 101 # Remove from tracking 102 del tracked_files[file.name] 103 104 continue 105 106 if file.is_dir() and "-staging" in file.stem and dataset.is_finished(): 107 # staging area exists but dataset is marked as finished 108 # if the dataset is finished, the staging area should have been 109 # compressed into a zip file, or deleted, so this is also safe 110 # to clean up 111 if file.name not in tracked_files: 112 self.log.info("Dataset %s is finished, but staging area remains at %s, marking for deletion" % (dataset.key, str(file))) 113 tracked_files[file.name] = datetime.now().timestamp() + (self.days_to_keep * 86400) 114 elif tracked_files[file.name] < datetime.now().timestamp(): 115 self.log.info("Dataset %s is finished, but staging area remains at %s, deleting folder" % (dataset.key, str(file))) 116 shutil.rmtree(file) 117 118 # Update tracked files 119 tracking_file.write_text(json.dumps(tracked_files)) 120 121 self.job.finish()
class TempFileCleaner(BasicWorker):
Clean up discarded temporary files
If 4CAT crashes while processing something, it may result in staging folders that are never cleaned up. This worker checks for finished datasets with staging area folders and cleans them up.
Also cleans up orphaned result files for datasets that no longer exist.
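To make the notion of an "orphaned result file" concrete: a result file's name embeds the 32-character hexadecimal key of the dataset it belongs to, and the worker recovers that key with a regular expression, taking the last match if several occur. Below is a minimal, self-contained sketch of that lookup; the file name and the helper name are made up for illustration, only the regular expression comes from the source above.

import re
from pathlib import Path

def extract_dataset_key(filename):
    """Return the last 32-character hex hash in the file's stem, or None."""
    possible_keys = re.findall(r"[abcdef0-9]{32}", Path(filename).stem)
    # if several hashes appear in the name, the dataset key is the last one
    return possible_keys[-1] if possible_keys else None

# hypothetical result file name
print(extract_dataset_key("5d41402abc4b2a76b9719d911017c592-top-words.csv"))
# -> 5d41402abc4b2a76b9719d911017c592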
@classmethod
def ensure_job(cls, config=None):
Ensure that the temp file cleaner is always running
This is used to ensure that the temp file cleaner is always running, and if it is not, it will be started by the WorkerManager.
Returns
Job parameters for the worker
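For reference, the returned parameters describe a recurring local job. The snippet below only unpacks those values; how the WorkerManager consumes them is outside this module and not shown here.

from datetime import timedelta

# parameters returned by TempFileCleaner.ensure_job()
job_params = {"remote_id": "localhost", "interval": 10800}

# the interval is in seconds: 10800 s means the cleaner runs every 3 hours
print(timedelta(seconds=job_params["interval"]))  # 3:00:00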
def work(self):
Go through result files, and for each one check if it should still exist
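Deletion is deliberately delayed: work() records candidates in a hidden JSON tracking file (.temp_file_cleaner) inside the data folder, stores a deadline days_to_keep days in the future, and only deletes a file once a later run finds that deadline has passed. Below is a minimal sketch of that round trip, assuming a writable directory; the path and file name are hypothetical, only the marking logic mirrors the source above.

import json
from datetime import datetime
from pathlib import Path

DAYS_TO_KEEP = 7  # grace period used by the worker

data_path = Path("/tmp/4cat-data")  # stand-in for the PATH_DATA setting
data_path.mkdir(parents=True, exist_ok=True)
tracking_file = data_path.joinpath(".temp_file_cleaner")

# load existing marks, as work() does at the start of each run
tracked = json.loads(tracking_file.read_text()) if tracking_file.exists() else {}

orphan = "0123456789abcdef0123456789abcdef-orphan.csv"  # hypothetical file name
now = datetime.now().timestamp()

if orphan not in tracked:
    # first sighting: store a deletion deadline a week from now
    tracked[orphan] = now + DAYS_TO_KEEP * 86400
elif tracked[orphan] < now:
    # a later run past the deadline deletes the file and drops the mark
    data_path.joinpath(orphan).unlink(missing_ok=True)
    del tracked[orphan]

# persist the marks so they survive restarts; work() also writes them out
# when it is interrupted mid-run
tracking_file.write_text(json.dumps(tracked))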