Edit on GitHub

common.lib.dmi_service_manager

DMI Service Manager

  1"""
  2DMI Service Manager
  3"""
  4import datetime
  5import os
  6import json
  7import time
  8from json import JSONDecodeError
  9from werkzeug.utils import secure_filename
 10
 11import requests
 12from pathlib import Path
 13
 14
 15__author__ = "Dale Wahl"
 16__credits__ = ["Dale Wahl"]
 17__maintainer__ = "Dale Wahl"
 18__email__ = "4cat@oilab.eu"
 19
 20from common.lib.helpers import strip_tags
 21
 22
 23class DmiServiceManagerException(Exception):
 24    """
 25    Raised when there is a problem with the configuration settings.
 26    """
 27    pass
 28
 29class DsmOutOfMemory(DmiServiceManagerException):
 30    """
 31    Raised when there is a problem with the configuration settings.
 32    """
 33    pass
 34
 35
 36class DsmConnectionError(DmiServiceManagerException):
 37    """
 38    Raised when there is a problem with the configuration settings.
 39    """
 40    pass
 41
 42class DmiServiceManager:
 43    """
 44    Class to manage interactions with a DMI Service Manager server.
 45
 46    Found here:
 47    https://github.com/digitalmethodsinitiative/dmi_service_manager
 48    """
 49    def __init__(self, processor):
 50        """
 51        """
 52        self.processor = processor
 53        self.local_or_remote = processor.config.get("dmi-service-manager.ac_local_or_remote")
 54        self.server_address = processor.config.get("dmi-service-manager.ab_server_address").rstrip("/") + "/api/"
 55
 56        self.processed_files = 0
 57
 58        self.num_files_to_process = None
 59        self.server_file_collection_name = None
 60        self.server_results_folder_name = None
 61        self.path_to_files = None
 62        self.path_to_results = None
 63
 64    def check_gpu_memory_available(self, service_endpoint):
 65        """
 66        Returns tuple with True if server has some memory available and  False otherwise as well as the JSON response
 67        from server containing the memory information.
 68        """
 69        api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
 70        resp = requests.get(api_endpoint, timeout=30)
 71        if resp.status_code == 200:
 72            return resp.json()
 73        elif resp.status_code == 503:
 74            # TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? DSM could possibly manage jobs in queue
 75            # Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available)
 76            raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.")
 77        else:
 78            try:
 79                reason = resp.json()['reason']
 80            except JSONDecodeError:
 81                reason = strip_tags(resp.text)
 82            raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}")
 83
 84    def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
 85        """
 86        Process files according to DMI Service Manager local or remote settings
 87        """
 88        self.num_files_to_process = len(filenames)
 89
 90        # Upload files if necessary
 91        if self.local_or_remote == "local":
 92            # Relative to PATH_DATA which should be where Docker mounts the container volume
 93            # TODO: path is just the staging_area name, but what if we move staging areas? DMI Service manager needs to know...
 94            path_to_files = input_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
 95            path_to_results = output_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
 96
 97        elif self.local_or_remote == "remote":
 98
 99            # Upload files
