common.lib.dmi_service_manager
DMI Service Manager
"""
DMI Service Manager
"""
import datetime
import os
import json
import time
from json import JSONDecodeError  # noqa: F401 -- not used below any more; kept so `from ... import JSONDecodeError` via this module keeps working
from werkzeug.utils import secure_filename

import requests
from pathlib import Path


__author__ = "Dale Wahl"
__credits__ = ["Dale Wahl"]
__maintainer__ = "Dale Wahl"
__email__ = "4cat@oilab.eu"

from common.lib.helpers import strip_tags


class DmiServiceManagerException(Exception):
    """
    Base class for every error raised by the DMI Service Manager client.

    Raised directly for configuration problems (e.g. an invalid local/remote setting) and for errors reported by
    the DMI Service Manager itself.
    """
    pass


class DsmOutOfMemory(DmiServiceManagerException):
    """
    Raised when the DMI Service Manager server reports that it is out of (GPU) memory.
    """
    pass


class DsmConnectionError(DmiServiceManagerException):
    """
    Raised when the DMI Service Manager server cannot be reached or answers with an unexpected HTTP response.
    """
    pass


class DmiServiceManager:
    """
    Class to manage interactions with a DMI Service Manager server.

    Found here:
    https://github.com/digitalmethodsinitiative/dmi_service_manager
    """
    # Shared message for the three places that validate the local/remote setting
    _INVALID_MODE_MESSAGE = "dmi_service_manager.local_or_remote setting must be 'local' or 'remote'"

    def __init__(self, processor):
        """
        Read the DMI Service Manager settings from the processor's config and reset the progress bookkeeping.

        :param processor:  Processor using this manager; its `config`, `dataset` and `interrupted` attributes
                           are used by the methods of this class
        """
        self.processor = processor
        self.local_or_remote = processor.config.get("dmi-service-manager.ac_local_or_remote")
        self.server_address = processor.config.get("dmi-service-manager.ab_server_address").rstrip("/") + "/api/"

        self.processed_files = 0

        # Set by process_files()
        self.num_files_to_process = None
        self.server_file_collection_name = None
        self.server_results_folder_name = None
        self.path_to_files = None
        self.path_to_results = None

    @staticmethod
    def _json_or_none(response):
        """
        Return the decoded JSON body of a `requests` response, or None if the body is not valid JSON.

        `ValueError` is caught because it is the common base class of the JSON decode errors that
        `Response.json()` can raise.
        """
        try:
            return response.json()
        except ValueError:
            return None

    def check_gpu_memory_available(self, service_endpoint):
        """
        Ask the server whether GPU memory is available for a service.

        :param str service_endpoint:  Name of the service endpoint to check
        :return:  The JSON response from the server containing the memory information
        :raises DsmOutOfMemory:  If the server responds with HTTP 503
        :raises DsmConnectionError:  If the server responds with any other non-200 status
        """
        api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
        resp = requests.get(api_endpoint, timeout=30)
        if resp.status_code == 200:
            return resp.json()
        elif resp.status_code == 503:
            # TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? DSM could possibly manage jobs in queue
            # Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available)
            raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.")
        else:
            # Prefer the server-provided reason; fall back on the (tag-stripped) body if it is missing or not JSON
            payload = self._json_or_none(resp)
            if isinstance(payload, dict) and "reason" in payload:
                reason = payload["reason"]
            else:
                reason = strip_tags(resp.text)
            raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}")

    def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
        """
        Process files according to DMI Service Manager local or remote settings

        In local mode the input and output paths are made relative to PATH_DATA; in remote mode the files are
        uploaded to the server via send_files().

        :param Path input_file_dir:  Local Path to the folder with input files
        :param list filenames:  List of filenames to process
        :param Path output_file_dir:  Local Path to the folder where results should end up
        :param str server_file_collection_name:  Name of the file collection on the server (remote mode)
        :param str server_results_folder_name:  Name of the results folder on the server (remote mode)
        :return Path, Path:  Path to files and Path to results as the service should use them
        :raises DmiServiceManagerException:  If the local/remote setting is invalid
        """
        self.num_files_to_process = len(filenames)

        # Upload files if necessary
        if self.local_or_remote == "local":
            # Relative to PATH_DATA which should be where Docker mounts the container volume
            # TODO: path is just the staging_area name, but what if we move staging areas? DMI Service manager needs to know...
            path_to_files = input_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
            path_to_results = output_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())

        elif self.local_or_remote == "remote":
            # Upload files
            self.server_file_collection_name = server_file_collection_name
            self.server_results_folder_name = server_results_folder_name
            path_to_files, path_to_results = self.send_files(server_file_collection_name, server_results_folder_name, filenames, input_file_dir)

        else:
            raise DmiServiceManagerException(self._INVALID_MODE_MESSAGE)

        self.path_to_files = path_to_files
        self.path_to_results = path_to_results
        return path_to_files, path_to_results

    def check_progress(self):
        """
        Update the dataset status and progress based on the number of result files currently present.

        Counts files in the local results folder (local mode) or asks the server for the contents of the results
        folder (remote mode). The dataset is only updated when the count changed since the last check.

        :raises DmiServiceManagerException:  If the local/remote setting is invalid
        """
        if self.local_or_remote == "local":
            current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results))
        elif self.local_or_remote == "remote":
            existing_files = self.request_folder_files(self.server_file_collection_name)
            current_completed = len(existing_files.get(self.server_results_folder_name, []))
        else:
            raise DmiServiceManagerException(self._INVALID_MODE_MESSAGE)

        if current_completed != self.processed_files:
            self.processor.dataset.update_status(
                f"Processed {current_completed} of {self.num_files_to_process} files")
            # Guard against division by zero/None when there are no (known) input files
            if self.num_files_to_process:
                self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
            self.processed_files = current_completed

    def check_service_exists(self):
        """
        Check for services created with the current dataset key.

        Returns None, if none found else the related service jobs and their details
        """
        # Check to see if service already created
        resp = requests.get(self.server_address + "jobs/details_query/", json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key}, timeout=30)
        if resp.status_code != 200:
            return None

        # Only report jobs if there actually are any
        jobs = resp.json()["jobs"]
        return jobs if len(jobs) > 0 else None

    def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None):
        """
        Send request and wait for results to be ready.

        If a job for the current dataset key already exists on the server, no new request is sent; the existing job
        is awaited (or its final state is reported) instead.

        Check process assumes a one to one ratio of input files to output files. If this is not the case, set to False.
        It counts the number of files in the output folder and compares it to the number of input files.

        :param str service_endpoint:  Name of the service; "_local" or "_remote" is appended based on settings
        :param dict data:  JSON-serialisable request data; the dataset key is added under "4CAT_dataset_key"
        :param int wait_period:  Number of seconds between status checks
        :param bool check_process:  Whether to update dataset status/progress while waiting
        :param callback:  Optional callable, called with this manager before each status check
        :return bool:  True if the service completed successfully, False if waiting was interrupted
        :raises DsmConnectionError:  If the server cannot be reached or repeatedly returns unexpected responses
        :raises DsmOutOfMemory:  If the service reports a CUDA out of memory error
        :raises DmiServiceManagerException:  For invalid settings and any other error reported by the server
        """
        if self.local_or_remote == "local":
            service_endpoint += "_local"
        elif self.local_or_remote == "remote":
            service_endpoint += "_remote"
        else:
            raise DmiServiceManagerException(self._INVALID_MODE_MESSAGE)

        existing_service = self.check_service_exists()
        if existing_service:
            if len(existing_service) > 1:
                raise DmiServiceManagerException("Multiple services found with the same dataset key.")

            existing_service = existing_service[0]
            if existing_service['status'] == 'complete':
                # Service already completed
                return True
            elif existing_service['status'] == 'error':
                results = json.loads(existing_service['results'])
                raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}")
            else:
                # Service already running
                results_url = self.server_address + f"jobs/{existing_service.get('id')}"
                self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...")
        else:
            # Create a new service
            # Append dataset key to data
            data["4CAT_dataset_key"] = self.processor.dataset.key
            api_endpoint = self.server_address + service_endpoint
            try:
                resp = requests.post(api_endpoint, json=data, timeout=30)
            except requests.exceptions.ConnectionError as e:
                raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")

            if resp.status_code == 202:
                # New request successful
                results_url = resp.json()['result_url']
            elif resp.status_code == 404:
                # Could be local vs remote not set correctly; checked before decoding the body since a 404 page
                # is often not JSON
                raise DsmConnectionError(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service")
            else:
                resp_json = self._json_or_none(resp)
                if resp_json is None:
                    # Unexpected Error
                    raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
                elif resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
                    # Request already exists; get DMI SM database key
                    raise DmiServiceManagerException("Request already exists; check that DMI SM is up to date")
                else:
                    raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")

            # Wait for results to be ready
            self.processor.dataset.update_status(f"Generating results for {service_endpoint}...")

        check_time = 0
        success = False
        connection_error = 0
        last_status = None
        while True:
            # If interrupted is called, attempt to finish dataset while server still running
            if self.processor.interrupted:
                self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True)
                break

            # Send request to check status every wait_period seconds; in between, sleep briefly so that
            # interruptions are noticed quickly
            if (time.time() - check_time) <= wait_period:
                time.sleep(1)
                continue

            check_time = time.time()
            if callback:
                callback(self)

            try:
                result = requests.get(results_url, timeout=30)
            except requests.exceptions.ConnectionError as e:
                # Have seen the Service Manager fail particularly when another processor is uploading many consecutive files
                connection_error += 1
                if connection_error > 3:
                    raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")
                continue

            payload = self._json_or_none(result)
            if result.status_code != 200 or not isinstance(payload, dict) or payload.get('status') != "success":
                # Unexpected response from DMI SM
                connection_error += 1
                if connection_error > 3:
                    detail = payload if payload is not None else result.text
                    raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {str(detail)}")
                continue
            service_status = payload["job"]

            # Update progress
            if check_process:
                # Check for message
                status_message = service_status.get('message', '')
                current_completed = service_status.get('processed_records', False)
                if status_message:
                    # Update status message if changed
                    if last_status != status_message:
                        last_status = status_message
                        self.processor.dataset.update_status(last_status)
                    if current_completed and self.processed_files != int(current_completed):
                        # Guard against division by zero/None when there are no (known) input files
                        if self.num_files_to_process:
                            self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process)
                        self.processed_files = int(current_completed)
                else:
                    # This service does not provide status message; check progress via file count
                    self.check_progress()

            job_state = service_status.get('status')
            if job_state in ["created", "running", "pending"]:
                # Still running
                continue
            elif job_state in ["complete", "error"]:
                raw_results = service_status.get('results')
                try:
                    results = json.loads(raw_results) if raw_results else None
                except ValueError:
                    results = None

                if not results:
                    # This should not be the case if the service was written well (unless the DMI SM crashed?)
                    # TODO test if timing issue?
                    connection_error += 1
                    if connection_error > 3:
                        raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}")
                    # Nothing to interpret yet; try again at the next check instead of reading missing results
                    continue

                if int(results['returncode']) == 0:
                    # Complete without error
                    self.processor.dataset.update_status(f"Completed {service_endpoint}!")
                    success = True
                    break
                else:
                    error = str(results.get('error', ''))
                    if "CUDA error: out of memory" in error:
                        raise DsmOutOfMemory("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
                    else:
                        raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
            else:
                # Something botched
                raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(payload))

        return success

    def process_results(self, local_output_dir):
        """
        Make the results available in a local folder.

        In local mode nothing needs to be done. In remote mode the contents of the results folder on the server
        are listed and downloaded, recreating subfolders locally.

        :param Path local_output_dir:  Local Path to download results to
        """
        if self.local_or_remote == "local":
            # Output files are already in local directory
            pass
        elif self.local_or_remote == "remote":
            results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name)
            self.processor.dataset.log(f"Downloading results from {results_path}...")
            # Collect result filenames from server
            result_files = self.request_folder_files(results_path)
            for path, files in result_files.items():
                if path == '.':
                    # Files directly in the results folder
                    self.download_results(files, results_path, local_output_dir)
                else:
                    Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True)
                    self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path))

    def request_folder_files(self, folder_name):
        """
        Request files from a folder on the DMI Service Manager server.

        :param str folder_name:  Name of the folder on the server
        :return dict:  Decoded JSON response listing the folder's files; empty if the folder does not exist (404)
        :raises DsmConnectionError:  If the server cannot be reached after retrying or responds with 400/403/405
        :raises DmiServiceManagerException:  If the server responds with any other non-200 status
        """
        filename_url = f"{self.server_address}list_filenames/{folder_name}"
        retries = 0
        while True:
            try:
                filename_response = requests.get(filename_url, timeout=30)
                break
            except requests.exceptions.ConnectionError as e:
                retries += 1
                if retries > 3:
                    raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading files from: {folder_name}")
                continue

        # Check if 4CAT has access to this server
        if filename_response.status_code == 403:
            raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
        elif filename_response.status_code in [400, 405]:
            # Report the actual status code and do not fail on a missing/non-JSON reason
            payload = self._json_or_none(filename_response)
            if isinstance(payload, dict) and "reason" in payload:
                reason = payload["reason"]
            else:
                reason = strip_tags(filename_response.text)
            raise DsmConnectionError(f"{filename_response.status_code}: DMI Service Manager server {reason}")
        elif filename_response.status_code == 404:
            # Folder not found; no files
            return {}
        elif filename_response.status_code != 200:
            raise DmiServiceManagerException(f"Unknown response from DMI Service Manager: {filename_response.status_code} - {filename_response.reason}")
        return filename_response.json()

    def send_files(self, file_collection_name, results_name, files_to_upload, dir_with_files):
        """
        Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management.
        The path on the server to both the files and results will be returned.

        Files are uploaded to a "4cat_uploads" folder under the file_collection_name folder; files already listed
        there are skipped. If no results_name folder is listed for the collection yet, an empty placeholder file
        is uploaded to it, so that multiple results can be created based on a file collection if needed (without
        needing to re-upload files).

        :param str file_collection_name:  Name of collection; files will be uploaded to '4cat_uploads' subfolder
        :param str results_name:          Name of results subfolder where output will be stored (and can be downloaded)
        :param list files_to_upload:      List of filenames to upload
        :param Path dir_with_files:       Local Path to files
        :return Path, Path:               Server's Path to files, Server's Path to results
        :raises DsmConnectionError:       If the server responds with 403 or 405 to an upload
        """
        data = {'folder_name': file_collection_name}

        # Check if files have already been sent
        self.processor.dataset.update_status("Connecting to DMI Service Manager...")
        existing_files = self.request_folder_files(file_collection_name)
        uploaded_files = existing_files.get('4cat_uploads', [])
        if len(uploaded_files) > 0:
            self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files)))

        # Compare files with previously uploaded; a set keeps the membership test cheap for large collections
        already_uploaded = set(uploaded_files)
        to_upload_filenames = [filename for filename in files_to_upload if filename not in already_uploaded]
        total_files_to_upload = len(to_upload_filenames)

        # Check if results folder exists
        empty_placeholder = None
        if results_name not in existing_files:
            total_files_to_upload += 1
            # Create a blank file to upload into results folder
            empty_placeholder = f"4CAT_{results_name}_blank.txt"
            with open(dir_with_files.joinpath(empty_placeholder), 'w') as file:
                file.write('')
            to_upload_filenames = [empty_placeholder] + to_upload_filenames

        if total_files_to_upload > 0:
            api_upload_endpoint = f"{self.server_address}send_files"

            self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files")
            files_uploaded = 0
            while to_upload_filenames:
                upload_file = to_upload_filenames.pop()
                self.processor.dataset.log(f"Uploading {upload_file}")

                # The blank placeholder goes to the results folder; all other files go to the general upload
                # folder belonging to the parent dataset collection
                target_folder = results_name if upload_file == empty_placeholder else '4cat_uploads'
                # Context manager makes sure the file handle is closed after each upload
                with open(dir_with_files.joinpath(upload_file), 'rb') as upload_handle:
                    response = requests.post(api_upload_endpoint,
                                             files=[(target_folder, upload_handle)],
                                             data=data, timeout=120)

                if response.status_code == 200:
                    files_uploaded += 1
                    if files_uploaded % 1000 == 0:
                        self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!")
                    self.processor.dataset.update_progress(files_uploaded / total_files_to_upload)
                elif response.status_code == 403:
                    raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
                elif response.status_code == 405:
                    raise DsmConnectionError("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)")
                else:
                    self.processor.dataset.log(f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file}")

                # Log any errors the server reported in its response body
                response_json = self._json_or_none(response)
                if response_json and "errors" in response_json:
                    self.processor.dataset.log(
                        f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file} - {response_json['errors']}")

            self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!")

        server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads")
        server_path_to_results = Path(file_collection_name).joinpath(results_name)

        return server_path_to_files, server_path_to_results

    def download_results(self, filenames_to_download, folder_name, local_output_dir, timeout=30):
        """
        Download results from the DMI Service Manager server.

        :param list filenames_to_download:  List of filenames to download
        :param str folder_name:             Name of subfolder where files are located (e.g. "results_name" or "files")
        :param Path local_output_dir:       Local Path to download files to
        :param int timeout:                 Number of seconds to wait for a response from the server
        :raises DsmConnectionError:         If the server cannot be reached after retrying
        """
        # Download the result files
        api_download_endpoint = f"{self.server_address}download/"
        total_files_to_download = len(filenames_to_download)
        files_downloaded = 0
        self.processor.dataset.update_status(f"Downloading {total_files_to_download} files from {folder_name}...")
        for filename in filenames_to_download:
            retries = 0
            while True:
                try:
                    file_response = requests.get(api_download_endpoint + f"{folder_name}/{filename}", timeout=timeout)
                    break
                except requests.exceptions.ConnectionError as e:
                    retries += 1
                    if retries > 3:
                        raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading file: {folder_name}/{filename}")
                    continue
            files_downloaded += 1
            if files_downloaded % 1000 == 0:
                self.processor.dataset.update_status(f"Downloaded {files_downloaded} of {total_files_to_download} files")
            self.processor.dataset.update_progress(files_downloaded / total_files_to_download)

            # NOTE(review): the HTTP status of file_response is not checked, so an error response body would be
            # written to disk as if it were the result file -- confirm whether that should raise or be skipped
            with open(local_output_dir.joinpath(filename), 'wb') as file:
                file.write(file_response.content)

    def sanitize_filenames(self, filename):
        """
        If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to
        ensure our filenames match what the server expects.

        :param str filename:  Filename to sanitize
        :return str:  The filename as is (local mode) or passed through werkzeug's secure_filename (remote mode)
        :raises DmiServiceManagerException:  If the local/remote setting is invalid
        """
        if self.local_or_remote == "local":
            return filename
        elif self.local_or_remote == "remote":
            return secure_filename(filename)
        else:
            raise DmiServiceManagerException(self._INVALID_MODE_MESSAGE)

    @staticmethod
    def get_folder_name(dataset):
        """
        Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the
        parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times).

        This is only relevant for remote mode based on file management.

        :param dataset:  Dataset whose timestamp, label and key are used
        :return str:  "<timestamp>-<label with non-alphanumeric characters replaced by underscores>-<key>"
        """
        return datetime.datetime.fromtimestamp(dataset.timestamp).strftime("%Y-%m-%d-%H%M%S") + '-' + \
               ''.join(e if e.isalnum() else '_' for e in dataset.get_label()) + '-' + \
               str(dataset.key)

    @staticmethod
    def count_local_files(directory):
        """
        Get number of files in directory

        :param directory:  Path of the directory to inspect
        :return int:  Number of entries in the directory (as returned by os.listdir)
        """
        return len(os.listdir(directory))
class DmiServiceManagerException(Exception):
    """
    Base class for every error raised by the DMI Service Manager client.

    Raised directly for configuration problems (e.g. an invalid local/remote setting) and for errors reported by
    the DMI Service Manager itself.
    """
    pass
Base exception for DMI Service Manager problems, e.g. when there is a problem with the configuration settings.
class DsmOutOfMemory(DmiServiceManagerException):
    """
    Raised when the DMI Service Manager server reports that it is out of (GPU) memory.
    """
    pass
Raised when the DMI Service Manager server reports that it is out of (GPU) memory.
class DsmConnectionError(DmiServiceManagerException):
    """
    Raised when the DMI Service Manager server cannot be reached or answers with an unexpected HTTP response.
    """
    pass
Raised when the DMI Service Manager server cannot be reached or answers with an unexpected HTTP response.
43class DmiServiceManager: 44 """ 45 Class to manage interactions with a DMI Service Manager server. 46 47 Found here: 48 https://github.com/digitalmethodsinitiative/dmi_service_manager 49 """ 50 def __init__(self, processor): 51 """ 52 """ 53 self.processor = processor 54 self.local_or_remote = processor.config.get("dmi-service-manager.ac_local_or_remote") 55 self.server_address = processor.config.get("dmi-service-manager.ab_server_address").rstrip("/") + "/api/" 56 57 self.processed_files = 0 58 59 self.num_files_to_process = None 60 self.server_file_collection_name = None 61 self.server_results_folder_name = None 62 self.path_to_files = None 63 self.path_to_results = None 64 65 def check_gpu_memory_available(self, service_endpoint): 66 """ 67 Returns tuple with True if server has some memory available and False otherwise as well as the JSON response 68 from server containing the memory information. 69 """ 70 api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint 71 resp = requests.get(api_endpoint, timeout=30) 72 if resp.status_code == 200: 73 return resp.json() 74 elif resp.status_code == 503: 75 # TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? 
DSM could possibly manage jobs in queue 76 # Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available) 77 raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.") 78 else: 79 try: 80 reason = resp.json()['reason'] 81 except JSONDecodeError: 82 reason = strip_tags(resp.text) 83 raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}") 84 85 def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name): 86 """ 87 Process files according to DMI Service Manager local or remote settings 88 """ 89 self.num_files_to_process = len(filenames) 90 91 # Upload files if necessary 92 if self.local_or_remote == "local": 93 # Relative to PATH_DATA which should be where Docker mounts the container volume 94 # TODO: path is just the staging_area name, but what if we move staging areas? DMI Service manager needs to know... 95 path_to_files = input_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute()) 96 path_to_results = output_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute()) 97 98 elif self.local_or_remote == "remote": 99 100 # Upload files 101 self.server_file_collection_name = server_file_collection_name 102 self.server_results_folder_name = server_results_folder_name 103 path_to_files, path_to_results = self.send_files(server_file_collection_name, server_results_folder_name, filenames, input_file_dir) 104 105 else: 106 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 107 108 self.path_to_files = path_to_files 109 self.path_to_results = path_to_results 110 return path_to_files, path_to_results 111 112 def check_progress(self): 113 if self.local_or_remote == "local": 114 current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results)) 115 elif 
self.local_or_remote == "remote": 116 existing_files = self.request_folder_files(self.server_file_collection_name) 117 current_completed = len(existing_files.get(self.server_results_folder_name, [])) 118 else: 119 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 120 121 if current_completed != self.processed_files: 122 self.processor.dataset.update_status( 123 f"Processed {current_completed} of {self.num_files_to_process} files") 124 self.processor.dataset.update_progress(current_completed / self.num_files_to_process) 125 self.processed_files = current_completed 126 127 def check_service_exists(self): 128 """" 129 Check for services created with the current dataset key. 130 131 Returns None, if none found else the related service jobs and their details 132 """ 133 # Check to see if service already created 134 resp = requests.get(self.server_address + "jobs/details_query/", json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key}, timeout=30) 135 if resp.status_code == 200: 136 # Check if service is already running 137 if len(resp.json()["jobs"]) > 0: 138 return resp.json()["jobs"] 139 else: 140 return None 141 142 def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None): 143 """ 144 Send request and wait for results to be ready. 145 146 Check process assumes a one to one ratio of input files to output files. If this is not the case, set to False. 147 If counts the number of files in the output folder and compares it to the number of input files. 
148 """ 149 if self.local_or_remote == "local": 150 service_endpoint += "_local" 151 elif self.local_or_remote == "remote": 152 service_endpoint += "_remote" 153 else: 154 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 155 156 existing_service = self.check_service_exists() 157 if existing_service: 158 if len(existing_service) > 1: 159 raise DmiServiceManagerException("Multiple services found with the same dataset key.") 160 else: 161 existing_service = existing_service[0] 162 if existing_service['status'] == 'complete': 163 # Service already completed 164 return True 165 elif existing_service['status'] == 'error': 166 results = json.loads(existing_service['results']) 167 raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}") 168 else: 169 # Service already running 170 results_url = self.server_address + f"jobs/{existing_service.get('id')}" 171 self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...") 172 else: 173 # Create a new service 174 # Append dataset key to data 175 data["4CAT_dataset_key"] = self.processor.dataset.key 176 api_endpoint = self.server_address + service_endpoint 177 try: 178 resp = requests.post(api_endpoint, json=data, timeout=30) 179 except requests.exceptions.ConnectionError as e : 180 raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}") 181 182 if resp.status_code == 202: 183 # New request successful 184 results_url = resp.json()['result_url'] 185 else: 186 try: 187 resp_json = resp.json() 188 if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists": 189 # Request already exists; get DMI SM database key 190 raise DmiServiceManagerException(f"Request already exists; check that DMI SM is up to date") 191 elif resp.status_code == 404: 192 # Could be local vs remote not set correctly 193 raise 
DsmConnectionError(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service") 194 else: 195 raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}") 196 except JSONDecodeError: 197 # Unexpected Error 198 raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}") 199 200 # Wait for results to be ready 201 self.processor.dataset.update_status(f"Generating results for {service_endpoint}...") 202 203 check_time = 0 204 success = False 205 connection_error = 0 206 last_status = None 207 while True: 208 # If interrupted is called, attempt to finish dataset while server still running 209 if self.processor.interrupted: 210 self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True) 211 break 212 213 # Send request to check status every wait_period seconds 214 if (time.time() - check_time) > wait_period: 215 check_time = time.time() 216 if callback: 217 callback(self) 218 219 try: 220 result = requests.get(results_url, timeout=30) 221 except requests.exceptions.ConnectionError as e: 222 # Have seen the Service Manager fail particularly when another processor is uploading many consecutive files 223 connection_error += 1 224 if connection_error > 3: 225 raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}") 226 continue 227 228 if result.status_code != 200 or (result.json and result.json().get('status') != "success"): 229 # Unexpected response from DMI SM 230 connection_error += 1 231 if connection_error > 3: 232 raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {str(result.json()) if 'json' in result.headers.get('Content-Type', '') else str(result.text)}") 233 continue 234 service_status = result.json()["job"] 235 236 # Update progress 237 if check_process: 238 # Check for message 239 status_message = 
service_status.get('message', '') 240 current_completed = service_status.get('processed_records', False) 241 if status_message: 242 # Update status message if changed 243 if last_status != status_message: 244 last_status = service_status.get('message', '') 245 self.processor.dataset.update_status(last_status) 246 if current_completed and self.processed_files != int(current_completed): 247 self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process) 248 self.processed_files = int(current_completed) 249 else: 250 # This service does not provide status message; check progress via file count 251 self.check_progress() 252 253 if service_status['status'] in ["created", "running", "pending"]: 254 # Still running 255 continue 256 elif service_status['status'] in ["complete", "error"]: 257 results = json.loads(service_status['results']) 258 if not results: 259 # This should not be the case if the service was written well (unless the DMI SM crashed?) 260 #TODO test if timing issue? 
261 connection_error += 1 262 if connection_error > 3: 263 raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}") 264 if int(results['returncode']) == 0: 265 # Complete without error 266 self.processor.dataset.update_status(f"Completed {service_endpoint}!") 267 success = True 268 break 269 else: 270 error = results['error'] 271 if "CUDA error: out of memory" in error: 272 raise DsmOutOfMemory("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.") 273 else: 274 raise DmiServiceManagerException(f"Error {service_endpoint}: " + error) 275 else: 276 # Something botched 277 raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json())) 278 else: 279 time.sleep(1) 280 281 return success 282 283 def process_results(self, local_output_dir): 284 if self.local_or_remote == "local": 285 # Output files are already in local directory 286 pass 287 elif self.local_or_remote == "remote": 288 results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name) 289 self.processor.dataset.log(f"Downloading results from {results_path}...") 290 # Collect result filenames from server 291 result_files = self.request_folder_files(results_path) 292 for path, files in result_files.items(): 293 if path == '.': 294 self.download_results(files, results_path, local_output_dir) 295 else: 296 Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True) 297 self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path)) 298 299 def request_folder_files(self, folder_name): 300 """ 301 Request files from a folder on the DMI Service Manager server. 
302 """ 303 filename_url = f"{self.server_address}list_filenames/{folder_name}" 304 retries = 0 305 while True: 306 try: 307 filename_response = requests.get(filename_url, timeout=30) 308 break 309 except requests.exceptions.ConnectionError as e: 310 retries += 1 311 if retries > 3: 312 raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading files from: {folder_name}") 313 continue 314 315 # Check if 4CAT has access to this server 316 if filename_response.status_code == 403: 317 raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server") 318 elif filename_response.status_code in [400, 405]: 319 raise DsmConnectionError(f"400: DMI Service Manager server {filename_response.json()['reason']}") 320 elif filename_response.status_code == 404: 321 # Folder not found; no files 322 return {} 323 elif filename_response.status_code != 200: 324 raise DmiServiceManagerException(f"Unknown response from DMI Service Manager: {filename_response.status_code} - {filename_response.reason}") 325 return filename_response.json() 326 327 def send_files(self, file_collection_name, results_name, files_to_upload, dir_with_files): 328 """ 329 Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management. 330 The path on the server to both the files and results will be returned. 331 332 A "files" folder will be created in the under the file_collection_name folder. The files_to_upload will be be 333 stored there. A unique results folder will be created under the results_name folder so that multiple results 334 can be created based on a file collection if needed (without needing to re-upload files). 
335 336 :param str file_collection_name: Name of collection; files will be uploaded to 'files' subfolder 337 :param str results_name: Name of results subfolder where output will be stored (and can be downloaded) 338 :param list files_to_upload: List of filenames to upload 339 :param Path dir_with_files: Local Path to files 340 :param Dataset dataset: Dataset object for status updates and unique key 341 :return Path, Path: Server's Path to files, Server's Path to results 342 """ 343 data = {'folder_name': file_collection_name} 344 345 # Check if files have already been sent 346 self.processor.dataset.update_status("Connecting to DMI Service Manager...") 347 existing_files = self.request_folder_files(file_collection_name) 348 uploaded_files = existing_files.get('4cat_uploads', []) 349 if len(uploaded_files) > 0: 350 self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files))) 351 352 # Compare files with previously uploaded 353 to_upload_filenames = [filename for filename in files_to_upload if filename not in uploaded_files] 354 total_files_to_upload = len(to_upload_filenames) 355 356 # Check if results folder exists 357 empty_placeholder = None 358 if results_name not in existing_files: 359 total_files_to_upload += 1 360 # Create a blank file to upload into results folder 361 empty_placeholder = f"4CAT_{results_name}_blank.txt" 362 with open(dir_with_files.joinpath(empty_placeholder), 'w') as file: 363 file.write('') 364 to_upload_filenames = [empty_placeholder] + to_upload_filenames 365 366 if total_files_to_upload > 0: 367 api_upload_endpoint = f"{self.server_address}send_files" 368 369 self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files") 370 files_uploaded = 0 371 while to_upload_filenames: 372 upload_file = to_upload_filenames.pop() 373 self.processor.dataset.log(f"Uploading {upload_file}") 374 # Upload files 375 if upload_file == empty_placeholder: 376 # Upload a blank results file to results 
folder 377 response = requests.post(api_upload_endpoint, 378 files=[(results_name, open(dir_with_files.joinpath(upload_file), 'rb'))], 379 data=data, timeout=120) 380 else: 381 # All other files uploading to general upload folder belonging to parent dataset collection 382 response = requests.post(api_upload_endpoint, 383 files=[('4cat_uploads', open(dir_with_files.joinpath(upload_file), 'rb'))], 384 data=data, timeout=120) 385 386 if response.status_code == 200: 387 files_uploaded += 1 388 if files_uploaded % 1000 == 0: 389 self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!") 390 self.processor.dataset.update_progress(files_uploaded / total_files_to_upload) 391 elif response.status_code == 403: 392 raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server") 393 elif response.status_code == 405: 394 raise DsmConnectionError("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)") 395 else: 396 self.processor.dataset.log(f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file}") 397 398 try: 399 response_json = response.json() 400 except JSONDecodeError: 401 response_json = None 402 if response_json and "errors" in response.json(): 403 self.processor.dataset.log( 404 f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file} - {response.json()['errors']}") 405 406 self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!") 407 408 server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads") 409 server_path_to_results = Path(file_collection_name).joinpath(results_name) 410 411 return server_path_to_files, server_path_to_results 412 413 def download_results(self, filenames_to_download, folder_name, local_output_dir, timeout=30): 414 """ 415 Download results from the DMI Service Manager server. 
416 417 :param list filenames_to_download: List of filenames to download 418 :param str folder_name: Name of subfolder where files are localed (e.g. "results_name" or "files") 419 :param Path local_output_dir: Local Path to download files to 420 :param int timeout: Number of seconds to wait for a response from the server 421 """ 422 # Download the result files 423 api_upload_endpoint = f"{self.server_address}download/" 424 total_files_to_download = len(filenames_to_download) 425 files_downloaded = 0 426 self.processor.dataset.update_status(f"Downloading {total_files_to_download} files from {folder_name}...") 427 for filename in filenames_to_download: 428 retries = 0 429 while True: 430 try: 431 file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=timeout) 432 break 433 except requests.exceptions.ConnectionError as e: 434 retries += 1 435 if retries > 3: 436 raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading file: {folder_name}/{filename}") 437 continue 438 files_downloaded += 1 439 if files_downloaded % 1000 == 0: 440 self.processor.dataset.update_status(f"Downloaded {files_downloaded} of {total_files_to_download} files") 441 self.processor.dataset.update_progress(files_downloaded / total_files_to_download) 442 443 with open(local_output_dir.joinpath(filename), 'wb') as file: 444 file.write(file_response.content) 445 446 def sanitize_filenames(self, filename): 447 """ 448 If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to 449 ensure our filenames match what the server expects. 
450 """ 451 if self.local_or_remote == "local": 452 return filename 453 elif self.local_or_remote == "remote": 454 return secure_filename(filename) 455 else: 456 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 457 458 @staticmethod 459 def get_folder_name(dataset): 460 """ 461 Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the 462 parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times). 463 464 This is only relevant for remote mode based on file management. 465 """ 466 return datetime.datetime.fromtimestamp(dataset.timestamp).strftime("%Y-%m-%d-%H%M%S") + '-' + \ 467 ''.join(e if e.isalnum() else '_' for e in dataset.get_label()) + '-' + \ 468 str(dataset.key) 469 470 @staticmethod 471 def count_local_files(directory): 472 """ 473 Get number of files in directory 474 """ 475 return len(os.listdir(directory))
Class to manage interactions with a DMI Service Manager server.
Found here: https://github.com/digitalmethodsinitiative/dmi_service_manager
50 def __init__(self, processor): 51 """ 52 """ 53 self.processor = processor 54 self.local_or_remote = processor.config.get("dmi-service-manager.ac_local_or_remote") 55 self.server_address = processor.config.get("dmi-service-manager.ab_server_address").rstrip("/") + "/api/" 56 57 self.processed_files = 0 58 59 self.num_files_to_process = None 60 self.server_file_collection_name = None 61 self.server_results_folder_name = None 62 self.path_to_files = None 63 self.path_to_results = None
def check_gpu_memory_available(self, service_endpoint):
    """
    Ask the DMI Service Manager whether GPU memory is available for a service.

    :param str service_endpoint:  Name of the service; appended to the
      `check_gpu_mem/` API route
    :return:  The decoded JSON body of the server's response (HTTP 200 only)
    :raises DsmOutOfMemory:  If the server answers with HTTP 503
    :raises DsmConnectionError:  If the server answers with any other status
    """
    api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
    resp = requests.get(api_endpoint, timeout=30)
    if resp.status_code == 200:
        return resp.json()

    if resp.status_code == 503:
        # TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? DSM could possibly manage jobs in queue
        # Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available)
        raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.")

    try:
        reason = resp.json()['reason']
    except (ValueError, KeyError, TypeError):
        # Body is not JSON (JSON decode errors are ValueErrors) or carries no
        # 'reason' field; fall back to the raw response text
        reason = strip_tags(resp.text)
    raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}")
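Returns the JSON response from the server containing the memory information when the server reports that GPU memory is available. Raises DsmOutOfMemory if the server reports that it is out of GPU memory (HTTP 503) and DsmConnectionError for any other response.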
85 def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name): 86 """ 87 Process files according to DMI Service Manager local or remote settings 88 """ 89 self.num_files_to_process = len(filenames) 90 91 # Upload files if necessary 92 if self.local_or_remote == "local": 93 # Relative to PATH_DATA which should be where Docker mounts the container volume 94 # TODO: path is just the staging_area name, but what if we move staging areas? DMI Service manager needs to know... 95 path_to_files = input_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute()) 96 path_to_results = output_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute()) 97 98 elif self.local_or_remote == "remote": 99 100 # Upload files 101 self.server_file_collection_name = server_file_collection_name 102 self.server_results_folder_name = server_results_folder_name 103 path_to_files, path_to_results = self.send_files(server_file_collection_name, server_results_folder_name, filenames, input_file_dir) 104 105 else: 106 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 107 108 self.path_to_files = path_to_files 109 self.path_to_results = path_to_results 110 return path_to_files, path_to_results
Process files according to DMI Service Manager local or remote settings
112 def check_progress(self): 113 if self.local_or_remote == "local": 114 current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results)) 115 elif self.local_or_remote == "remote": 116 existing_files = self.request_folder_files(self.server_file_collection_name) 117 current_completed = len(existing_files.get(self.server_results_folder_name, [])) 118 else: 119 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 120 121 if current_completed != self.processed_files: 122 self.processor.dataset.update_status( 123 f"Processed {current_completed} of {self.num_files_to_process} files") 124 self.processor.dataset.update_progress(current_completed / self.num_files_to_process) 125 self.processed_files = current_completed
def check_service_exists(self):
    """
    Check for services created with the current dataset key.

    :return:  The list of related service jobs and their details, or None if
      the server did not answer with HTTP 200 or reported no jobs
    """
    # Check to see if service already created
    resp = requests.get(
        self.server_address + "jobs/details_query/",
        json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key},
        timeout=30,
    )
    if resp.status_code != 200:
        # NOTE(review): any non-200 answer is treated the same as "no job found"
        return None

    # Decode the body once; a non-empty list means a service already exists
    jobs = resp.json()["jobs"]
    return jobs if len(jobs) > 0 else None
" Check for services created with the current dataset key.
Returns None if none are found; otherwise returns the related service jobs and their details.
142 def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None): 143 """ 144 Send request and wait for results to be ready. 145 146 Check process assumes a one to one ratio of input files to output files. If this is not the case, set to False. 147 If counts the number of files in the output folder and compares it to the number of input files. 148 """ 149 if self.local_or_remote == "local": 150 service_endpoint += "_local" 151 elif self.local_or_remote == "remote": 152 service_endpoint += "_remote" 153 else: 154 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'") 155 156 existing_service = self.check_service_exists() 157 if existing_service: 158 if len(existing_service) > 1: 159 raise DmiServiceManagerException("Multiple services found with the same dataset key.") 160 else: 161 existing_service = existing_service[0] 162 if existing_service['status'] == 'complete': 163 # Service already completed 164 return True 165 elif existing_service['status'] == 'error': 166 results = json.loads(existing_service['results']) 167 raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}") 168 else: 169 # Service already running 170 results_url = self.server_address + f"jobs/{existing_service.get('id')}" 171 self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...") 172 else: 173 # Create a new service 174 # Append dataset key to data 175 data["4CAT_dataset_key"] = self.processor.dataset.key 176 api_endpoint = self.server_address + service_endpoint 177 try: 178 resp = requests.post(api_endpoint, json=data, timeout=30) 179 except requests.exceptions.ConnectionError as e : 180 raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}") 181 182 if resp.status_code == 202: 183 # New request successful 184 results_url = resp.json()['result_url'] 185 else: 186 try: 187 resp_json = resp.json() 188 
if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists": 189 # Request already exists; get DMI SM database key 190 raise DmiServiceManagerException(f"Request already exists; check that DMI SM is up to date") 191 elif resp.status_code == 404: 192 # Could be local vs remote not set correctly 193 raise DsmConnectionError(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service") 194 else: 195 raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}") 196 except JSONDecodeError: 197 # Unexpected Error 198 raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}") 199 200 # Wait for results to be ready 201 self.processor.dataset.update_status(f"Generating results for {service_endpoint}...") 202 203 check_time = 0 204 success = False 205 connection_error = 0 206 last_status = None 207 while True: 208 # If interrupted is called, attempt to finish dataset while server still running 209 if self.processor.interrupted: 210 self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True) 211 break 212 213 # Send request to check status every wait_period seconds 214 if (time.time() - check_time) > wait_period: 215 check_time = time.time() 216 if callback: 217 callback(self) 218 219 try: 220 result = requests.get(results_url, timeout=30) 221 except requests.exceptions.ConnectionError as e: 222 # Have seen the Service Manager fail particularly when another processor is uploading many consecutive files 223 connection_error += 1 224 if connection_error > 3: 225 raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}") 226 continue 227 228 if result.status_code != 200 or (result.json and result.json().get('status') != "success"): 229 # Unexpected response from DMI SM 230 
connection_error += 1 231 if connection_error > 3: 232 raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {str(result.json()) if 'json' in result.headers.get('Content-Type', '') else str(result.text)}") 233 continue 234 service_status = result.json()["job"] 235 236 # Update progress 237 if check_process: 238 # Check for message 239 status_message = service_status.get('message', '') 240 current_completed = service_status.get('processed_records', False) 241 if status_message: 242 # Update status message if changed 243 if last_status != status_message: 244 last_status = service_status.get('message', '') 245 self.processor.dataset.update_status(last_status) 246 if current_completed and self.processed_files != int(current_completed): 247 self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process) 248 self.processed_files = int(current_completed) 249 else: 250 # This service does not provide status message; check progress via file count 251 self.check_progress() 252 253 if service_status['status'] in ["created", "running", "pending"]: 254 # Still running 255 continue 256 elif service_status['status'] in ["complete", "error"]: 257 results = json.loads(service_status['results']) 258 if not results: 259 # This should not be the case if the service was written well (unless the DMI SM crashed?) 260 #TODO test if timing issue? 
261 connection_error += 1 262 if connection_error > 3: 263 raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}") 264 if int(results['returncode']) == 0: 265 # Complete without error 266 self.processor.dataset.update_status(f"Completed {service_endpoint}!") 267 success = True 268 break 269 else: 270 error = results['error'] 271 if "CUDA error: out of memory" in error: 272 raise DsmOutOfMemory("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.") 273 else: 274 raise DmiServiceManagerException(f"Error {service_endpoint}: " + error) 275 else: 276 # Something botched 277 raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json())) 278 else: 279 time.sleep(1) 280 281 return success
Send request and wait for results to be ready.
Check process assumes a one-to-one ratio of input files to output files. If this is not the case, set to False. It counts the number of files in the output folder and compares it to the number of input files.
283 def process_results(self, local_output_dir): 284 if self.local_or_remote == "local": 285 # Output files are already in local directory 286 pass 287 elif self.local_or_remote == "remote": 288 results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name) 289 self.processor.dataset.log(f"Downloading results from {results_path}...") 290 # Collect result filenames from server 291 result_files = self.request_folder_files(results_path) 292 for path, files in result_files.items(): 293 if path == '.': 294 self.download_results(files, results_path, local_output_dir) 295 else: 296 Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True) 297 self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path))
def request_folder_files(self, folder_name):
    """
    Request files from a folder on the DMI Service Manager server.

    Connection errors are retried; the request is abandoned after more than
    three failed attempts.

    :param str folder_name:  Server-side folder path; appended to the
      `list_filenames/` API route
    :return:  Decoded JSON body of the server's response, or an empty dict if
      the folder does not exist (HTTP 404)
    :raises DsmConnectionError:  If the server cannot be reached, denies access
      (403) or rejects the request (400, 405)
    :raises DmiServiceManagerException:  On any other non-200 response
    """
    filename_url = f"{self.server_address}list_filenames/{folder_name}"
    retries = 0
    while True:
        try:
            filename_response = requests.get(filename_url, timeout=30)
            break
        except requests.exceptions.ConnectionError as e:
            retries += 1
            if retries > 3:
                raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading files from: {folder_name}") from e

    status_code = filename_response.status_code
    # Check if 4CAT has access to this server
    if status_code == 403:
        raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
    elif status_code in [400, 405]:
        try:
            reason = filename_response.json()['reason']
        except (ValueError, KeyError, TypeError):
            # No usable JSON 'reason'; fall back to the HTTP reason phrase
            reason = filename_response.reason
        # Report the status actually received rather than always "400"
        raise DsmConnectionError(f"{status_code}: DMI Service Manager server {reason}")
    elif status_code == 404:
        # Folder not found; no files
        return {}
    elif status_code != 200:
        raise DmiServiceManagerException(f"Unknown response from DMI Service Manager: {status_code} - {filename_response.reason}")
    return filename_response.json()
Request files from a folder on the DMI Service Manager server.
327 def send_files(self, file_collection_name, results_name, files_to_upload, dir_with_files): 328 """ 329 Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management. 330 The path on the server to both the files and results will be returned. 331 332 A "files" folder will be created in the under the file_collection_name folder. The files_to_upload will be be 333 stored there. A unique results folder will be created under the results_name folder so that multiple results 334 can be created based on a file collection if needed (without needing to re-upload files). 335 336 :param str file_collection_name: Name of collection; files will be uploaded to 'files' subfolder 337 :param str results_name: Name of results subfolder where output will be stored (and can be downloaded) 338 :param list files_to_upload: List of filenames to upload 339 :param Path dir_with_files: Local Path to files 340 :param Dataset dataset: Dataset object for status updates and unique key 341 :return Path, Path: Server's Path to files, Server's Path to results 342 """ 343 data = {'folder_name': file_collection_name} 344 345 # Check if files have already been sent 346 self.processor.dataset.update_status("Connecting to DMI Service Manager...") 347 existing_files = self.request_folder_files(file_collection_name) 348 uploaded_files = existing_files.get('4cat_uploads', []) 349 if len(uploaded_files) > 0: 350 self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files))) 351 352 # Compare files with previously uploaded 353 to_upload_filenames = [filename for filename in files_to_upload if filename not in uploaded_files] 354 total_files_to_upload = len(to_upload_filenames) 355 356 # Check if results folder exists 357 empty_placeholder = None 358 if results_name not in existing_files: 359 total_files_to_upload += 1 360 # Create a blank file to upload into results folder 361 empty_placeholder = 
f"4CAT_{results_name}_blank.txt" 362 with open(dir_with_files.joinpath(empty_placeholder), 'w') as file: 363 file.write('') 364 to_upload_filenames = [empty_placeholder] + to_upload_filenames 365 366 if total_files_to_upload > 0: 367 api_upload_endpoint = f"{self.server_address}send_files" 368 369 self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files") 370 files_uploaded = 0 371 while to_upload_filenames: 372 upload_file = to_upload_filenames.pop() 373 self.processor.dataset.log(f"Uploading {upload_file}") 374 # Upload files 375 if upload_file == empty_placeholder: 376 # Upload a blank results file to results folder 377 response = requests.post(api_upload_endpoint, 378 files=[(results_name, open(dir_with_files.joinpath(upload_file), 'rb'))], 379 data=data, timeout=120) 380 else: 381 # All other files uploading to general upload folder belonging to parent dataset collection 382 response = requests.post(api_upload_endpoint, 383 files=[('4cat_uploads', open(dir_with_files.joinpath(upload_file), 'rb'))], 384 data=data, timeout=120) 385 386 if response.status_code == 200: 387 files_uploaded += 1 388 if files_uploaded % 1000 == 0: 389 self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!") 390 self.processor.dataset.update_progress(files_uploaded / total_files_to_upload) 391 elif response.status_code == 403: 392 raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server") 393 elif response.status_code == 405: 394 raise DsmConnectionError("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)") 395 else: 396 self.processor.dataset.log(f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file}") 397 398 try: 399 response_json = response.json() 400 except JSONDecodeError: 401 response_json = None 402 if response_json and "errors" in response.json(): 403 self.processor.dataset.log( 404 
f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file} - {response.json()['errors']}") 405 406 self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!") 407 408 server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads") 409 server_path_to_results = Path(file_collection_name).joinpath(results_name) 410 411 return server_path_to_files, server_path_to_results
Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management. The path on the server to both the files and results will be returned.
A "4cat_uploads" folder will be created under the file_collection_name folder. The files_to_upload will be stored there. A unique results folder will be created under the results_name folder so that multiple results can be created based on a file collection if needed (without needing to re-upload files).
Parameters
- str file_collection_name: Name of collection; files will be uploaded to '4cat_uploads' subfolder
- str results_name: Name of results subfolder where output will be stored (and can be downloaded)
- list files_to_upload: List of filenames to upload
- Path dir_with_files: Local Path to files
- Dataset dataset: Dataset object for status updates and unique key
Returns
Server's Path to files, Server's Path to results
413 def download_results(self, filenames_to_download, folder_name, local_output_dir, timeout=30): 414 """ 415 Download results from the DMI Service Manager server. 416 417 :param list filenames_to_download: List of filenames to download 418 :param str folder_name: Name of subfolder where files are localed (e.g. "results_name" or "files") 419 :param Path local_output_dir: Local Path to download files to 420 :param int timeout: Number of seconds to wait for a response from the server 421 """ 422 # Download the result files 423 api_upload_endpoint = f"{self.server_address}download/" 424 total_files_to_download = len(filenames_to_download) 425 files_downloaded = 0 426 self.processor.dataset.update_status(f"Downloading {total_files_to_download} files from {folder_name}...") 427 for filename in filenames_to_download: 428 retries = 0 429 while True: 430 try: 431 file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=timeout) 432 break 433 except requests.exceptions.ConnectionError as e: 434 retries += 1 435 if retries > 3: 436 raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading file: {folder_name}/{filename}") 437 continue 438 files_downloaded += 1 439 if files_downloaded % 1000 == 0: 440 self.processor.dataset.update_status(f"Downloaded {files_downloaded} of {total_files_to_download} files") 441 self.processor.dataset.update_progress(files_downloaded / total_files_to_download) 442 443 with open(local_output_dir.joinpath(filename), 'wb') as file: 444 file.write(file_response.content)
Download results from the DMI Service Manager server.
Parameters
- list filenames_to_download: List of filenames to download
- str folder_name: Name of subfolder where files are located (e.g. "results_name" or "files")
- Path local_output_dir: Local Path to download files to
- int timeout: Number of seconds to wait for a response from the server
446 def sanitize_filenames(self, filename): 447 """ 448 If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to 449 ensure our filenames match what the server expects. 450 """ 451 if self.local_or_remote == "local": 452 return filename 453 elif self.local_or_remote == "remote": 454 return secure_filename(filename) 455 else: 456 raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to ensure our filenames match what the server expects.
458 @staticmethod 459 def get_folder_name(dataset): 460 """ 461 Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the 462 parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times). 463 464 This is only relevant for remote mode based on file management. 465 """ 466 return datetime.datetime.fromtimestamp(dataset.timestamp).strftime("%Y-%m-%d-%H%M%S") + '-' + \ 467 ''.join(e if e.isalnum() else '_' for e in dataset.get_label()) + '-' + \ 468 str(dataset.key)
Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times).
This is only relevant for remote mode based on file management.