
datasources.fourcat_import.import_4cat

Import datasets from other 4CATs

  1"""
  2Import datasets from other 4CATs
  3"""
  4import requests
  5import json
  6import time
  7import zipfile
  8from pathlib import Path
  9from psycopg2.errors import InFailedSqlTransaction
 10
 11from backend.lib.processor import BasicProcessor
 12from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException,
 13                                   DataSetException)
 14from common.lib.helpers import UserInput, get_software_version
 15from common.lib.dataset import DataSet
 16
 17
 18class FourcatImportException(FourcatException):
 19    pass
 20
 21
 22class SearchImportFromFourcat(BasicProcessor):
 23    type = "import_4cat-search"  # job ID
 24    category = "Search"  # category
 25    title = "Import 4CAT dataset and analyses"  # title displayed in UI
 26    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
 27    is_local = False  # Whether this datasource is locally scraped
 28    is_static = False  # Whether this datasource is still updated
 29
 30    max_workers = 1  # this cannot be more than 1, else things get VERY messy
 31
 32    options = {
 33        "intro": {
 34            "type": UserInput.OPTION_INFO,
 35            "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
 36                    "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
 37                    "You can find the current version in the footer at the bottom of the interface."
 38        },
 39        "method": {
 40            "type": UserInput.OPTION_CHOICE,
 41            "help": "Import Type",
 42            "options": {
 43                "zip": "Zip File",
 44                "url": "4CAT URL",
 45            },
 46            "default": "url"
 47        },
 48        "url": {
 49            "type": UserInput.OPTION_TEXT,
 50            "help": "Dataset URL",
 51            "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
 52            "requires": "method^=url"
 53        },
 54        "intro2": {
 55            "type": UserInput.OPTION_INFO,
 56            "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
 57                    "an API key from **the server you are importing from**, not the one you are looking at right now. "
 58                    "Additionally, you need to have owner access to the dataset you want to import.",
 59            "requires": "method^=url"
 60        },
 61        "api-key": {
 62            "type": UserInput.OPTION_TEXT,
 63            "help": "4CAT API Key",
 64            "sensitive": True,
 65            "cache": True,
 66            "requires": "method^=url"
 67        },
 68        "data_upload": {
 69            "type": UserInput.OPTION_FILE,
 70            "help": "File",
 71            "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
 72            "requires": "method^=zip"
 73        },
 74
 75    }
 76
 77    created_datasets = None
 78    base = None
 79    remapped_keys = None
 80    dataset_owner = None
 81
 82    def process(self):
 83        """
 84        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 85        """
 86        self.created_datasets = set()  # keys of created datasets - may not be successful!
 87        self.remapped_keys = {}  # changed dataset keys
 88        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 89        try:
 90            if self.parameters.get("method") == "zip":
 91                self.process_zip()
 92            else:
 93                self.process_urls()
 94        except Exception as e:
 95            # Catch all exceptions and finish the job with an error
 96            # Resuming is impossible because this dataset was overwritten with the importing dataset
 97            # halt_and_catch_fire() will clean up and delete the datasets that were created
 98            self.interrupted = True
 99            try:
100                self.halt_and_catch_fire()
101            except ProcessorInterruptedException:
102                pass
103            except InFailedSqlTransaction:
104                # Catch database issue and retry
105                self.db.rollback()
106                try:
107                    self.halt_and_catch_fire()
108                except ProcessorInterruptedException:
109                    pass
110            # Reraise the original exception for logging
111            raise e
112
113    def after_create(query, dataset, request):
114        """
115        Hook to execute after the dataset for this source has been created
116
117        In this case, put the file in a temporary location so it can be
118        processed properly by the related Job later.
119
120        :param dict query:  Sanitised query parameters
121        :param DataSet dataset:  Dataset created for this query
122        :param request:  Flask request submitted for its creation
123        """
124        if query.get("method") == "zip":
125            file = request.files["option-data_upload"]
126            file.seek(0)
127            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
128                while True:
129                    chunk = file.read(1024)
130                    if len(chunk) == 0:
131                        break
132                    outfile.write(chunk)
133        else:
134            # nothing to do for URLs
135            pass
136
137
138    def process_zip(self):
139        """
140        Import 4CAT dataset from a ZIP file
141        """
142        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
143        temp_file = self.dataset.get_results_path().with_suffix(".importing")
144
145        imported = []
146        processed_files = 1 # take into account the export.log file
147        failed_imports = []
148        primary_dataset_original_log = None
149        with zipfile.ZipFile(temp_file, "r") as zip_ref:
150            zip_contents = zip_ref.namelist()
151
152            # Get all metadata files and determine primary dataset
153            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
154            if not metadata_files:
155                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
156                return
157
158            # Get the primary dataset
159            primary_dataset_keys = set()
160            datasets = []
161            parent_child_mapping = {}
162            for file in metadata_files:
163                with zip_ref.open(file) as f:
164                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
165                    metadata = json.loads(content)
166                    if not metadata.get("key_parent"):
167                        primary_dataset_keys.add(metadata.get("key"))
168                        datasets.append(metadata)
169                    else:
170                        # Store the mapping of parent to child datasets
171                        parent_key = metadata.get("key_parent")
172                        if parent_key not in parent_child_mapping:
173                            parent_child_mapping[parent_key] = []
174                        parent_child_mapping[parent_key].append(metadata)
175
176            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
177            if len(primary_dataset_keys) != 1:
 178                self.dataset.finish_with_error("ZIP file must contain exactly one primary dataset.")
179                return
180
181            # Import datasets
182            while datasets:
183                self.halt_and_catch_fire()
184
185                # Create the datasets
186                metadata = datasets.pop(0)
187                dataset_key = metadata.get("key")
188                processed_metadata = self.process_metadata(metadata)
189                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
190                processed_files += 1
191
192                # Copy the log file
193                self.halt_and_catch_fire()
194                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
195                if log_filename in zip_contents:
196                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
197                    with zip_ref.open(log_filename) as f:
198                        content = f.read().decode('utf-8')
199                        if new_dataset.key == self.dataset.key:
200                            # Hold the original log for the primary dataset and add at the end
201                            primary_dataset_original_log = content
202                        else:
203                            new_dataset.log("Original dataset log included below:")
204                            with new_dataset.get_log_path().open("a") as outfile:
205                                outfile.write(content)
206                    processed_files += 1
207                else:
208                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
209
210                # Copy the results
211                self.halt_and_catch_fire()
212                results_filename = metadata["result_file"]
213                if results_filename in zip_contents:
214                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
215                    with zip_ref.open(results_filename) as f:
216                        with new_dataset.get_results_path().open("wb") as outfile:
217                            outfile.write(f.read())
218                    processed_files += 1
219
220                    if not imported:
221                        # first dataset - use num rows as 'overall'
222                        num_rows = metadata["num_rows"]
223                else:
224                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
225                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
226                    failed_imports.append(dataset_key)
227                    continue
228
229                # finally, the kids
230                self.halt_and_catch_fire()
231                if dataset_key in parent_child_mapping:
232                    datasets.extend(parent_child_mapping[dataset_key])
233                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
234
235                # done - remember that we've imported this one
236                imported.append(new_dataset)
237                new_dataset.update_status(metadata["status"])
238
239                if new_dataset.key != self.dataset.key:
240                    # only finish if this is not the 'main' dataset, or the user
241                    # will think the whole import is done
242                    new_dataset.finish(metadata["num_rows"])
243
 244            # Check that all files were accounted for. We only track a count of
 245            # processed files, so report how many were missed rather than which
 246            # ones.
 247            missed_files = 0
 248            if len(zip_contents) != processed_files:
 249                missed_files = len(zip_contents) - processed_files
 250
251            # todo: this part needs updating if/when we support importing multiple datasets!
252            if failed_imports:
253                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
254                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
255                                           f"dataset log file for details.", is_final=True)
 256            elif missed_files:
 257                self.dataset.log(f"ZIP file contained {missed_files} file(s) that were not processed.")
 258                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
 259                                           f"{missed_files} file(s) were not successfully imported. Check the "
 260                                           f"dataset log file for details.", is_final=True)
261            else:
 262                self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported.",
263                                           is_final=True)
264
265            if not self.dataset.is_finished():
266                # now all related datasets are imported, we can finish the 'main'
267                # dataset, and the user will be alerted that the full import is
268                # complete
269                self.dataset.finish(num_rows)
270
271            # Add the original log for the primary dataset
272            if primary_dataset_original_log:
273                self.dataset.log("Original dataset log included below:\n")
274                with self.dataset.get_log_path().open("a") as outfile:
275                    outfile.write(primary_dataset_original_log)
276
277
278    @staticmethod
279    def process_metadata(metadata):
280        """
281        Process metadata for import
282        """
283        # get rid of some keys that are server-specific and don't need to
284        # be stored (or don't correspond to database columns)
285        metadata.pop("current_4CAT_version")
286        metadata.pop("id")
287        metadata.pop("job")
288        metadata.pop("is_private")
289        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
290
291        # extra params are stored as JSON...
292        metadata["parameters"] = json.loads(metadata["parameters"])
293        if "copied_from" in metadata["parameters"]:
294            metadata["parameters"].pop("copied_from")
295        metadata["parameters"] = json.dumps(metadata["parameters"])
296
297        return metadata
298
299    def create_dataset(self, metadata, original_key, primary=False):
300        """
301        Create a new dataset
302        """
303        if primary:
304            self.dataset.update_status(f"Importing primary dataset {original_key}.")
305            # if this is the first dataset we're importing, make it the
306            # processor's "own" dataset. the key has already been set to
 307            # the imported dataset's key via ensure_key() (or a new unique
308            # key if it already existed on this server)
309            # by making it the "own" dataset, the user initiating the
310            # import will see the imported dataset as the "result" of their
311            # import query in the interface, similar to the workflow for
312            # other data sources
313            new_dataset = self.dataset
314
315            # Update metadata and file
316            metadata.pop("key")  # key already OK (see above)
317            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
318
319        else:
320            self.dataset.update_status(f"Importing child dataset {original_key}.")
321            # supernumerary datasets - handle on their own
322            # these include any children of imported datasets
323            try:
324                DataSet(key=metadata["key"], db=self.db, modules=self.modules)
325
 326                # if we *haven't* thrown a DataSetException by now, then the
327                # key is already in use, so create a "dummy" dataset and
328                # overwrite it with the metadata we have (except for the
329                # key). this ensures that a new unique key will be
330                # generated.
331                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
332                metadata.pop("key")
333                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
334
335            except DataSetException:
336                # this is *good* since it means the key doesn't exist, so
337                # we can re-use the key of the imported dataset
338                self.db.insert("datasets", data=metadata)
339                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
340
341        if new_dataset.key != original_key:
342            # could not use original key because it was already in use
343            # so update any references to use the new key
344            self.remapped_keys[original_key] = new_dataset.key
345            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
346                                      f"{new_dataset.key} instead of key {original_key}!")
347
348        # refresh object, make sure it's in sync with the database
349        self.created_datasets.add(new_dataset.key)
350        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
351        current_log = None
352        if new_dataset.key == self.dataset.key:
353            # this ensures that the first imported dataset becomes the
354            # processor's "own" dataset, and that the import logs go to
355            # that dataset's log file. For later imports, this evaluates to
356            # False.
357
358            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
359            current_log = self.dataset.get_log_path().read_text()
360            # Update the dataset
361            self.dataset = new_dataset
362
363        # if the key of the parent dataset was changed, change the
364        # reference to it that the child dataset has
365        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
366            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]
367
368        # update some attributes that should come from the new server, not
369        # the old
370        new_dataset.creator = self.dataset_owner
371        new_dataset.original_timestamp = new_dataset.timestamp
372        new_dataset.imported = True
373        new_dataset.timestamp = int(time.time())
374        new_dataset.db.commit()
375
376        # make sure the dataset path uses the new key and local dataset
377        # path settings. this also makes sure the log file is created in
378        # the right place (since it is derived from the results file path)
379        extension = metadata["result_file"].split(".")[-1]
380        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
381        if not updated:
382            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")
383
384        if current_log:
385            # Add the current log to the new dataset
386            with new_dataset.get_log_path().open("a") as outfile:
387                outfile.write(current_log)
388
389        return new_dataset
390
391
392    def process_urls(self):
393        """
394        Import 4CAT dataset from another 4CAT server
395
396        Interfaces with another 4CAT server to transfer a dataset's metadata,
397        data files and child datasets.
398        """
399        urls = [url.strip() for url in self.parameters.get("url").split(",")]
400        self.base = urls[0].split("/results/")[0]
401        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
402        api_key = self.parameters.get("api-key")
403
404        imported = []  # successfully imported datasets
405        failed_imports = []  # keys that failed to import
406        num_rows = 0  # will be used later
407
408        # we can add support for multiple datasets later by removing
409        # this part!
410        keys = [keys[0]]
411
412        while keys:
413            dataset_key = keys.pop(0)
414
415            self.halt_and_catch_fire()
416            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
417
418            # first, metadata!
419            try:
420                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
421                metadata = metadata.json()
422            except FourcatImportException as e:
423                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
424                continue
425            except ValueError:
426                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
427                continue
428
429            # copying empty datasets doesn't really make sense
430            if metadata["num_rows"] == 0:
431                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
432                failed_imports.append(dataset_key)
433                continue
434
435            metadata = self.process_metadata(metadata)
436
437            # create the new dataset
 438            new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)
439
440            # then, the log
441            self.halt_and_catch_fire()
442            try:
443                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
444                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
445                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
446                logpath = new_dataset.get_log_path()
447                new_dataset.log("Original dataset log included below:")
448                with logpath.open("a") as outfile:
449                    outfile.write(log.text)
450            except FourcatImportException as e:
451                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
452                failed_imports.append(dataset_key)
453                continue
454            except ValueError:
455                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
456                failed_imports.append(dataset_key)
457                continue
458
459            # then, the results
460            self.halt_and_catch_fire()
461            try:
462                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
463                datapath = new_dataset.get_results_path()
464                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
465
466                if not imported:
467                    # first dataset - use num rows as 'overall'
468                    num_rows = metadata["num_rows"]
469
470            except FourcatImportException as e:
 471                self.dataset.log(f"Unable to import dataset {new_dataset.key}: {e}, skipping import")
472                if new_dataset.key != self.dataset.key:
473                    new_dataset.delete()
474                continue
475
476            except ValueError:
477                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
478                failed_imports.append(dataset_key)
479                continue
480
481            # finally, the kids
482            self.halt_and_catch_fire()
483            try:
484                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
485                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
486                children = children.json()
487            except FourcatImportException as e:
488                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
489                failed_imports.append(dataset_key)
490                continue
491            except ValueError:
492                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
493                failed_imports.append(dataset_key)
494                continue
495
496            for child in children:
497                keys.append(child)
498                self.dataset.log(f"Adding child dataset {child} to import queue")
499
500            # done - remember that we've imported this one
501            imported.append(new_dataset)
502            new_dataset.update_status(metadata["status"])
503
504            if new_dataset.key != self.dataset.key:
505                # only finish if this is not the 'main' dataset, or the user
506                # will think the whole import is done
507                new_dataset.finish(metadata["num_rows"])
508
509        # todo: this part needs updating if/when we support importing multiple datasets!
510        if failed_imports:
511            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
512                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
513                                       f"dataset log file for details.", is_final=True)
514        else:
 515            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
516                                       is_final=True)
517
518        if not self.dataset.is_finished():
519            # now all related datasets are imported, we can finish the 'main'
520            # dataset, and the user will be alerted that the full import is
521            # complete
522            self.dataset.finish(num_rows)
523
524    def halt_and_catch_fire(self):
525        """
526        Clean up on interrupt
527
528        There are multiple places in the code where we can bail out on an
529        interrupt, so abstract that away in its own function.
530        :return:
531        """
532        if self.interrupted:
533            # resuming is impossible because the original dataset (which
534            # has the list of URLs to import) has probably been
535            # overwritten by this point
536            deletables = [k for k in self.created_datasets if k != self.dataset.key]
537            for deletable in deletables:
538                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
539
540            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
541                                           f"will need to initiate the import again.")
542
543            raise ProcessorInterruptedException()
544
545    @staticmethod
546    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
547        """
548        Get dataset component from 4CAT export API
549
550        :param str base:  Server URL base to import from
551        :param str dataset_key:  Key of dataset to import
552        :param str api_key:  API authentication token
553        :param str component:  Component to retrieve
554        :return:  HTTP response object
555        """
556        try:
557            if component == "data" and datapath:
558                # Stream data
559                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
560                                  headers={
561                                            "User-Agent": "4cat/import",
562                                            "Authentication": api_key
563                                        }) as r:
564                    r.raise_for_status()
565                    with datapath.open("wb") as outfile:
566                        for chunk in r.iter_content(chunk_size=8192):
567                            outfile.write(chunk)
568                return r
569            else:
570                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
571                    "User-Agent": "4cat/import",
572                    "Authentication": api_key
573                })
574        except requests.Timeout:
575            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
576                                         f"accessible to external connections and try again.")
577        except requests.RequestException as e:
578            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
579                                         f"accessible to external connections and try again.")
580
581        if response.status_code == 404:
582            raise FourcatImportException(
 583                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to
584                f"a valid dataset.")
585        elif response.status_code in (401, 403):
586            raise FourcatImportException(
587                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
588                f"dataset and are using the correct API key.")
589        elif response.status_code != 200:
590            raise FourcatImportException(
591                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
592
593        return response
594
595    @staticmethod
596    def validate_query(query, request, config):
597        """
598        Validate custom data input
599
 600        Confirms that either a valid 4CAT export ZIP file was uploaded or that the
 601        provided URL(s) and API key point to an importable dataset.
602
603        :param dict query:  Query parameters, from client-side.
604        :param request:  Flask request
605        :param ConfigManager|None config:  Configuration reader (context-aware)
606        :return dict:  Safe query parameters
607        """
608        if query.get("method") == "zip":
609            filename = ""
610            if "option-data_upload-entries" in request.form:
611                # First pass sends list of files in the zip
612                pass
613            elif "option-data_upload" in request.files:
614                # Second pass sends the actual file
615                file = request.files["option-data_upload"]
616                if not file:
617                    raise QueryParametersException("No file uploaded.")
618
619                if not file.filename.endswith(".zip"):
620                    raise QueryParametersException("Uploaded file must be a ZIP file.")
621
622                filename = file.filename
623            else:
624                raise QueryParametersException("No file was offered for upload.")
625
626            return {
627                "method": "zip",
628                "filename": filename
629            }
630        elif query.get("method") == "url":
631            urls = query.get("url")
632            if not urls:
633                raise QueryParametersException("Provide at least one dataset URL.")
634
635            urls = urls.split(",")
636            bases = set([url.split("/results/")[0].lower() for url in urls])
637            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
638
639            if len(keys) != 1:
640                # todo: change this to < 1 if we allow multiple datasets
641                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
642
643            if len(bases) != 1:
644                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
645                                                "one 4CAT server at a time.")
646
647            base = urls[0].split("/results/")[0]
648            try:
649                # test if API key is valid and server is reachable
650                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
651            except FourcatImportException as e:
652                raise QueryParametersException(str(e))
653
654            try:
655                # test if we get a response we can parse
656                metadata = test.json()
657            except ValueError:
658                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
659
660            version = get_software_version()
661
662            if metadata.get("current_4CAT_version") != version:
663                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
664                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
665                                               "sure both are running the same version of 4CAT and try again.")
666
667            # OK, we can import at least one dataset
668            return {
669                "url": ",".join(urls),
670                "api-key": query.get("api-key")
671            }
672        else:
673            raise QueryParametersException("Import method not yet implemented.")
674
675    @staticmethod
676    def get_keys_from_urls(urls):
677        """
678        Get dataset keys from 4CAT URLs
679
680        :param list urls:  List of URLs
681        :return list:  List of keys
682        """
683        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
684
685    @staticmethod
686    def ensure_key(query):
687        """
688        Determine key for dataset generated by this processor
689
690        When importing datasets, it's necessary to determine the key of the
691        dataset that is created before it is actually created, because we want
692        to keep the original key of the imported dataset if possible. Luckily,
693        we can deduce it from the URL we're importing the dataset from.
694
695        :param dict query:  Input from the user, through the front-end
696        :return str:  Desired dataset key
697        """
698        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
699        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
700        urls = query.get("url", "").split(",")
701        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
702        return keys[0]
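
For quick reference, below is a minimal, illustrative sketch of the two HTTP import building blocks implemented above: deriving a dataset key from a results URL (as in get_keys_from_urls()) and fetching a dataset component from the export API (as in fetch_from_4cat()). The server URL and API key are placeholders; in the processor itself these values come from the user-supplied parameters.

import requests

def key_from_url(url):
    # same parsing as SearchImportFromFourcat.get_keys_from_urls():
    # take everything after /results/ up to the next /, # or ?
    return url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0]

base = "https://4cat.example"  # placeholder: the 4CAT server to import from
api_key = "YOUR-API-KEY"       # placeholder: API key created on that server

url = base + "/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/"
key = key_from_url(url)

# fetch the dataset's metadata component, as fetch_from_4cat() does
response = requests.get(
    f"{base}/api/export-packed-dataset/{key}/metadata/",
    timeout=5,
    headers={"User-Agent": "4cat/import", "Authentication": api_key},
)
response.raise_for_status()
print(response.json().get("num_rows"))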
class FourcatImportException(common.lib.exceptions.FourcatException):
19class FourcatImportException(FourcatException):
20    pass

Base 4CAT exception class

class SearchImportFromFourcat(backend.lib.processor.BasicProcessor):
 23class SearchImportFromFourcat(BasicProcessor):
 24    type = "import_4cat-search"  # job ID
 25    category = "Search"  # category
 26    title = "Import 4CAT dataset and analyses"  # title displayed in UI
 27    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
 28    is_local = False  # Whether this datasource is locally scraped
 29    is_static = False  # Whether this datasource is still updated
 30
 31    max_workers = 1  # this cannot be more than 1, else things get VERY messy
 32
 33    options = {
 34        "intro": {
 35            "type": UserInput.OPTION_INFO,
 36            "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
 37                    "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
 38                    "You can find the current version in the footer at the bottom of the interface."
 39        },
 40        "method": {
 41            "type": UserInput.OPTION_CHOICE,
 42            "help": "Import Type",
 43            "options": {
 44                "zip": "Zip File",
 45                "url": "4CAT URL",
 46            },
 47            "default": "url"
 48        },
 49        "url": {
 50            "type": UserInput.OPTION_TEXT,
 51            "help": "Dataset URL",
 52            "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
 53            "requires": "method^=url"
 54        },
 55        "intro2": {
 56            "type": UserInput.OPTION_INFO,
 57            "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
 58                    "an API key from **the server you are importing from**, not the one you are looking at right now. "
 59                    "Additionally, you need to have owner access to the dataset you want to import.",
 60            "requires": "method^=url"
 61        },
 62        "api-key": {
 63            "type": UserInput.OPTION_TEXT,
 64            "help": "4CAT API Key",
 65            "sensitive": True,
 66            "cache": True,
 67            "requires": "method^=url"
 68        },
 69        "data_upload": {
 70            "type": UserInput.OPTION_FILE,
 71            "help": "File",
 72            "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
 73            "requires": "method^=zip"
 74        },
 75
 76    }
 77
 78    created_datasets = None
 79    base = None
 80    remapped_keys = None
 81    dataset_owner = None
 82
 83    def process(self):
 84        """
 85        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 86        """
 87        self.created_datasets = set()  # keys of created datasets - may not be successful!
 88        self.remapped_keys = {}  # changed dataset keys
 89        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 90        try:
 91            if self.parameters.get("method") == "zip":
 92                self.process_zip()
 93            else:
 94                self.process_urls()
 95        except Exception as e:
 96            # Catch all exceptions and finish the job with an error
 97            # Resuming is impossible because this dataset was overwritten with the importing dataset
 98            # halt_and_catch_fire() will clean up and delete the datasets that were created
 99            self.interrupted = True
100            try:
101                self.halt_and_catch_fire()
102            except ProcessorInterruptedException:
103                pass
104            except InFailedSqlTransaction:
105                # Catch database issue and retry
106                self.db.rollback()
107                try:
108                    self.halt_and_catch_fire()
109                except ProcessorInterruptedException:
110                    pass
111            # Reraise the original exception for logging
112            raise e
113
114    def after_create(query, dataset, request):
115        """
116        Hook to execute after the dataset for this source has been created
117
118        In this case, put the file in a temporary location so it can be
119        processed properly by the related Job later.
120
121        :param dict query:  Sanitised query parameters
122        :param DataSet dataset:  Dataset created for this query
123        :param request:  Flask request submitted for its creation
124        """
125        if query.get("method") == "zip":
126            file = request.files["option-data_upload"]
127            file.seek(0)
128            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
129                while True:
130                    chunk = file.read(1024)
131                    if len(chunk) == 0:
132                        break
133                    outfile.write(chunk)
134        else:
135            # nothing to do for URLs
136            pass
137
138
139    def process_zip(self):
140        """
141        Import 4CAT dataset from a ZIP file
142        """
143        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
144        temp_file = self.dataset.get_results_path().with_suffix(".importing")
145
146        imported = []
147        processed_files = 1 # take into account the export.log file
148        failed_imports = []
149        primary_dataset_original_log = None
150        with zipfile.ZipFile(temp_file, "r") as zip_ref:
151            zip_contents = zip_ref.namelist()
152
153            # Get all metadata files and determine primary dataset
154            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
155            if not metadata_files:
156                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
157                return
158
159            # Get the primary dataset
160            primary_dataset_keys = set()
161            datasets = []
162            parent_child_mapping = {}
163            for file in metadata_files:
164                with zip_ref.open(file) as f:
165                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
166                    metadata = json.loads(content)
167                    if not metadata.get("key_parent"):
168                        primary_dataset_keys.add(metadata.get("key"))
169                        datasets.append(metadata)
170                    else:
171                        # Store the mapping of parent to child datasets
172                        parent_key = metadata.get("key_parent")
173                        if parent_key not in parent_child_mapping:
174                            parent_child_mapping[parent_key] = []
175                        parent_child_mapping[parent_key].append(metadata)
176
177            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
178            if len(primary_dataset_keys) != 1:
179                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
180                return
181
182            # Import datasets
183            while datasets:
184                self.halt_and_catch_fire()
185
186                # Create the datasets
187                metadata = datasets.pop(0)
188                dataset_key = metadata.get("key")
189                processed_metadata = self.process_metadata(metadata)
190                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
191                processed_files += 1
192
193                # Copy the log file
194                self.halt_and_catch_fire()
195                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
196                if log_filename in zip_contents:
197                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
198                    with zip_ref.open(log_filename) as f:
199                        content = f.read().decode('utf-8')
200                        if new_dataset.key == self.dataset.key:
201                            # Hold the original log for the primary dataset and add at the end
202                            primary_dataset_original_log = content
203                        else:
204                            new_dataset.log("Original dataset log included below:")
205                            with new_dataset.get_log_path().open("a") as outfile:
206                                outfile.write(content)
207                    processed_files += 1
208                else:
209                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
210
211                # Copy the results
212                self.halt_and_catch_fire()
213                results_filename = metadata["result_file"]
214                if results_filename in zip_contents:
215                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
216                    with zip_ref.open(results_filename) as f:
217                        with new_dataset.get_results_path().open("wb") as outfile:
218                            outfile.write(f.read())
219                    processed_files += 1
220
221                    if not imported:
222                        # first dataset - use num rows as 'overall'
223                        num_rows = metadata["num_rows"]
224                else:
225                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
226                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
227                    failed_imports.append(dataset_key)
228                    continue
229
230                # finally, the kids
231                self.halt_and_catch_fire()
232                if dataset_key in parent_child_mapping:
233                    datasets.extend(parent_child_mapping[dataset_key])
234                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
235
236                # done - remember that we've imported this one
237                imported.append(new_dataset)
238                new_dataset.update_status(metadata["status"])
239
240                if new_dataset.key != self.dataset.key:
241                    # only finish if this is not the 'main' dataset, or the user
242                    # will think the whole import is done
243                    new_dataset.finish(metadata["num_rows"])
244
245            # Check that all files were processed
246            missed_files = []
247            if len(zip_contents) != processed_files:
248                for file in zip_contents:
249                    if file not in processed_files:
250                        missed_files.append(file)
251
252            # todo: this part needs updating if/when we support importing multiple datasets!
253            if failed_imports:
254                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
255                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
256                                           f"dataset log file for details.", is_final=True)
257            elif missed_files:
258                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
259                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
260                                           f"{len(missed_files)} files were not successfully imported. Check the "
261                                           f"dataset log file for details.", is_final=True)
262            else:
263                self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
264                                           is_final=True)
265
266            if not self.dataset.is_finished():
267                # now all related datasets are imported, we can finish the 'main'
268                # dataset, and the user will be alerted that the full import is
269                # complete
270                self.dataset.finish(num_rows)
271
272            # Add the original log for the primary dataset
273            if primary_dataset_original_log:
274                self.dataset.log("Original dataset log included below:\n")
275                with self.dataset.get_log_path().open("a") as outfile:
276                    outfile.write(primary_dataset_original_log)
277
278
279    @staticmethod
280    def process_metadata(metadata):
281        """
282        Process metadata for import
283        """
284        # get rid of some keys that are server-specific and don't need to
285        # be stored (or don't correspond to database columns)
286        metadata.pop("current_4CAT_version")
287        metadata.pop("id")
288        metadata.pop("job")
289        metadata.pop("is_private")
290        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
291
292        # extra params are stored as JSON...
293        metadata["parameters"] = json.loads(metadata["parameters"])
294        if "copied_from" in metadata["parameters"]:
295            metadata["parameters"].pop("copied_from")
296        metadata["parameters"] = json.dumps(metadata["parameters"])
297
298        return metadata
299
300    def create_dataset(self, metadata, original_key, primary=False):
301        """
302        Create a new dataset
303        """
304        if primary:
305            self.dataset.update_status(f"Importing primary dataset {original_key}.")
306            # if this is the first dataset we're importing, make it the
307            # processor's "own" dataset. the key has already been set to
308            # the imported dataset's key via ensure_key() (or a new unqiue
309            # key if it already existed on this server)
310            # by making it the "own" dataset, the user initiating the
311            # import will see the imported dataset as the "result" of their
312            # import query in the interface, similar to the workflow for
313            # other data sources
314            new_dataset = self.dataset
315
316            # Update metadata and file
317            metadata.pop("key")  # key already OK (see above)
318            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
319
320        else:
321            self.dataset.update_status(f"Importing child dataset {original_key}.")
322            # supernumerary datasets - handle on their own
323            # these include any children of imported datasets
324            try:
325                DataSet(key=metadata["key"], db=self.db, modules=self.modules)
326
327                # if we *haven't* thrown a DatasetException now, then the
328                # key is already in use, so create a "dummy" dataset and
329                # overwrite it with the metadata we have (except for the
330                # key). this ensures that a new unique key will be
331                # generated.
332                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
333                metadata.pop("key")
334                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
335
336            except DataSetException:
337                # this is *good* since it means the key doesn't exist, so
338                # we can re-use the key of the imported dataset
339                self.db.insert("datasets", data=metadata)
340                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
341
342        if new_dataset.key != original_key:
343            # could not use original key because it was already in use
344            # so update any references to use the new key
345            self.remapped_keys[original_key] = new_dataset.key
346            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
347                                      f"{new_dataset.key} instead of key {original_key}!")
348
349        # refresh object, make sure it's in sync with the database
350        self.created_datasets.add(new_dataset.key)
351        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
352        current_log = None
353        if new_dataset.key == self.dataset.key:
354            # this ensures that the first imported dataset becomes the
355            # processor's "own" dataset, and that the import logs go to
356            # that dataset's log file. For later imports, this evaluates to
357            # False.
358
359            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
360            current_log = self.dataset.get_log_path().read_text()
361            # Update the dataset
362            self.dataset = new_dataset
363
364        # if the key of the parent dataset was changed, change the
365        # reference to it that the child dataset has
366        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
367            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]
368
369        # update some attributes that should come from the new server, not
370        # the old
371        new_dataset.creator = self.dataset_owner
372        new_dataset.original_timestamp = new_dataset.timestamp
373        new_dataset.imported = True
374        new_dataset.timestamp = int(time.time())
375        new_dataset.db.commit()
376
377        # make sure the dataset path uses the new key and local dataset
378        # path settings. this also makes sure the log file is created in
379        # the right place (since it is derived from the results file path)
380        extension = metadata["result_file"].split(".")[-1]
381        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
382        if not updated:
383            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")
384
385        if current_log:
386            # Add the current log to the new dataset
387            with new_dataset.get_log_path().open("a") as outfile:
388                outfile.write(current_log)
389
390        return new_dataset
391
392
393    def process_urls(self):
394        """
395        Import 4CAT dataset from another 4CAT server
396
397        Interfaces with another 4CAT server to transfer a dataset's metadata,
398        data files and child datasets.
399        """
400        urls = [url.strip() for url in self.parameters.get("url").split(",")]
401        self.base = urls[0].split("/results/")[0]
402        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
403        api_key = self.parameters.get("api-key")
404
405        imported = []  # successfully imported datasets
406        failed_imports = []  # keys that failed to import
407        num_rows = 0  # will be used later
408
409        # we can add support for multiple datasets later by removing
410        # this part!
411        keys = [keys[0]]
412
413        while keys:
414            dataset_key = keys.pop(0)
415
416            self.halt_and_catch_fire()
417            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
418
419            # first, metadata!
420            try:
421                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
422                metadata = metadata.json()
423            except FourcatImportException as e:
424                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
425                continue
426            except ValueError:
427                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
428                continue
429
430            # copying empty datasets doesn't really make sense
431            if metadata["num_rows"] == 0:
432                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
433                failed_imports.append(dataset_key)
434                continue
435
436            metadata = self.process_metadata(metadata)
437
438            # create the new dataset
439            new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)
440
441            # then, the log
442            self.halt_and_catch_fire()
443            try:
444                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
445                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
446                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
447                logpath = new_dataset.get_log_path()
448                new_dataset.log("Original dataset log included below:")
449                with logpath.open("a") as outfile:
450                    outfile.write(log.text)
451            except FourcatImportException as e:
452                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
453                failed_imports.append(dataset_key)
454                continue
455            except ValueError:
456                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
457                failed_imports.append(dataset_key)
458                continue
459
460            # then, the results
461            self.halt_and_catch_fire()
462            try:
463                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
464                datapath = new_dataset.get_results_path()
465                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
466
467                if not imported:
468                    # first dataset - use num rows as 'overall'
469                    num_rows = metadata["num_rows"]
470
471            except FourcatImportException as e:
472                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
473                if new_dataset.key != self.dataset.key:
474                    new_dataset.delete()
475                continue
476
477            except ValueError:
478                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
479                failed_imports.append(dataset_key)
480                continue
481
482            # finally, the kids
483            self.halt_and_catch_fire()
484            try:
485                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
486                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
487                children = children.json()
488            except FourcatImportException as e:
489                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
490                failed_imports.append(dataset_key)
491                continue
492            except ValueError:
493                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
494                failed_imports.append(dataset_key)
495                continue
496
497            for child in children:
498                keys.append(child)
499                self.dataset.log(f"Adding child dataset {child} to import queue")
500
501            # done - remember that we've imported this one
502            imported.append(new_dataset)
503            new_dataset.update_status(metadata["status"])
504
505            if new_dataset.key != self.dataset.key:
506                # only finish if this is not the 'main' dataset, or the user
507                # will think the whole import is done
508                new_dataset.finish(metadata["num_rows"])
509
510        # todo: this part needs updating if/when we support importing multiple datasets!
511        if failed_imports:
512            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
513                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
514                                       f"dataset log file for details.", is_final=True)
515        else:
516            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
517                                       is_final=True)
518
519        if not self.dataset.is_finished():
520            # now all related datasets are imported, we can finish the 'main'
521            # dataset, and the user will be alerted that the full import is
522            # complete
523            self.dataset.finish(num_rows)
524
525    def halt_and_catch_fire(self):
526        """
527        Clean up on interrupt
528
529        There are multiple places in the code where we can bail out on an
530        interrupt, so abstract that away in its own function.
531        :return:
532        """
533        if self.interrupted:
534            # resuming is impossible because the original dataset (which
535            # has the list of URLs to import) has probably been
536            # overwritten by this point
537            deletables = [k for k in self.created_datasets if k != self.dataset.key]
538            for deletable in deletables:
539                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
540
541            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
542                                           f"will need to initiate the import again.")
543
544            raise ProcessorInterruptedException()
545
546    @staticmethod
547    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
548        """
549        Get dataset component from 4CAT export API
550
551        :param str base:  Server URL base to import from
552        :param str dataset_key:  Key of dataset to import
553        :param str api_key:  API authentication token
554        :param str component:  Component to retrieve
555        :return:  HTTP response object
556        """
557        try:
558            if component == "data" and datapath:
559                # Stream data
560                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
561                                  headers={
562                                            "User-Agent": "4cat/import",
563                                            "Authentication": api_key
564                                        }) as r:
565                    r.raise_for_status()
566                    with datapath.open("wb") as outfile:
567                        for chunk in r.iter_content(chunk_size=8192):
568                            outfile.write(chunk)
569                return r
570            else:
571                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
572                    "User-Agent": "4cat/import",
573                    "Authentication": api_key
574                })
575        except requests.Timeout:
576            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
577                                         f"accessible to external connections and try again.")
578        except requests.RequestException as e:
579            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
580                                         f"accessible to external connections and try again.")
581
582        if response.status_code == 404:
583            raise FourcatImportException(
584                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
585                f"a valid dataset.")
586        elif response.status_code in (401, 403):
587            raise FourcatImportException(
588                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
589                f"dataset and are using the correct API key.")
590        elif response.status_code != 200:
591            raise FourcatImportException(
592                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
593
594        return response
595
596    @staticmethod
597    def validate_query(query, request, config):
598        """
599        Validate custom data input
600
601        Confirms that the input is either a URL to a dataset on another 4CAT
602        server or an uploaded ZIP export and, if so, returns sanitised parameters.
603
604        :param dict query:  Query parameters, from client-side.
605        :param request:  Flask request
606        :param ConfigManager|None config:  Configuration reader (context-aware)
607        :return dict:  Safe query parameters
608        """
609        if query.get("method") == "zip":
610            filename = ""
611            if "option-data_upload-entries" in request.form:
612                # First pass sends list of files in the zip
613                pass
614            elif "option-data_upload" in request.files:
615                # Second pass sends the actual file
616                file = request.files["option-data_upload"]
617                if not file:
618                    raise QueryParametersException("No file uploaded.")
619
620                if not file.filename.endswith(".zip"):
621                    raise QueryParametersException("Uploaded file must be a ZIP file.")
622
623                filename = file.filename
624            else:
625                raise QueryParametersException("No file was offered for upload.")
626
627            return {
628                "method": "zip",
629                "filename": filename
630            }
631        elif query.get("method") == "url":
632            urls = query.get("url")
633            if not urls:
634                raise QueryParametersException("Provide at least one dataset URL.")
635
636            urls = urls.split(",")
637            bases = set([url.split("/results/")[0].lower() for url in urls])
638            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
639
640            if len(keys) != 1:
641                # todo: change this to < 1 if we allow multiple datasets
642                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
643
644            if len(bases) != 1:
645                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
646                                                "one 4CAT server at a time.")
647
648            base = urls[0].split("/results/")[0]
649            try:
650                # test if API key is valid and server is reachable
651                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
652            except FourcatImportException as e:
653                raise QueryParametersException(str(e))
654
655            try:
656                # test if we get a response we can parse
657                metadata = test.json()
658            except ValueError:
659                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
660
661            version = get_software_version()
662
663            if metadata.get("current_4CAT_version") != version:
664                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
665                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
666                                               "sure both are running the same version of 4CAT and try again.")
667
668            # OK, we can import at least one dataset
669            return {
670                "url": ",".join(urls),
671                "api-key": query.get("api-key")
672            }
673        else:
674            raise QueryParametersException("Import method not yet implemented.")
675
676    @staticmethod
677    def get_keys_from_urls(urls):
678        """
679        Get dataset keys from 4CAT URLs
680
681        :param list urls:  List of URLs
682        :return list:  List of keys
683        """
684        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
685
686    @staticmethod
687    def ensure_key(query):
688        """
689        Determine key for dataset generated by this processor
690
691        When importing datasets, it's necessary to determine the key of the
692        dataset that is created before it is actually created, because we want
693        to keep the original key of the imported dataset if possible. Luckily,
694        we can deduce it from the URL we're importing the dataset from.
695
696        :param dict query:  Input from the user, through the front-end
697        :return str:  Desired dataset key
698        """
699        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
700        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
701        urls = query.get("url", "").split(",")
702        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
703        return keys[0]

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, producing another dataset as output. The input is thus a file, and usually the output is as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an is_compatible_with(module=None, config=None) -> bool class method, which takes a dataset (or processor) module as argument and returns a bool indicating whether this processor is considered compatible with it. For example:

@classmethod
def is_compatible_with(cls, module=None, config=None):
    return module.type == "linguistic-features"
type = 'import_4cat-search'
category = 'Search'
title = 'Import 4CAT dataset and analyses'
description = 'Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)'
is_local = False
is_static = False
max_workers = 1
options = {'intro': {'type': 'info', 'help': 'Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. \n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. You can find the current version in the footer at the bottom of the interface.'}, 'method': {'type': 'choice', 'help': 'Import Type', 'options': {'zip': 'Zip File', 'url': '4CAT URL'}, 'default': 'url'}, 'url': {'type': 'string', 'help': 'Dataset URL', 'tooltip': "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.", 'requires': 'method^=url'}, 'intro2': {'type': 'info', 'help': "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need an API key from **the server you are importing from**, not the one you are looking at right now. Additionally, you need to have owner access to the dataset you want to import.", 'requires': 'method^=url'}, 'api-key': {'type': 'string', 'help': '4CAT API Key', 'sensitive': True, 'cache': True, 'requires': 'method^=url'}, 'data_upload': {'type': 'file', 'help': 'File', 'tooltip': 'Upload a ZIP file containing a dataset exported from a 4CAT server.', 'requires': 'method^=zip'}}
created_datasets = None
base = None
remapped_keys = None
dataset_owner = None
def process(self):
 83    def process(self):
 84        """
 85        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 86        """
 87        self.created_datasets = set()  # keys of created datasets - may not be successful!
 88        self.remapped_keys = {}  # changed dataset keys
 89        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 90        try:
 91            if self.parameters.get("method") == "zip":
 92                self.process_zip()
 93            else:
 94                self.process_urls()
 95        except Exception as e:
 96            # Catch all exceptions and finish the job with an error
 97            # Resuming is impossible because this dataset was overwritten with the importing dataset
 98            # halt_and_catch_fire() will clean up and delete the datasets that were created
 99            self.interrupted = True
100            try:
101                self.halt_and_catch_fire()
102            except ProcessorInterruptedException:
103                pass
104            except InFailedSqlTransaction:
105                # Catch database issue and retry
106                self.db.rollback()
107                try:
108                    self.halt_and_catch_fire()
109                except ProcessorInterruptedException:
110                    pass
111            # Reraise the original exception for logging
112            raise e

Import 4CAT dataset either from another 4CAT server or from the uploaded zip file

def after_create(query, dataset, request):
114    def after_create(query, dataset, request):
115        """
116        Hook to execute after the dataset for this source has been created
117
118        In this case, put the file in a temporary location so it can be
119        processed properly by the related Job later.
120
121        :param dict query:  Sanitised query parameters
122        :param DataSet dataset:  Dataset created for this query
123        :param request:  Flask request submitted for its creation
124        """
125        if query.get("method") == "zip":
126            file = request.files["option-data_upload"]
127            file.seek(0)
128            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
129                while True:
130                    chunk = file.read(1024)
131                    if len(chunk) == 0:
132                        break
133                    outfile.write(chunk)
134        else:
135            # nothing to do for URLs
136            pass

Hook to execute after the dataset for this source has been created

In this case, put the file in a temporary location so it can be processed properly by the related Job later.

Parameters
  • dict query: Sanitised query parameters
  • DataSet dataset: Dataset created for this query
  • request: Flask request submitted for its creation
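
The temporary location used here is simply the dataset's results path with its suffix swapped for .importing; process_zip() later reads the archive from that path. A minimal sketch (the path below is purely illustrative):

from pathlib import Path

results_path = Path("data/1234abcd.csv")             # hypothetical results path of the new dataset
temp_path = results_path.with_suffix(".importing")   # where after_create() buffers the uploaded ZIP
assert temp_path.name == "1234abcd.importing"
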
def process_zip(self):
139    def process_zip(self):
140        """
141        Import 4CAT dataset from a ZIP file
142        """
143        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
144        temp_file = self.dataset.get_results_path().with_suffix(".importing")
145
146        imported = []
147        processed_files = {"export.log"}  # names of files handled so far; export.log needs no further import
148        failed_imports = []
149        primary_dataset_original_log = None
150        with zipfile.ZipFile(temp_file, "r") as zip_ref:
151            zip_contents = zip_ref.namelist()
152
153            # Get all metadata files and determine primary dataset
154            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
155            if not metadata_files:
156                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
157                return
158
159            # Get the primary dataset
160            primary_dataset_keys = set()
161            datasets = []
162            parent_child_mapping = {}
163            for file in metadata_files:
164                with zip_ref.open(file) as f:
165                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
166                    metadata = json.loads(content)
167                    if not metadata.get("key_parent"):
168                        primary_dataset_keys.add(metadata.get("key"))
169                        datasets.append(metadata)
170                    else:
171                        # Store the mapping of parent to child datasets
172                        parent_key = metadata.get("key_parent")
173                        if parent_key not in parent_child_mapping:
174                            parent_child_mapping[parent_key] = []
175                        parent_child_mapping[parent_key].append(metadata)
176
177            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
178            if len(primary_dataset_keys) != 1:
179                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
180                return
181
182            # Import datasets
183            while datasets:
184                self.halt_and_catch_fire()
185
186                # Create the datasets
187                metadata = datasets.pop(0)
188                dataset_key = metadata.get("key")
189                processed_metadata = self.process_metadata(metadata)
190                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
191                processed_files.update(metadata_files)  # the metadata files were all read above
192
193                # Copy the log file
194                self.halt_and_catch_fire()
195                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
196                if log_filename in zip_contents:
197                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
198                    with zip_ref.open(log_filename) as f:
199                        content = f.read().decode('utf-8')
200                        if new_dataset.key == self.dataset.key:
201                            # Hold the original log for the primary dataset and add at the end
202                            primary_dataset_original_log = content
203                        else:
204                            new_dataset.log("Original dataset log included below:")
205                            with new_dataset.get_log_path().open("a") as outfile:
206                                outfile.write(content)
207                    processed_files.add(log_filename)
208                else:
209                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
210
211                # Copy the results
212                self.halt_and_catch_fire()
213                results_filename = metadata["result_file"]
214                if results_filename in zip_contents:
215                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
216                    with zip_ref.open(results_filename) as f:
217                        with new_dataset.get_results_path().open("wb") as outfile:
218                            outfile.write(f.read())
219                    processed_files.add(results_filename)
220
221                    if not imported:
222                        # first dataset - use num rows as 'overall'
223                        num_rows = metadata["num_rows"]
224                else:
225                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
226                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
227                    failed_imports.append(dataset_key)
228                    continue
229
230                # finally, the kids
231                self.halt_and_catch_fire()
232                if dataset_key in parent_child_mapping:
233                    datasets.extend(parent_child_mapping[dataset_key])
234                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
235
236                # done - remember that we've imported this one
237                imported.append(new_dataset)
238                new_dataset.update_status(metadata["status"])
239
240                if new_dataset.key != self.dataset.key:
241                    # only finish if this is not the 'main' dataset, or the user
242                    # will think the whole import is done
243                    new_dataset.finish(metadata["num_rows"])
244
245            # Check that all files were processed
246            missed_files = []
247            if len(zip_contents) != len(processed_files):
248                for file in zip_contents:
249                    if file not in processed_files:
250                        missed_files.append(file)
251
252            # todo: this part needs updating if/when we support importing multiple datasets!
253            if failed_imports:
254                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
255                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
256                                           f"dataset log file for details.", is_final=True)
257            elif missed_files:
258                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
259                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
260                                           f"{len(missed_files)} files were not successfully imported. Check the "
261                                           f"dataset log file for details.", is_final=True)
262            else:
263                self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported.",
264                                           is_final=True)
265
266            if not self.dataset.is_finished():
267                # now all related datasets are imported, we can finish the 'main'
268                # dataset, and the user will be alerted that the full import is
269                # complete
270                self.dataset.finish(num_rows)
271
272            # Add the original log for the primary dataset
273            if primary_dataset_original_log:
274                self.dataset.log("Original dataset log included below:\n")
275                with self.dataset.get_log_path().open("a") as outfile:
276                    outfile.write(primary_dataset_original_log)

Import 4CAT dataset from a ZIP file
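
As process_zip() implies, an export archive is expected to contain, per dataset, a metadata file ending in _metadata.json, the result file named in that metadata, a log file with the same name but a .log suffix, plus an export.log for the export run itself. A minimal sketch of that layout, with a made-up key and only the handful of metadata fields referenced above (real exports carry the full dataset record):

import json
import zipfile

key = "1234abcd"  # hypothetical dataset key
metadata = {"key": key, "key_parent": "", "result_file": f"{key}.csv", "num_rows": 2, "status": "Done"}

with zipfile.ZipFile("export.zip", "w") as zip_ref:
    zip_ref.writestr(f"{key}_metadata.json", json.dumps(metadata))  # found via the *_metadata.json suffix
    zip_ref.writestr(f"{key}.csv", "id,body\n1,hello\n2,world\n")   # the result_file named in the metadata
    zip_ref.writestr(f"{key}.log", "original dataset log\n")        # result_file with a .log suffix
    zip_ref.writestr("export.log", "exported by 4CAT\n")            # accounted for, but not imported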

@staticmethod
def process_metadata(metadata):
279    @staticmethod
280    def process_metadata(metadata):
281        """
282        Process metadata for import
283        """
284        # get rid of some keys that are server-specific and don't need to
285        # be stored (or don't correspond to database columns)
286        metadata.pop("current_4CAT_version")
287        metadata.pop("id")
288        metadata.pop("job")
289        metadata.pop("is_private")
290        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
291
292        # extra params are stored as JSON...
293        metadata["parameters"] = json.loads(metadata["parameters"])
294        if "copied_from" in metadata["parameters"]:
295            metadata["parameters"].pop("copied_from")
296        metadata["parameters"] = json.dumps(metadata["parameters"])
297
298        return metadata

Process metadata for import
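
Concretely, the server-specific columns are dropped and the copied_from marker is stripped from the JSON-encoded parameters before the record is written to the local database. A small illustration with made-up values (assuming 4CAT itself is importable):

import json
from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

exported = {
    "key": "1234abcd", "current_4CAT_version": "1.49", "id": 42, "job": 7,
    "is_private": False, "is_finished": True, "num_rows": 2, "result_file": "1234abcd.csv",
    "parameters": json.dumps({"query": "test", "copied_from": "1234abcd"}),
}

cleaned = SearchImportFromFourcat.process_metadata(dict(exported))
assert "id" not in cleaned and "is_finished" not in cleaned
assert "copied_from" not in json.loads(cleaned["parameters"])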

def create_dataset(self, metadata, original_key, primary=False):
300    def create_dataset(self, metadata, original_key, primary=False):
301        """
302        Create a new dataset
303        """
304        if primary:
305            self.dataset.update_status(f"Importing primary dataset {original_key}.")
306            # if this is the first dataset we're importing, make it the
307            # processor's "own" dataset. the key has already been set to
308            # the imported dataset's key via ensure_key() (or a new unqiue
309            # key if it already existed on this server)
310            # by making it the "own" dataset, the user initiating the
311            # import will see the imported dataset as the "result" of their
312            # import query in the interface, similar to the workflow for
313            # other data sources
314            new_dataset = self.dataset
315
316            # Update metadata and file
317            metadata.pop("key")  # key already OK (see above)
318            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
319
320        else:
321            self.dataset.update_status(f"Importing child dataset {original_key}.")
322            # supernumerary datasets - handle on their own
323            # these include any children of imported datasets
324            try:
325                DataSet(key=metadata["key"], db=self.db, modules=self.modules)
326
327                # if we *haven't* thrown a DatasetException now, then the
328                # key is already in use, so create a "dummy" dataset and
329                # overwrite it with the metadata we have (except for the
330                # key). this ensures that a new unique key will be
331                # generated.
332                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
333                metadata.pop("key")
334                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
335
336            except DataSetException:
337                # this is *good* since it means the key doesn't exist, so
338                # we can re-use the key of the imported dataset
339                self.db.insert("datasets", data=metadata)
340                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
341
342        if new_dataset.key != original_key:
343            # could not use original key because it was already in use
344            # so update any references to use the new key
345            self.remapped_keys[original_key] = new_dataset.key
346            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
347                                      f"{new_dataset.key} instead of key {original_key}!")
348
349        # refresh object, make sure it's in sync with the database
350        self.created_datasets.add(new_dataset.key)
351        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
352        current_log = None
353        if new_dataset.key == self.dataset.key:
354            # this ensures that the first imported dataset becomes the
355            # processor's "own" dataset, and that the import logs go to
356            # that dataset's log file. For later imports, this evaluates to
357            # False.
358
359            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
360            current_log = self.dataset.get_log_path().read_text()
361            # Update the dataset
362            self.dataset = new_dataset
363
364        # if the key of the parent dataset was changed, change the
365        # reference to it that the child dataset has
366        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
367            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]
368
369        # update some attributes that should come from the new server, not
370        # the old
371        new_dataset.creator = self.dataset_owner
372        new_dataset.original_timestamp = new_dataset.timestamp
373        new_dataset.imported = True
374        new_dataset.timestamp = int(time.time())
375        new_dataset.db.commit()
376
377        # make sure the dataset path uses the new key and local dataset
378        # path settings. this also makes sure the log file is created in
379        # the right place (since it is derived from the results file path)
380        extension = metadata["result_file"].split(".")[-1]
381        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
382        if not updated:
383            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")
384
385        if current_log:
386            # Add the current log to the new dataset
387            with new_dataset.get_log_path().open("a") as outfile:
388                outfile.write(current_log)
389
390        return new_dataset

Create a new dataset
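
The net effect of the key handling above: the imported dataset keeps its original key if that key is still free on this server; otherwise a fresh key is generated and the mapping is recorded in remapped_keys, so that children imported later can re-point their key_parent. Schematically (keys are invented):

remapped_keys = {}

original_key = "1234abcd"  # key on the exporting server
local_key = "5678efgh"     # key generated here because 1234abcd was already taken
if local_key != original_key:
    remapped_keys[original_key] = local_key

# a child exported with key_parent == "1234abcd" is re-parented on import
child_parent = remapped_keys.get("1234abcd", "1234abcd")
assert child_parent == "5678efgh"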

def process_urls(self):
393    def process_urls(self):
394        """
395        Import 4CAT dataset from another 4CAT server
396
397        Interfaces with another 4CAT server to transfer a dataset's metadata,
398        data files and child datasets.
399        """
400        urls = [url.strip() for url in self.parameters.get("url").split(",")]
401        self.base = urls[0].split("/results/")[0]
402        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
403        api_key = self.parameters.get("api-key")
404
405        imported = []  # successfully imported datasets
406        failed_imports = []  # keys that failed to import
407        num_rows = 0  # row count of the primary dataset, used to finish the main dataset at the end
408
409        # we can add support for multiple datasets later by removing
410        # this part!
411        keys = [keys[0]]
412
413        while keys:
414            dataset_key = keys.pop(0)
415
416            self.halt_and_catch_fire()
417            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
418
419            # first, metadata!
420            try:
421                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
422                metadata = metadata.json()
423            except FourcatImportException as e:
424                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
425                continue
426            except ValueError:
427                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
428                continue
429
430            # copying empty datasets doesn't really make sense
431            if metadata["num_rows"] == 0:
432                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
433                failed_imports.append(dataset_key)
434                continue
435
436            metadata = self.process_metadata(metadata)
437
438            # create the new dataset
439            new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)
440
441            # then, the log
442            self.halt_and_catch_fire()
443            try:
444                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
445                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
446                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
447                logpath = new_dataset.get_log_path()
448                new_dataset.log("Original dataset log included below:")
449                with logpath.open("a") as outfile:
450                    outfile.write(log.text)
451            except FourcatImportException as e:
452                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
453                failed_imports.append(dataset_key)
454                continue
455            except ValueError:
456                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
457                failed_imports.append(dataset_key)
458                continue
459
460            # then, the results
461            self.halt_and_catch_fire()
462            try:
463                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
464                datapath = new_dataset.get_results_path()
465                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
466
467                if not imported:
468                    # first dataset - use num rows as 'overall'
469                    num_rows = metadata["num_rows"]
470
471            except FourcatImportException as e:
472                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
473                if new_dataset.key != self.dataset.key:
474                    new_dataset.delete()
475                continue
476
477            except ValueError:
478                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
479                failed_imports.append(dataset_key)
480                continue
481
482            # finally, the kids
483            self.halt_and_catch_fire()
484            try:
485                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
486                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
487                children = children.json()
488            except FourcatImportException as e:
489                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
490                failed_imports.append(dataset_key)
491                continue
492            except ValueError:
493                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
494                failed_imports.append(dataset_key)
495                continue
496
497            for child in children:
498                keys.append(child)
499                self.dataset.log(f"Adding child dataset {child} to import queue")
500
501            # done - remember that we've imported this one
502            imported.append(new_dataset)
503            new_dataset.update_status(metadata["status"])
504
505            if new_dataset.key != self.dataset.key:
506                # only finish if this is not the 'main' dataset, or the user
507                # will think the whole import is done
508                new_dataset.finish(metadata["num_rows"])
509
510        # todo: this part needs updating if/when we support importing multiple datasets!
511        if failed_imports:
512            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
513                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
514                                       f"dataset log file for details.", is_final=True)
515        else:
516            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
517                                       is_final=True)
518
519        if not self.dataset.is_finished():
520            # now all related datasets are imported, we can finish the 'main'
521            # dataset, and the user will be alerted that the full import is
522            # complete
523            self.dataset.finish(num_rows)

Import 4CAT dataset from another 4CAT server

Interfaces with another 4CAT server to transfer a dataset's metadata, data files and child datasets.
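
Per dataset key in the queue, the export API is queried for its components in a fixed order: metadata, log, data (streamed straight into the local results file) and finally children, whose keys are appended to the queue so that the whole dataset family is imported breadth-first. A condensed sketch of that loop, with placeholder server, key and API key:

from pathlib import Path
from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

base = "https://4cat.example"   # placeholder server URL
api_key = "my-api-key"          # placeholder API key for that server
queue = ["1234abcd"]            # placeholder dataset key

while queue:
    key = queue.pop(0)
    metadata = SearchImportFromFourcat.fetch_from_4cat(base, key, api_key, "metadata").json()
    log_text = SearchImportFromFourcat.fetch_from_4cat(base, key, api_key, "log").text  # appended to the local log
    SearchImportFromFourcat.fetch_from_4cat(base, key, api_key, "data", Path(f"{key}.csv"))
    children = SearchImportFromFourcat.fetch_from_4cat(base, key, api_key, "children").json()
    queue.extend(children)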

def halt_and_catch_fire(self):
525    def halt_and_catch_fire(self):
526        """
527        Clean up on interrupt
528
529        There are multiple places in the code where we can bail out on an
530        interrupt, so abstract that away in its own function.
531        :return:
532        """
533        if self.interrupted:
534            # resuming is impossible because the original dataset (which
535            # has the list of URLs to import) has probably been
536            # overwritten by this point
537            deletables = [k for k in self.created_datasets if k != self.dataset.key]
538            for deletable in deletables:
539                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
540
541            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
542                                           f"will need to initiate the import again.")
543
544            raise ProcessorInterruptedException()

Clean up on interrupt

There are multiple places in the code where we can bail out on an interrupt, so abstract that away in its own function.
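
The pattern, as used in process_urls() and process_zip(): call the method at every convenient checkpoint so that an interrupted worker deletes whatever it created so far and bails out, instead of leaving half-imported datasets behind. A toy stand-in (not the real BasicProcessor) showing the shape of that pattern:

class ToyProcessor:
    """Toy illustration of the checkpoint pattern; not the real processor class."""
    interrupted = False

    def halt_and_catch_fire(self):
        if self.interrupted:
            # the real method also deletes the datasets created so far
            raise RuntimeError("interrupted - cannot resume, start the import again")

    def process(self, queue):
        while queue:
            self.halt_and_catch_fire()  # checkpoint before each unit of work
            queue.pop(0)                # ... the actual import work would happen here ...

ToyProcessor().process(["1234abcd"])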

@staticmethod
def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
546    @staticmethod
547    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
548        """
549        Get dataset component from 4CAT export API
550
551        :param str base:  Server URL base to import from
552        :param str dataset_key:  Key of dataset to import
553        :param str api_key:  API authentication token
554        :param str component:  Component to retrieve
555        :return:  HTTP response object
556        """
557        try:
558            if component == "data" and datapath:
559                # Stream data
560                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
561                                  headers={
562                                            "User-Agent": "4cat/import",
563                                            "Authentication": api_key
564                                        }) as r:
565                    r.raise_for_status()
566                    with datapath.open("wb") as outfile:
567                        for chunk in r.iter_content(chunk_size=8192):
568                            outfile.write(chunk)
569                return r
570            else:
571                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
572                    "User-Agent": "4cat/import",
573                    "Authentication": api_key
574                })
575        except requests.Timeout:
576            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
577                                         f"accessible to external connections and try again.")
578        except requests.RequestException as e:
579            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
580                                         f"accessible to external connections and try again.")
581
582        if response.status_code == 404:
583            raise FourcatImportException(
584                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
585                f"a valid dataset.")
586        elif response.status_code in (401, 403):
587            raise FourcatImportException(
588                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
589                f"dataset and are using the correct API key.")
590        elif response.status_code != 200:
591            raise FourcatImportException(
592                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
593
594        return response

Get dataset component from 4CAT export API

Parameters
  • str base: Server URL base to import from
  • str dataset_key: Key of dataset to import
  • str api_key: API authentication token
  • str component: Component to retrieve
Returns

HTTP response object
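
A usage sketch with placeholder values: JSON components are read from the returned response, the data component is streamed to a local path when one is given, and all failure modes (timeouts, connection errors, 404 and 401/403 responses) surface as FourcatImportException.

from datasources.fourcat_import.import_4cat import SearchImportFromFourcat, FourcatImportException

base = "https://4cat.example"  # placeholder server URL
key = "1234abcd"               # placeholder dataset key

try:
    metadata = SearchImportFromFourcat.fetch_from_4cat(base, key, "my-api-key", "metadata").json()
except FourcatImportException as e:
    print(f"Could not fetch metadata: {e}")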

@staticmethod
def validate_query(query, request, config):
596    @staticmethod
597    def validate_query(query, request, config):
598        """
599        Validate custom data input
600
601        Confirms that the input is either a URL to a dataset on another 4CAT
602        server or an uploaded ZIP export and, if so, returns sanitised parameters.
603
604        :param dict query:  Query parameters, from client-side.
605        :param request:  Flask request
606        :param ConfigManager|None config:  Configuration reader (context-aware)
607        :return dict:  Safe query parameters
608        """
609        if query.get("method") == "zip":
610            filename = ""
611            if "option-data_upload-entries" in request.form:
612                # First pass sends list of files in the zip
613                pass
614            elif "option-data_upload" in request.files:
615                # Second pass sends the actual file
616                file = request.files["option-data_upload"]
617                if not file:
618                    raise QueryParametersException("No file uploaded.")
619
620                if not file.filename.endswith(".zip"):
621                    raise QueryParametersException("Uploaded file must be a ZIP file.")
622
623                filename = file.filename
624            else:
625                raise QueryParametersException("No file was offered for upload.")
626
627            return {
628                "method": "zip",
629                "filename": filename
630            }
631        elif query.get("method") == "url":
632            urls = query.get("url")
633            if not urls:
634                raise QueryParametersException("Provide at least one dataset URL.")
635
636            urls = urls.split(",")
637            bases = set([url.split("/results/")[0].lower() for url in urls])
638            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
639
640            if len(keys) != 1:
641                # todo: change this to < 1 if we allow multiple datasets
642                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
643
644            if len(bases) != 1:
645                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
646                                                "one 4CAT server at a time.")
647
648            base = urls[0].split("/results/")[0]
649            try:
650                # test if API key is valid and server is reachable
651                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
652            except FourcatImportException as e:
653                raise QueryParametersException(str(e))
654
655            try:
656                # test if we get a response we can parse
657                metadata = test.json()
658            except ValueError:
659                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
660
661            version = get_software_version()
662
663            if metadata.get("current_4CAT_version") != version:
664                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
665                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
666                                               "sure both are running the same version of 4CAT and try again.")
667
668            # OK, we can import at least one dataset
669            return {
670                "url": ",".join(urls),
671                "api-key": query.get("api-key")
672            }
673        else:
674            raise QueryParametersException("Import method not yet implemented.")

Validate custom data input

Confirms that the input is either a URL to a dataset on another 4CAT server or an uploaded ZIP export and, if so, returns sanitised parameters.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Safe query parameters
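
The sanitised parameters come in two shapes, depending on the chosen import method (values below are illustrative):

# method "url": the URL(s) and API key are passed on to the processor
url_query = {"url": "https://4cat.example/results/1234abcd/", "api-key": "my-api-key"}

# method "zip": only the original filename is recorded; the file itself is
# stored on disk by after_create()
zip_query = {"method": "zip", "filename": "dataset-export.zip"}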

@staticmethod
def get_keys_from_urls(urls):
676    @staticmethod
677    def get_keys_from_urls(urls):
678        """
679        Get dataset keys from 4CAT URLs
680
681        :param list urls:  List of URLs
682        :return list:  List of keys
683        """
684        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]

Get dataset keys from 4CAT URLs

Parameters
  • list urls: List of URLs
Returns

List of keys
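
For example, with made-up dataset URLs (query strings and anchors are stripped):

from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

urls = [
    "https://4cat.example/results/1234abcd/",
    "https://4cat.example/results/5678efgh/?page=2#nav",
]
assert SearchImportFromFourcat.get_keys_from_urls(urls) == ["1234abcd", "5678efgh"]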

@staticmethod
def ensure_key(query):
686    @staticmethod
687    def ensure_key(query):
688        """
689        Determine key for dataset generated by this processor
690
691        When importing datasets, it's necessary to determine the key of the
692        dataset that is created before it is actually created, because we want
693        to keep the original key of the imported dataset if possible. Luckily,
694        we can deduce it from the URL we're importing the dataset from.
695
696        :param dict query:  Input from the user, through the front-end
697        :return str:  Desired dataset key
698        """
699        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
700        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
701        urls = query.get("url", "").split(",")
702        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
703        return keys[0]

Determine key for dataset generated by this processor

When importing datasets, it's necessary to determine the key of the dataset that is created before it is actually created, because we want to keep the original key of the imported dataset if possible. Luckily, we can deduce it from the URL we're importing the dataset from.

Parameters
  • dict query: Input from the user, through the front-end
Returns

Desired dataset key
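
A quick illustration with a made-up query: the key embedded in the (single) URL becomes the key of the dataset created locally, so the original key is preserved whenever it is still free on this server.

from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

query = {"url": "https://4cat.example/results/1234abcd/", "api-key": "my-api-key"}
assert SearchImportFromFourcat.ensure_key(query) == "1234abcd"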