100            self.server_file_collection_name = server_file_collection_name
101            self.server_results_folder_name = server_results_folder_name
102            path_to_files, path_to_results = self.send_files(server_file_collection_name, server_results_folder_name, filenames, input_file_dir)
103
104        else:
105            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
106
107        self.path_to_files = path_to_files
108        self.path_to_results = path_to_results
109        return path_to_files, path_to_results
110
111    def check_progress(self):
112        if self.local_or_remote == "local":
113            current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results))
114        elif self.local_or_remote == "remote":
115            existing_files = self.request_folder_files(self.server_file_collection_name)
116            current_completed = len(existing_files.get(self.server_results_folder_name, []))
117        else:
118            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
119
120        if current_completed != self.processed_files:
121            self.processor.dataset.update_status(
122                f"Processed {current_completed} of {self.num_files_to_process} files")
123            self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
124            self.processed_files = current_completed
125
126    def check_service_exists(self):
127        """"
128        Check for services created with the current dataset key.
129
130        Returns None, if none found else the related service jobs and their details
131        """
132        # Check to see if service already created
133        resp = requests.get(self.server_address + "jobs/details_query/", json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key}, timeout=30)
134        if resp.status_code == 200:
135            # Check if service is already running
136            if len(resp.json()["jobs"]) > 0:
137                return resp.json()["jobs"]
138            else:
139                return None
140
141    def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None):
142        """
143        Send request and wait for results to be ready.
144
145        Check process assumes a one to one ratio of input files to output files. If this is not the case, set to False.
146        If counts the number of files in the output folder and compares it to the number of input files.
147        """
148        if self.local_or_remote == "local":
149            service_endpoint += "_local"
150        elif self.local_or_remote == "remote":
151            service_endpoint += "_remote"
152        else:
153            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
154
155        existing_service = self.check_service_exists()
156        if existing_service:
157            if len(existing_service) > 1:
158                raise DmiServiceManagerException("Multiple services found with the same dataset key.")
159            else:
160                existing_service = existing_service[0]
161                if existing_service['status'] == 'complete':
162                    # Service already completed
163                    return True
164                elif existing_service['status'] == 'error':
165                    results = json.loads(existing_service['results'])
166                    raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}")
167                else:
168                    # Service already running
169                    results_url = self.server_address + f"jobs/{existing_service.get('id')}"
170                    self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...")
171        else:
172            # Create a new service
173            # Append dataset key to data
174            data["4CAT_dataset_key"] = self.processor.dataset.key
175            api_endpoint = self.server_address + service_endpoint
176            try:
177                resp = requests.post(api_endpoint, json=data, timeout=30)
178            except requests.exceptions.ConnectionError as e :
179                raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")
180
181            if resp.status_code == 202:
182                # New request successful
183                results_url = resp.json()['result_url']
184            else:
185                try:
186                    resp_json = resp.json()
187                    if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
188                        # Request already exists; get DMI SM database key
189                        raise DmiServiceManagerException(f"Request already exists; check that DMI SM is up to date")
190                    elif resp.status_code == 404:
191                        # Could be local vs remote not set correctly
192                        raise DsmConnectionError(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service")
193                    else:
194                        raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
195                except JSONDecodeError:
196                    # Unexpected Error
197                    raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
198
199            # Wait for results to be ready
200            self.processor.dataset.update_status(f"Generating results for {service_endpoint}...")
201
202        check_time = 0
203        success = False
204        connection_error = 0
205        last_status = None
206        while True:
207            # If interrupted is called, attempt to finish dataset while server still running
208            if self.processor.interrupted:
209                self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True)
210                break
211
212            # Send request to check status every wait_period seconds
213            if (time.time() - check_time) > wait_period:
214                check_time = time.time()
215                if callback:
216                    callback(self)
217
218                try:
219                    result = requests.get(results_url, timeout=30)
220                except requests.exceptions.ConnectionError as e:
221                    # Have seen the Service Manager fail particularly when another processor is uploading many consecutive files
222                    connection_error += 1
223                    if connection_error > 3:
224                        raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")
225                    continue
226
227                if result.status_code != 200 or (result.json and result.json().get('status') != "success"):
228                    # Unexpected response from DMI SM
229                    connection_error += 1
230                    if connection_error > 3:
231                        raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {str(result.json()) if 'json' in result.headers.get('Content-Type', '') else str(result.text)}")
232                    continue
233                service_status = result.json()["job"]
234
235                # Update progress
236                if check_process:
237                    # Check for message
238                    status_message = service_status.get('message', '')
239                    current_completed = service_status.get('processed_records', False)
240                    if status_message:
241                        # Update status message if changed
242                        if last_status != status_message:
243                            last_status = service_status.get('message', '')
244                            self.processor.dataset.update_status(last_status)
245                        if current_completed and self.processed_files != int(current_completed):
246                            self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process)
247                            self.processed_files = int(current_completed)
248                    else:
249                        # This service does not provide status message; check progress via file count
250                        self.check_progress()
251
252                if service_status['status'] in ["created", "running", "pending"]:
253                    # Still running
254                    continue
255                elif service_status['status'] in ["complete", "error"]:
256                    results = json.loads(service_status['results'])
257                    if not results:
258                        # This should not be the case if the service was written well (unless the DMI SM crashed?)
259                        #TODO test if timing issue?
260                        connection_error += 1
261                        if connection_error > 3:
262                            raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}")
263                    if int(results['returncode']) == 0:
264                        # Complete without error
265                        self.processor.dataset.update_status(f"Completed {service_endpoint}!")
266                        success = True
267                        break
268                    else:
269                        error = results['error']
270                        if "CUDA error: out of memory" in error:
271                            raise DsmOutOfMemory("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
272                        else:
273                            raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
274                else:
275                    # Something botched
276                    raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
277            else:
278                time.sleep(1)
279
280        return success
281
282    def process_results(self, local_output_dir):
283        if self.local_or_remote == "local":
284            # Output files are already in local directory
285            pass
286        elif self.local_or_remote == "remote":
287            results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name)
288            self.processor.dataset.log(f"Downloading results from {results_path}...")
289            # Collect result filenames from server
290            result_files = self.request_folder_files(results_path)
291            for path, files in result_files.items():
292                if path == '.':
293                    self.download_results(files, results_path, local_output_dir)
294                else:
295                    Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True)
296                    self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path))
297
298    def request_folder_files(self, folder_name):
299        """
300        Request files from a folder on the DMI Service Manager server.
301        """
302        filename_url = f"{self.server_address}list_filenames/{folder_name}"
303        retries = 0
304        while True:
305            try:
306                filename_response = requests.get(filename_url, timeout=30)
307                break
308            except requests.exceptions.ConnectionError as e:
309                retries += 1
310                if retries > 3:
311                    raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading files from: {folder_name}")
312                continue
313
314        # Check if 4CAT has access to this server
315        if filename_response.status_code == 403:
316            raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
317        elif filename_response.status_code in [400, 405]:
318            raise DsmConnectionError(f"400: DMI Service Manager server {filename_response.json()['reason']}")
319        elif filename_response.status_code == 404:
320            # Folder not found; no files
321            return {}
322        elif filename_response.status_code != 200:
323            raise DmiServiceManagerException(f"Unknown response from DMI Service Manager: {filename_response.status_code} - {filename_response.reason}")
324        return filename_response.json()
325
326    def send_files(self, file_collection_name, results_name, files_to_upload, dir_with_files):
327        """
328        Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management.
329        The path on the server to both the files and results will be returned.
330
331        A "files" folder will be created in the under the file_collection_name folder. The files_to_upload will be be
332        stored there. A unique results folder will be created under the results_name folder so that multiple results
333        can be created based on a file collection if needed (without needing to re-upload files).
334
335        :param str file_collection_name:    Name of collection; files will be uploaded to 'files' subfolder
336        :param str results_name:            Name of results subfolder where output will be stored (and can be downloaded)
337        :param list files_to_upload:        List of filenames to upload
338        :param Path dir_with_files:         Local Path to files
339        :param Dataset dataset:             Dataset object for status updates and unique key
340        :return Path, Path:                 Server's Path to files, Server's Path to results
341        """
342        data = {'folder_name': file_collection_name}
343
344        # Check if files have already been sent
345        self.processor.dataset.update_status("Connecting to DMI Service Manager...")
346        existing_files = self.request_folder_files(file_collection_name)
347        uploaded_files = existing_files.get('4cat_uploads', [])
348        if len(uploaded_files) > 0:
349            self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files)))
350
351        # Compare files with previously uploaded
352        to_upload_filenames = [filename for filename in files_to_upload if filename not in uploaded_files]
353        total_files_to_upload = len(to_upload_filenames)
354
355        # Check if results folder exists
356        empty_placeholder = None
357        if results_name not in existing_files:
358            total_files_to_upload += 1
359            # Create a blank file to upload into results folder
360            empty_placeholder = f"4CAT_{results_name}_blank.txt"
361            with open(dir_with_files.joinpath(empty_placeholder), 'w') as file:
362                file.write('')
363            to_upload_filenames = [empty_placeholder] + to_upload_filenames
364
365        if total_files_to_upload > 0:
366            api_upload_endpoint = f"{self.server_address}send_files"
367
368            self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files")
369            files_uploaded = 0
370            while to_upload_filenames:
371                upload_file = to_upload_filenames.pop()
372                self.processor.dataset.log(f"Uploading {upload_file}")
373                # Upload files
374                if upload_file == empty_placeholder:
375                    # Upload a blank results file to results folder
376                    response = requests.post(api_upload_endpoint,
377                                             files=[(results_name, open(dir_with_files.joinpath(upload_file), 'rb'))],
378                                             data=data, timeout=120)
379                else:
380                    # All other files uploading to general upload folder belonging to parent dataset collection
381                    response = requests.post(api_upload_endpoint,
382                                             files=[('4cat_uploads', open(dir_with_files.joinpath(upload_file), 'rb'))],
383                                             data=data, timeout=120)
384
385                if response.status_code == 200:
386                    files_uploaded += 1
387                    if files_uploaded % 1000 == 0:
388                        self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!")
389                    self.processor.dataset.update_progress(files_uploaded / total_files_to_upload)
390                elif response.status_code == 403:
391                    raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
392                elif response.status_code == 405:
393                    raise DsmConnectionError("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)")
394                else:
395                    self.processor.dataset.log(f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file}")
396
397                try:
398                    response_json = response.json()
399                except JSONDecodeError:
400                    response_json = None
401                if response_json and "errors" in response.json():
402                    self.processor.dataset.log(
403                        f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file} - {response.json()['errors']}")
404
405            self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!")
406
407        server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads")
408        server_path_to_results = Path(file_collection_name).joinpath(results_name)
409
410        return server_path_to_files, server_path_to_results
411
412    def download_results(self, filenames_to_download, folder_name, local_output_dir, timeout=30):
413        """
414        Download results from the DMI Service Manager server.
415
416        :param list filenames_to_download:  List of filenames to download
417        :param str folder_name:             Name of subfolder where files are localed (e.g. "results_name" or "files")
418        :param Path local_output_dir:       Local Path to download files to
419        :param int timeout:                 Number of seconds to wait for a response from the server
420        """
421        # Download the result files
422        api_upload_endpoint = f"{self.server_address}download/"
423        total_files_to_download = len(filenames_to_download)
424        files_downloaded = 0
425        self.processor.dataset.update_status(f"Downloading {total_files_to_download} files from {folder_name}...")
426        for filename in filenames_to_download:
427            retries = 0
428            while True:
429                try:
430                    file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=timeout)
431                    break
432                except requests.exceptions.ConnectionError as e:
433                    retries += 1
434                    if retries > 3:
435                        raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading file: {folder_name}/{filename}")
436                    continue
437            files_downloaded += 1
438            if files_downloaded % 1000 == 0:
439                self.processor.dataset.update_status(f"Downloaded {files_downloaded} of {total_files_to_download} files")
440            self.processor.dataset.update_progress(files_downloaded / total_files_to_download)
441
442            with open(local_output_dir.joinpath(filename), 'wb') as file:
443                file.write(file_response.content)
444
445    def sanitize_filenames(self, filename):
446        """
447        If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to
448        ensure our filenames match what the server expects.
449        """
450        if self.local_or_remote == "local":
451            return filename
452        elif self.local_or_remote == "remote":
453            return secure_filename(filename)
454        else:
455            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
456
457    @staticmethod
458    def get_folder_name(dataset):
459        """
460        Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the
461        parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times).
462
463        This is only relevant for remote mode based on file management.
464        """
465        return datetime.datetime.fromtimestamp(dataset.timestamp).strftime("%Y-%m-%d-%H%M%S") + '-' + \
466            ''.join(e if e.isalnum() else '_' for e in dataset.get_label()) + '-' + \
467            str(dataset.key)
468
469    @staticmethod
470    def count_local_files(directory):
471        """
472        Get number of files in directory
473        """
474        return len(os.listdir(directory))
class DmiServiceManagerException(builtins.Exception):
class DmiServiceManagerException(Exception):
    """
    Base exception for DMI Service Manager problems, such as an invalid local/remote setting or an error reported
    by the DMI Service Manager server.
    """
    pass

Base exception for DMI Service Manager problems, such as an invalid local/remote setting or an error reported by the DMI Service Manager server.

class DsmOutOfMemory(DmiServiceManagerException):
class DsmOutOfMemory(DmiServiceManagerException):
    """
    Raised when the DMI Service Manager server reports that it is out of (GPU) memory.
    """
    pass

Raised when the DMI Service Manager server reports that it is out of (GPU) memory.

class DsmConnectionError(DmiServiceManagerException):
class DsmConnectionError(DmiServiceManagerException):
    """
    Raised when the DMI Service Manager server cannot be reached or responds with an unexpected error.
    """
    pass

Raised when the DMI Service Manager server cannot be reached or responds with an unexpected error.

class DmiServiceManager:
 43class DmiServiceManager:
 44    """
 45    Class to manage interactions with a DMI Service Manager server.
 46
 47    Found here:
 48    https://github.com/digitalmethodsinitiative/dmi_service_manager
 49    """
 50    def __init__(self, processor):
 51        """
 52        """
 53        self.processor = processor
 54        self.local_or_remote = processor.config.get("dmi-service-manager.ac_local_or_remote")
 55        self.server_address = processor.config.get("dmi-service-manager.ab_server_address").rstrip("/") + "/api/"
 56
 57        self.processed_files = 0
 58
 59        self.num_files_to_process = None
 60        self.server_file_collection_name = None
 61        self.server_results_folder_name = None
 62        self.path_to_files = None
 63        self.path_to_results = None
 64
 65    def check_gpu_memory_available(self, service_endpoint):
 66        """
 67        Returns tuple with True if server has some memory available and  False otherwise as well as the JSON response
 68        from server containing the memory information.
 69        """
 70        api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
 71        resp = requests.get(api_endpoint, timeout=30)
 72        if resp.status_code == 200:
 73            return resp.json()
 74        elif resp.status_code == 503:
 75            # TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? DSM could possibly manage jobs in queue
 76            # Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available)
 77            raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.")
 78        else:
 79            try:
 80                reason = resp.json()['reason']
 81            except JSONDecodeError:
 82                reason = strip_tags(resp.text)
 83            raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}")
 84
 85    def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
 86        """
 87        Process files according to DMI Service Manager local or remote settings
 88        """
 89        self.num_files_to_process = len(filenames)
 90
 91        # Upload files if necessary
 92        if self.local_or_remote == "local":
 93            # Relative to PATH_DATA which should be where Docker mounts the container volume
 94            # TODO: path is just the staging_area name, but what if we move staging areas? DMI Service manager needs to know...
 95            path_to_files = input_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
 96            path_to_results = output_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
 97
 98        elif self.local_or_remote == "remote":
 99
100            # Upload files
101            self.server_file_collection_name = server_file_collection_name
102            self.server_results_folder_name = server_results_folder_name
103            path_to_files, path_to_results = self.send_files(server_file_collection_name, server_results_folder_name, filenames, input_file_dir)
104
105        else:
106            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
107
108        self.path_to_files = path_to_files
109        self.path_to_results = path_to_results
110        return path_to_files, path_to_results
111
112    def check_progress(self):
113        if self.local_or_remote == "local":
114            current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results))
115        elif self.local_or_remote == "remote":
116            existing_files = self.request_folder_files(self.server_file_collection_name)
117            current_completed = len(existing_files.get(self.server_results_folder_name, []))
118        else:
119            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
120
121        if current_completed != self.processed_files:
122            self.processor.dataset.update_status(
123                f"Processed {current_completed} of {self.num_files_to_process} files")
124            self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
125            self.processed_files = current_completed
126
127    def check_service_exists(self):
128        """"
129        Check for services created with the current dataset key.
130
131        Returns None, if none found else the related service jobs and their details
132        """
133        # Check to see if service already created
134        resp = requests.get(self.server_address + "jobs/details_query/", json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key}, timeout=30)
135        if resp.status_code == 200:
136            # Check if service is already running
137            if len(resp.json()["jobs"]) > 0:
138                return resp.json()["jobs"]
139            else:
140                return None
141
142    def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None):
143        """
144        Send request and wait for results to be ready.
145
146        Check process assumes a one to one ratio of input files to output files. If this is not the case, set to False.
147        If counts the number of files in the output folder and compares it to the number of input files.
148        """
149        if self.local_or_remote == "local":
150            service_endpoint += "_local"
151        elif self.local_or_remote == "remote":
152            service_endpoint += "_remote"
153        else:
154            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
155
156        existing_service = self.check_service_exists()
157        if existing_service:
158            if len(existing_service) > 1:
159                raise DmiServiceManagerException("Multiple services found with the same dataset key.")
160            else:
161                existing_service = existing_service[0]
162                if existing_service['status'] == 'complete':
163                    # Service already completed
164                    return True
165                elif existing_service['status'] == 'error':
166                    results = json.loads(existing_service['results'])
167                    raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}")
168                else:
169                    # Service already running
170                    results_url = self.server_address + f"jobs/{existing_service.get('id')}"
171                    self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...")
172        else:
173            # Create a new service
174            # Append dataset key to data
175            data["4CAT_dataset_key"] = self.processor.dataset.key
176            api_endpoint = self.server_address + service_endpoint
177            try:
178                resp = requests.post(api_endpoint, json=data, timeout=30)
179            except requests.exceptions.ConnectionError as e :
180                raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")
181
182            if resp.status_code == 202:
183                # New request successful
184                results_url = resp.json()['result_url']
185            else:
186                try:
187                    resp_json = resp.json()
188                    if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
189                        # Request already exists; get DMI SM database key
190                        raise DmiServiceManagerException(f"Request already exists; check that DMI SM is up to date")
191                    elif resp.status_code == 404:
192                        # Could be local vs remote not set correctly
193                        raise DsmConnectionError(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service")
194                    else:
195                        raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
196                except JSONDecodeError:
197                    # Unexpected Error
198                    raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
199
200            # Wait for results to be ready
201            self.processor.dataset.update_status(f"Generating results for {service_endpoint}...")
202
203        check_time = 0
204        success = False
205        connection_error = 0
206        last_status = None
207        while True:
208            # If interrupted is called, attempt to finish dataset while server still running
209            if self.processor.interrupted:
210                self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True)
211                break
212
213            # Send request to check status every wait_period seconds
214            if (time.time() - check_time) > wait_period:
215                check_time = time.time()
216                if callback:
217                    callback(self)
218
219                try:
220                    result = requests.get(results_url, timeout=30)
221                except requests.exceptions.ConnectionError as e:
222                    # Have seen the Service Manager fail particularly when another processor is uploading many consecutive files
223                    connection_error += 1
224                    if connection_error > 3:
225                        raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")
226                    continue
227
228                if result.status_code != 200 or (result.json and result.json().get('status') != "success"):
229                    # Unexpected response from DMI SM
230                    connection_error += 1
231                    if connection_error > 3:
232                        raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {str(result.json()) if 'json' in result.headers.get('Content-Type', '') else str(result.text)}")
233                    continue
234                service_status = result.json()["job"]
235
236                # Update progress
237                if check_process:
238                    # Check for message
239                    status_message = service_status.get('message', '')
240                    current_completed = service_status.get('processed_records', False)
241                    if status_message:
242                        # Update status message if changed
243                        if last_status != status_message:
244                            last_status = service_status.get('message', '')
245                            self.processor.dataset.update_status(last_status)
246                        if current_completed and self.processed_files != int(current_completed):
247                            self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process)
248                            self.processed_files = int(current_completed)
249                    else:
250                        # This service does not provide status message; check progress via file count
251                        self.check_progress()
252
253                if service_status['status'] in ["created", "running", "pending"]:
254                    # Still running
255                    continue
256                elif service_status['status'] in ["complete", "error"]:
257                    results = json.loads(service_status['results'])
258                    if not results:
259                        # This should not be the case if the service was written well (unless the DMI SM crashed?)
260                        #TODO test if timing issue?
261                        connection_error += 1
262                        if connection_error > 3:
263                            raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}")
264                    if int(results['returncode']) == 0:
265                        # Complete without error
266                        self.processor.dataset.update_status(f"Completed {service_endpoint}!")
267                        success = True
268                        break
269                    else:
270                        error = results['error']
271                        if "CUDA error: out of memory" in error:
272                            raise DsmOutOfMemory("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
273                        else:
274                            raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
275                else:
276                    # Something botched
277                    raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
278            else:
279                time.sleep(1)
280
281        return success
282
283    def process_results(self, local_output_dir):
284        if self.local_or_remote == "local":
285            # Output files are already in local directory
286            pass
287        elif self.local_or_remote == "remote":
288            results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name)
289            self.processor.dataset.log(f"Downloading results from {results_path}...")
290            # Collect result filenames from server
291            result_files = self.request_folder_files(results_path)
292            for path, files in result_files.items():
293                if path == '.':
294                    self.download_results(files, results_path, local_output_dir)
295                else:
296                    Path(os.path.join(local_output_dir, path)).mkdir(exist_ok=True, parents=True)
297                    self.download_results(files, os.path.join(results_path, path), local_output_dir.joinpath(path))
298
299    def request_folder_files(self, folder_name):
300        """
301        Request files from a folder on the DMI Service Manager server.
302        """
303        filename_url = f"{self.server_address}list_filenames/{folder_name}"
304        retries = 0
305        while True:
306            try:
307                filename_response = requests.get(filename_url, timeout=30)
308                break
309            except requests.exceptions.ConnectionError as e:
310                retries += 1
311                if retries > 3:
312                    raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading files from: {folder_name}")
313                continue
314
315        # Check if 4CAT has access to this server
316        if filename_response.status_code == 403:
317            raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
318        elif filename_response.status_code in [400, 405]:
319            raise DsmConnectionError(f"400: DMI Service Manager server {filename_response.json()['reason']}")
320        elif filename_response.status_code == 404:
321            # Folder not found; no files
322            return {}
323        elif filename_response.status_code != 200:
324            raise DmiServiceManagerException(f"Unknown response from DMI Service Manager: {filename_response.status_code} - {filename_response.reason}")
325        return filename_response.json()
326
327    def send_files(self, file_collection_name, results_name, files_to_upload, dir_with_files):
328        """
329        Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management.
330        The path on the server to both the files and results will be returned.
331
332        A "files" folder will be created in the under the file_collection_name folder. The files_to_upload will be be
333        stored there. A unique results folder will be created under the results_name folder so that multiple results
334        can be created based on a file collection if needed (without needing to re-upload files).
335
336        :param str file_collection_name:    Name of collection; files will be uploaded to 'files' subfolder
337        :param str results_name:            Name of results subfolder where output will be stored (and can be downloaded)
338        :param list files_to_upload:        List of filenames to upload
339        :param Path dir_with_files:         Local Path to files
340        :param Dataset dataset:             Dataset object for status updates and unique key
341        :return Path, Path:                 Server's Path to files, Server's Path to results
342        """
343        data = {'folder_name': file_collection_name}
344
345        # Check if files have already been sent
346        self.processor.dataset.update_status("Connecting to DMI Service Manager...")
347        existing_files = self.request_folder_files(file_collection_name)
348        uploaded_files = existing_files.get('4cat_uploads', [])
349        if len(uploaded_files) > 0:
350            self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files)))
351
352        # Compare files with previously uploaded
353        to_upload_filenames = [filename for filename in files_to_upload if filename not in uploaded_files]
354        total_files_to_upload = len(to_upload_filenames)
355
356        # Check if results folder exists
357        empty_placeholder = None
358        if results_name not in existing_files:
359            total_files_to_upload += 1
360            # Create a blank file to upload into results folder
361            empty_placeholder = f"4CAT_{results_name}_blank.txt"
362            with open(dir_with_files.joinpath(empty_placeholder), 'w') as file:
363                file.write('')
364            to_upload_filenames = [empty_placeholder] + to_upload_filenames
365
366        if total_files_to_upload > 0:
367            api_upload_endpoint = f"{self.server_address}send_files"
368
369            self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files")
370            files_uploaded = 0
371            while to_upload_filenames:
372                upload_file = to_upload_filenames.pop()
373                self.processor.dataset.log(f"Uploading {upload_file}")
374                # Upload files
375                if upload_file == empty_placeholder:
376                    # Upload a blank results file to results folder
377                    response = requests.post(api_upload_endpoint,
378                                             files=[(results_name, open(dir_with_files.joinpath(upload_file), 'rb'))],
379                                             data=data, timeout=120)
380                else:
381                    # All other files uploading to general upload folder belonging to parent dataset collection
382                    response = requests.post(api_upload_endpoint,
383                                             files=[('4cat_uploads', open(dir_with_files.joinpath(upload_file), 'rb'))],
384                                             data=data, timeout=120)
385
386                if response.status_code == 200:
387                    files_uploaded += 1
388                    if files_uploaded % 1000 == 0:
389                        self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!")
390                    self.processor.dataset.update_progress(files_uploaded / total_files_to_upload)
391                elif response.status_code == 403:
392                    raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
393                elif response.status_code == 405:
394                    raise DsmConnectionError("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)")
395                else:
396                    self.processor.dataset.log(f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file}")
397
398                try:
399                    response_json = response.json()
400                except JSONDecodeError:
401                    response_json = None
402                if response_json and "errors" in response.json():
403                    self.processor.dataset.log(
404                        f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file} - {response.json()['errors']}")
405
406            self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!")
407
408        server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads")
409        server_path_to_results = Path(file_collection_name).joinpath(results_name)
410
411        return server_path_to_files, server_path_to_results
412
413    def download_results(self, filenames_to_download, folder_name, local_output_dir, timeout=30):
414        """
415        Download results from the DMI Service Manager server.
416
417        :param list filenames_to_download:  List of filenames to download
418        :param str folder_name:             Name of subfolder where files are localed (e.g. "results_name" or "files")
419        :param Path local_output_dir:       Local Path to download files to
420        :param int timeout:                 Number of seconds to wait for a response from the server
421        """
422        # Download the result files
423        api_upload_endpoint = f"{self.server_address}download/"
424        total_files_to_download = len(filenames_to_download)
425        files_downloaded = 0
426        self.processor.dataset.update_status(f"Downloading {total_files_to_download} files from {folder_name}...")
427        for filename in filenames_to_download:
428            retries = 0
429            while True:
430                try:
431                    file_response = requests.get(api_upload_endpoint + f"{folder_name}/{filename}", timeout=timeout)
432                    break
433                except requests.exceptions.ConnectionError as e:
434                    retries += 1
435                    if retries > 3:
436                        raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading file: {folder_name}/{filename}")
437                    continue
438            files_downloaded += 1
439            if files_downloaded % 1000 == 0:
440                self.processor.dataset.update_status(f"Downloaded {files_downloaded} of {total_files_to_download} files")
441            self.processor.dataset.update_progress(files_downloaded / total_files_to_download)
442
443            with open(local_output_dir.joinpath(filename), 'wb') as file:
444                file.write(file_response.content)
445
446    def sanitize_filenames(self, filename):
447        """
448        If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to
449        ensure our filenames match what the server expects.
450        """
451        if self.local_or_remote == "local":
452            return filename
453        elif self.local_or_remote == "remote":
454            return secure_filename(filename)
455        else:
456            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
457
458    @staticmethod
459    def get_folder_name(dataset):
460        """
461        Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the
462        parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times).
463
464        This is only relevant for remote mode based on file management.
465        """
466        return datetime.datetime.fromtimestamp(dataset.timestamp).strftime("%Y-%m-%d-%H%M%S") + '-' + \
467            ''.join(e if e.isalnum() else '_' for e in dataset.get_label()) + '-' + \
468            str(dataset.key)
469
470    @staticmethod
471    def count_local_files(directory):
472        """
473        Get number of files in directory
474        """
475        return len(os.listdir(directory))

Class to manage interactions with a DMI Service Manager server.

Found here: https://github.com/digitalmethodsinitiative/dmi_service_manager

DmiServiceManager(processor)
50    def __init__(self, processor):
51        """
52        """
53        self.processor = processor
54        self.local_or_remote = processor.config.get("dmi-service-manager.ac_local_or_remote")
55        self.server_address = processor.config.get("dmi-service-manager.ab_server_address").rstrip("/") + "/api/"
56
57        self.processed_files = 0
58
59        self.num_files_to_process = None
60        self.server_file_collection_name = None
61        self.server_results_folder_name = None
62        self.path_to_files = None
63        self.path_to_results = None
processor
local_or_remote
server_address
processed_files
num_files_to_process
server_file_collection_name
server_results_folder_name
path_to_files
path_to_results
def check_gpu_memory_available(self, service_endpoint):
65    def check_gpu_memory_available(self, service_endpoint):
66        """
67        Returns tuple with True if server has some memory available and  False otherwise as well as the JSON response
68        from server containing the memory information.
69        """
70        api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
71        resp = requests.get(api_endpoint, timeout=30)
72        if resp.status_code == 200:
73            return resp.json()
74        elif resp.status_code == 503:
75            # TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? DSM could possibly manage jobs in queue
76            # Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available)
77            raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.")
78        else:
79            try:
80                reason = resp.json()['reason']
81            except JSONDecodeError:
82                reason = strip_tags(resp.text)
83            raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}")

Returns the JSON response from the server containing the GPU memory information. Raises DsmOutOfMemory if the server reports that it is out of GPU memory, and DsmConnectionError for any other unexpected response.

def process_files( self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
 85    def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
 86        """
 87        Process files according to DMI Service Manager local or remote settings
 88        """
 89        self.num_files_to_process = len(filenames)
 90
 91        # Upload files if necessary
 92        if self.local_or_remote == "local":
 93            # Relative to PATH_DATA which should be where Docker mounts the container volume
 94            # TODO: path is just the staging_area name, but what if we move staging areas? DMI Service manager needs to know...
 95            path_to_files = input_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
 96            path_to_results = output_file_dir.absolute().relative_to(self.processor.config.get("PATH_DATA").absolute())
 97
 98        elif self.local_or_remote == "remote":
 99
100            # Upload files
101            self.server_file_collection_name = server_file_collection_name
102            self.server_results_folder_name = server_results_folder_name
103            path_to_files, path_to_results = self.send_files(server_file_collection_name, server_results_folder_name, filenames, input_file_dir)
104
105        else:
106            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
107
108        self.path_to_files = path_to_files
109        self.path_to_results = path_to_results
110        return path_to_files, path_to_results

Process files according to DMI Service Manager local or remote settings

def check_progress(self):
112    def check_progress(self):
113        if self.local_or_remote == "local":
114            current_completed = self.count_local_files(self.processor.config.get("PATH_DATA").joinpath(self.path_to_results))
115        elif self.local_or_remote == "remote":
116            existing_files = self.request_folder_files(self.server_file_collection_name)
117            current_completed = len(existing_files.get(self.server_results_folder_name, []))
118        else:
119            raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")
120
121        if current_completed != self.processed_files:
122            self.processor.dataset.update_status(
123                f"Processed {current_completed} of {self.num_files_to_process} files")
124            self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
125            self.processed_files = current_completed
def check_service_exists(self):
127    def check_service_exists(self):
128        """"
129        Check for services created with the current dataset key.
130
131        Returns None, if none found else the related service jobs and their details
132        """
133        # Check to see if service already created
134        resp = requests.get(self.server_address + "jobs/details_query/", json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key}, timeout=30)
135        if resp.status_code == 200:
136            # Check if service is already running
137            if len(resp.json()["jobs"]) > 0:
138                return resp.json()["jobs"]
139            else:
140                return None

Check for services created with the current dataset key.

Returns None if none are found; otherwise returns the related service jobs and their details.

def send_request_and_wait_for_results( self, service_endpoint, data, wait_period=60, check_process=True, callback=None):
def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None):
    """
    Send request and wait for results to be ready.

    If a job for this dataset already exists on the DMI Service Manager, it is reused:
    - a completed job returns True immediately;
    - a job in error raises;
    - any other job is polled until it finishes.
    Otherwise a new job is requested by POSTing `data` (with the dataset key added) to the service endpoint.

    Check process assumes a one-to-one ratio of input files to output files. If this is not the case, set to False.
    If the service reports a status message, its `processed_records` count is used for progress. Otherwise the
    files in the output folder are counted and compared to the number of input files.

    :param str service_endpoint:  Service name; suffixed with "_local" or "_remote" according to the settings
    :param dict data:             JSON payload for the service; "4CAT_dataset_key" is added to it in place
    :param int wait_period:       Seconds between status checks
    :param bool check_process:    Whether to update status/progress while waiting
    :param callable callback:     Optional callable, called with this manager before each status check
    :return bool:                 True if the service completed successfully, False if 4CAT was interrupted
    :raises DsmConnectionError:   If the server cannot be reached (repeatedly) or the endpoint is not found
    :raises DsmOutOfMemory:       If the service reports a CUDA out-of-memory error
    :raises DmiServiceManagerException:  On any other service error or unexpected response
    """
    if self.local_or_remote == "local":
        service_endpoint += "_local"
    elif self.local_or_remote == "remote":
        service_endpoint += "_remote"
    else:
        raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")

    existing_service = self.check_service_exists()
    if existing_service:
        if len(existing_service) > 1:
            raise DmiServiceManagerException("Multiple services found with the same dataset key.")
        existing_service = existing_service[0]
        if existing_service['status'] == 'complete':
            # Service already completed
            return True
        elif existing_service['status'] == 'error':
            results = json.loads(existing_service['results'])
            raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}")
        else:
            # Service already running; resume polling it
            results_url = self.server_address + f"jobs/{existing_service.get('id')}"
            self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...")
    else:
        # Create a new service
        # Append dataset key to data so check_service_exists() can find this job again later
        data["4CAT_dataset_key"] = self.processor.dataset.key
        api_endpoint = self.server_address + service_endpoint
        try:
            resp = requests.post(api_endpoint, json=data, timeout=30)
        except requests.exceptions.ConnectionError as e:
            raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")

        if resp.status_code == 202:
            # New request successful
            results_url = resp.json()['result_url']
        elif resp.status_code == 404:
            # Checked before parsing the body so a non-JSON 404 page still yields this message
            # Could be local vs remote not set correctly
            raise DsmConnectionError(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service")
        else:
            try:
                resp_json = resp.json()
            except JSONDecodeError:
                # Unexpected Error
                raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
            if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
                # Request already exists; get DMI SM database key
                raise DmiServiceManagerException("Request already exists; check that DMI SM is up to date")
            raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")

        # Wait for results to be ready
        self.processor.dataset.update_status(f"Generating results for {service_endpoint}...")

    check_time = 0
    success = False
    # Counts unreadable/unreachable responses over the whole wait; more than 3 aborts
    connection_error = 0
    last_status = None
    while True:
        # If interrupted is called, attempt to finish dataset while server still running
        if self.processor.interrupted:
            self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True)
            break

        # Send request to check status every wait_period seconds
        if (time.time() - check_time) <= wait_period:
            time.sleep(1)
            continue

        check_time = time.time()
        if callback:
            callback(self)

        try:
            result = requests.get(results_url, timeout=30)
        except requests.exceptions.ConnectionError as e:
            # Have seen the Service Manager fail particularly when another processor is uploading many consecutive files
            connection_error += 1
            if connection_error > 3:
                raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(e)}")
            continue

        # Parse once; a non-JSON body (e.g. a proxy error page) counts as an unexpected response instead of crashing
        try:
            result_json = result.json()
        except JSONDecodeError:
            result_json = None

        if result.status_code != 200 or not isinstance(result_json, dict) or result_json.get('status') != "success":
            # Unexpected response from DMI SM
            connection_error += 1
            if connection_error > 3:
                details = str(result_json) if result_json is not None else str(result.text)
                raise DsmConnectionError(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {details}")
            continue
        service_status = result_json["job"]

        # Update progress
        if check_process:
            # Check for message
            status_message = service_status.get('message', '')
            current_completed = service_status.get('processed_records', False)
            if status_message:
                # Update status message if changed
                if last_status != status_message:
                    last_status = status_message
                    self.processor.dataset.update_status(last_status)
                if current_completed and self.processed_files != int(current_completed):
                    # Guard against an unset/zero total to avoid TypeError/ZeroDivisionError
                    if self.num_files_to_process:
                        self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process)
                    self.processed_files = int(current_completed)
            else:
                # This service does not provide status message; check progress via file count
                self.check_progress()

        job_state = service_status['status']
        if job_state in ["created", "running", "pending"]:
            # Still running
            continue
        elif job_state in ["complete", "error"]:
            raw_results = service_status.get('results')
            results = json.loads(raw_results) if raw_results else None
            if not results:
                # This should not be the case if the service was written well (unless the DMI SM crashed?)
                # TODO test if timing issue?
                connection_error += 1
                if connection_error > 3:
                    raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}")
                # Poll again rather than indexing into missing results
                continue
            if int(results['returncode']) == 0:
                # Complete without error
                self.processor.dataset.update_status(f"Completed {service_endpoint}!")
                success = True
                break
            # Fall back to the whole results object if no error text was provided
            error = str(results.get('error') or results)
            if "CUDA error: out of memory" in error:
                raise DsmOutOfMemory("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
            raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
        else:
            # Something botched
            raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result_json))

    return success

Send request and wait for results to be ready.

Check process assumes a one-to-one ratio of input files to output files. If this is not the case, set to False. It counts the number of files in the output folder and compares it to the number of input files.

def process_results(self, local_output_dir):
def process_results(self, local_output_dir):
    """
    Make the service's output available in local_output_dir.

    In local mode the output files are already in the local directory, so nothing is done. In remote mode the
    results folder on the server is listed and every file is downloaded; files in subfolders are downloaded
    into matching subfolders of local_output_dir, which are created as needed.

    :param Path|str local_output_dir:  Local directory to download results into (used in remote mode only)
    """
    if self.local_or_remote == "local":
        # Output files are already in local directory
        pass
    elif self.local_or_remote == "remote":
        # Accept str as well as Path; download_results() joins filenames onto this directory
        local_output_dir = Path(local_output_dir)
        results_path = os.path.join(self.server_file_collection_name, self.server_results_folder_name)
        self.processor.dataset.log(f"Downloading results from {results_path}...")
        # Collect result filenames from server; keys are folder paths relative to results_path ('.' = results_path)
        result_files = self.request_folder_files(results_path)
        for path, files in result_files.items():
            if path == '.':
                self.download_results(files, results_path, local_output_dir)
            else:
                local_subdir = local_output_dir.joinpath(path)
                local_subdir.mkdir(exist_ok=True, parents=True)
                self.download_results(files, os.path.join(results_path, path), local_subdir)
def request_folder_files(self, folder_name):
def request_folder_files(self, folder_name):
    """
    Request files from a folder on the DMI Service Manager server.

    Connection errors are retried up to three times before giving up.

    :param str|Path folder_name:  Folder path on the server (e.g. a file collection name)
    :return dict:                 Parsed JSON listing returned by the server; an empty dict if the folder was
                                  not found (HTTP 404)
    :raises DsmConnectionError:   If the server stays unreachable, or answers 403, 400 or 405
    :raises DmiServiceManagerException:  On any other non-200 response
    """
    filename_url = f"{self.server_address}list_filenames/{folder_name}"
    retries = 0
    while True:
        try:
            filename_response = requests.get(filename_url, timeout=30)
            break
        except requests.exceptions.ConnectionError as e:
            retries += 1
            if retries > 3:
                raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading files from: {folder_name}")

    status = filename_response.status_code
    # Check if 4CAT has access to this server
    if status == 403:
        raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
    elif status in (400, 405):
        # Prefer the server's explanation, but don't crash if the body is not the expected JSON
        try:
            reason = filename_response.json()['reason']
        except (JSONDecodeError, KeyError, TypeError):
            reason = filename_response.reason
        raise DsmConnectionError(f"{status}: DMI Service Manager server {reason}")
    elif status == 404:
        # Folder not found; no files
        return {}
    elif status != 200:
        raise DmiServiceManagerException(f"Unknown response from DMI Service Manager: {status} - {filename_response.reason}")
    return filename_response.json()

Request files from a folder on the DMI Service Manager server.

def send_files( self, file_collection_name, results_name, files_to_upload, dir_with_files):
def send_files(self, file_collection_name, results_name, files_to_upload, dir_with_files):
    """
    Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management.
    The path on the server to both the files and results will be returned.

    Files are uploaded to a "4cat_uploads" folder under the file_collection_name folder. Files already listed
    there are skipped, so a file collection can be reused without re-uploading. If the results_name folder does
    not exist yet under file_collection_name, an empty placeholder file is uploaded into it to create it. This way
    multiple results folders can be based on the same file collection.

    :param str file_collection_name:  Name of collection; files will be uploaded to its '4cat_uploads' subfolder
    :param str results_name:          Name of results subfolder where output will be stored (and can be downloaded)
    :param list files_to_upload:      List of filenames to upload
    :param Path dir_with_files:       Local Path to files (the placeholder file is also written here)
    :return Path, Path:               Server's Path to files, Server's Path to results
    :raises DsmConnectionError:       If the upload endpoint answers 403 or 405
    """
    data = {'folder_name': file_collection_name}

    # Check if files have already been sent
    self.processor.dataset.update_status("Connecting to DMI Service Manager...")
    existing_files = self.request_folder_files(file_collection_name)
    uploaded_files = existing_files.get('4cat_uploads', [])
    if len(uploaded_files) > 0:
        self.processor.dataset.update_status("Found %i files previously uploaded" % (len(uploaded_files)))

    # Compare files with previously uploaded; a set keeps this linear for large collections
    already_uploaded = set(uploaded_files)
    to_upload_filenames = [filename for filename in files_to_upload if filename not in already_uploaded]
    total_files_to_upload = len(to_upload_filenames)

    # Check if results folder exists
    empty_placeholder = None
    if results_name not in existing_files:
        total_files_to_upload += 1
        # Create a blank file to upload into results folder
        empty_placeholder = f"4CAT_{results_name}_blank.txt"
        with open(dir_with_files.joinpath(empty_placeholder), 'w') as file:
            file.write('')
        to_upload_filenames = [empty_placeholder] + to_upload_filenames

    if total_files_to_upload > 0:
        api_upload_endpoint = f"{self.server_address}send_files"

        self.processor.dataset.update_status(f"Uploading {total_files_to_upload} files")
        files_uploaded = 0
        while to_upload_filenames:
            upload_file = to_upload_filenames.pop()
            self.processor.dataset.log(f"Uploading {upload_file}")
            # The placeholder goes into the results folder; all other files go to the general upload folder
            # belonging to the parent dataset collection
            upload_field = results_name if upload_file == empty_placeholder else '4cat_uploads'
            # Close each handle as soon as its request is done; one file is opened per upload
            with open(dir_with_files.joinpath(upload_file), 'rb') as upload_handle:
                response = requests.post(api_upload_endpoint,
                                         files=[(upload_field, upload_handle)],
                                         data=data, timeout=120)

            if response.status_code == 200:
                files_uploaded += 1
                if files_uploaded % 1000 == 0:
                    self.processor.dataset.update_status(f"Uploaded {files_uploaded} of {total_files_to_upload} files!")
                self.processor.dataset.update_progress(files_uploaded / total_files_to_upload)
            elif response.status_code == 403:
                raise DsmConnectionError("403: 4CAT does not have permission to use the DMI Service Manager server")
            elif response.status_code == 405:
                raise DsmConnectionError("405: Method not allowed; check DMI Service Manager server address (perhaps http is being used instead of https)")
            else:
                self.processor.dataset.log(f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file}")

            try:
                response_json = response.json()
            except JSONDecodeError:
                response_json = None
            if response_json and "errors" in response_json:
                self.processor.dataset.log(
                    f"Unable to upload file ({response.status_code} - {response.reason}): {upload_file} - {response_json['errors']}")

        self.processor.dataset.update_status(f"Uploaded {files_uploaded} files!")

    server_path_to_files = Path(file_collection_name).joinpath("4cat_uploads")
    server_path_to_results = Path(file_collection_name).joinpath(results_name)

    return server_path_to_files, server_path_to_results

Send files to the DMI Service Manager server. This is only relevant for remote mode based on file management. The path on the server to both the files and results will be returned.

A "4cat_uploads" folder will be created under the file_collection_name folder, and the files_to_upload will be stored there. A results folder named results_name will also be created under the file_collection_name folder, so that multiple results can be created based on one file collection if needed (without needing to re-upload files).

Parameters
  • str file_collection_name: Name of collection; files will be uploaded to its '4cat_uploads' subfolder
  • str results_name: Name of results subfolder where output will be stored (and can be downloaded)
  • list files_to_upload: List of filenames to upload
  • Path dir_with_files: Local Path to files
  • Dataset dataset: Dataset object for status updates and unique key
Returns
             Server's Path to files, Server's Path to results
def download_results( self, filenames_to_download, folder_name, local_output_dir, timeout=30):
def download_results(self, filenames_to_download, folder_name, local_output_dir, timeout=30):
    """
    Download results from the DMI Service Manager server.

    Each file is fetched with up to three retries on connection errors. A file the server does not return with
    HTTP 200 is logged and skipped, rather than writing the error response to disk as if it were the file.

    :param list filenames_to_download:  List of filenames to download
    :param str folder_name:             Name of subfolder where files are located (e.g. "results_name" or "files")
    :param Path|str local_output_dir:   Local Path to download files to
    :param int timeout:                 Number of seconds to wait for a response from the server
    :raises DsmConnectionError:         If a file cannot be requested after repeated connection errors
    """
    # Download the result files
    api_download_endpoint = f"{self.server_address}download/"
    local_output_dir = Path(local_output_dir)
    total_files_to_download = len(filenames_to_download)
    files_downloaded = 0
    self.processor.dataset.update_status(f"Downloading {total_files_to_download} files from {folder_name}...")
    for filename in filenames_to_download:
        retries = 0
        while True:
            try:
                file_response = requests.get(api_download_endpoint + f"{folder_name}/{filename}", timeout=timeout)
                break
            except requests.exceptions.ConnectionError as e:
                retries += 1
                if retries > 3:
                    raise DsmConnectionError(f"Connection Error {e} (retries {retries}) while downloading file: {folder_name}/{filename}")

        # Progress counts every file handled, including ones skipped below
        files_downloaded += 1
        if files_downloaded % 1000 == 0:
            self.processor.dataset.update_status(f"Downloaded {files_downloaded} of {total_files_to_download} files")
        self.processor.dataset.update_progress(files_downloaded / total_files_to_download)

        if file_response.status_code != 200:
            self.processor.dataset.log(f"Unable to download file ({file_response.status_code} - {file_response.reason}): {folder_name}/{filename}")
            continue

        with open(local_output_dir.joinpath(filename), 'wb') as file:
            file.write(file_response.content)

Download results from the DMI Service Manager server.

Parameters
  • list filenames_to_download: List of filenames to download
  • str folder_name: Name of subfolder where files are located (e.g. "results_name" or "files")
  • Path local_output_dir: Local Path to download files to
  • int timeout: Number of seconds to wait for a response from the server
def sanitize_filenames(self, filename):
def sanitize_filenames(self, filename):
    """
    Return the filename as the DMI Service Manager will know it.

    Local mode uses files in place, so the name is returned untouched. In remote mode the server sanitizes
    uploaded filenames, so the name is passed through werkzeug's secure_filename to match it.

    :param str filename:  Filename to translate
    :return str:          Filename as expected by the server
    :raises DmiServiceManagerException:  If the local_or_remote setting is neither 'local' nor 'remote'
    """
    mode = self.local_or_remote
    if mode == "remote":
        return secure_filename(filename)
    if mode == "local":
        return filename
    raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")

If source is local, no sanitization needed. If source is remote, the server sanitizes and as such, we need to ensure our filenames match what the server expects.

@staticmethod
def get_folder_name(dataset):
@staticmethod
def get_folder_name(dataset):
    """
    Build a unique folder name from a dataset's timestamp, label and key.

    The result looks like "<YYYY-mm-dd-HHMMSS>-<label>-<key>". Every non-alphanumeric character of the label
    is replaced by an underscore. In some instances it may make sense to pass the parent dataset, to group files
    (e.g. to ensure files are not uploaded multiple times).

    This is only relevant for remote mode based on file management.

    :param dataset:  Object providing `timestamp`, `get_label()` and `key`
    :return str:     Folder name
    """
    created_at = datetime.datetime.fromtimestamp(dataset.timestamp)
    safe_label = "".join(char if char.isalnum() else "_" for char in dataset.get_label())
    return "-".join([created_at.strftime("%Y-%m-%d-%H%M%S"), safe_label, str(dataset.key)])

Creates a unique folder name based on a dataset and timestamp. In some instances it may make sense to use the parent dataset in order to group files (e.g., in order to ensure files are not uploaded multiple times).

This is only relevant for remote mode based on file management.

@staticmethod
def count_local_files(directory):
@staticmethod
def count_local_files(directory):
    """
    Count the entries directly inside a directory.

    Every entry is counted, including subdirectories; subdirectories are not recursed into.

    :param str|Path directory:  Directory to inspect
    :return int:                Number of entries
    """
    with os.scandir(directory) as entries:
        return sum(1 for _ in entries)

Get number of files in directory