Edit on GitHub

datasources.fourcat_import.import_4cat

Import datasets from other 4CATs

  1"""
  2Import datasets from other 4CATs
  3"""
  4import requests
  5import json
  6import time
  7import zipfile
  8from pathlib import Path
  9from psycopg2.errors import InFailedSqlTransaction
 10
 11from backend.lib.processor import BasicProcessor
 12from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException,
 13                                   DataSetException)
 14from common.lib.helpers import UserInput, get_software_version
 15from common.lib.dataset import DataSet
 16
 17
 18class FourcatImportException(FourcatException):
 19    pass
 20
 21
class SearchImportFromFourcat(BasicProcessor):
    """
    Import a dataset and its analyses from another 4CAT server or a zip export

    The import either talks to the export API of another 4CAT server (see
    `process_urls()`) or reads an uploaded zip file (see `process_zip()`).
    """
    type = "import_4cat-search"  # job ID
    category = "Search"  # category
    title = "Import 4CAT dataset and analyses"  # title displayed in UI
    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1  # this cannot be more than 1, else things get VERY messy

    # per-run state; (re)initialised when the import runs
    created_datasets = None  # set of keys of datasets created during this import
    base = None  # URL base of the 4CAT server that is imported from (URL imports only)
    remapped_keys = None  # dict mapping original dataset keys to the new key used on this server
    dataset_owner = None  # first owner of the importing dataset; becomes creator of imported datasets
 36
 37    @classmethod
 38    def get_options(cls, parent_dataset=None, config=None) -> dict:
 39        """
 40        Get processor options
 41
 42        :param DataSet parent_dataset:  An object representing the dataset that
 43          the processor would or was run on
 44        :param ConfigManager|None config:  Configuration reader (context-aware)
 45        """
 46        return {
 47            "intro": {
 48                "type": UserInput.OPTION_INFO,
 49                "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
 50                        "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
 51                        "You can find the current version in the footer at the bottom of the interface."
 52            },
 53            "method": {
 54                "type": UserInput.OPTION_CHOICE,
 55                "help": "Import Type",
 56                "options": {
 57                    "zip": "Zip File",
 58                    "url": "4CAT URL",
 59                },
 60                "default": "url"
 61            },
 62            "url": {
 63                "type": UserInput.OPTION_TEXT,
 64                "help": "Dataset URL",
 65                "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
 66                "requires": "method^=url"
 67            },
 68            "intro2": {
 69                "type": UserInput.OPTION_INFO,
 70                "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
 71                        "an API key from **the server you are importing from**, not the one you are looking at right now. "
 72                        "Additionally, you need to have owner access to the dataset you want to import.",
 73                "requires": "method^=url"
 74            },
 75            "api-key": {
 76                "type": UserInput.OPTION_TEXT,
 77                "help": "4CAT API Key",
 78                "sensitive": True,
 79                "cache": True,
 80                "requires": "method^=url"
 81            },
 82            "data_upload": {
 83                "type": UserInput.OPTION_FILE,
 84                "help": "File",
 85                "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
 86                "requires": "method^=zip"
 87            },
 88
 89        }
 90
 91    def process(self):
 92        """
 93        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 94        """
 95        self.created_datasets = set()  # keys of created datasets - may not be successful!
 96        self.remapped_keys = {}  # changed dataset keys
 97        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 98        try:
 99            if self.parameters.get("method") == "zip":
100                self.process_zip()
101            else:
102                self.process_urls()
103        except Exception as e:
104            # Catch all exceptions and finish the job with an error
105            # Resuming is impossible because this dataset was overwritten with the importing dataset
106            # halt_and_catch_fire() will clean up and delete the datasets that were created
107            self.interrupted = True
108            try:
109                self.halt_and_catch_fire()
110            except ProcessorInterruptedException:
111                pass
112            except InFailedSqlTransaction:
113                # Catch database issue and retry
114                self.db.rollback()
115                try:
116                    self.halt_and_catch_fire()
117                except ProcessorInterruptedException:
118                    pass
119            # Reraise the original exception for logging
120            raise e
121
122    def after_create(query, dataset, request):
123        """
124        Hook to execute after the dataset for this source has been created
125
126        In this case, put the file in a temporary location so it can be
127        processed properly by the related Job later.
128
129        :param dict query:  Sanitised query parameters
130        :param DataSet dataset:  Dataset created for this query
131        :param request:  Flask request submitted for its creation
132        """
133        if query.get("method") == "zip":
134            file = request.files["option-data_upload"]
135            file.seek(0)
136            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
137                while True:
138                    chunk = file.read(1024)
139                    if len(chunk) == 0:
140                        break
141                    outfile.write(chunk)
142        else:
143            # nothing to do for URLs
144            pass
145
146
147    def process_zip(self):
148        """
149        Import 4CAT dataset from a ZIP file
150        """
151        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
152        temp_file = self.dataset.get_results_path().with_suffix(".importing")
153
154        imported = []
155        processed_files = 1 # take into account the export.log file
156        failed_imports = []
157        primary_dataset_original_log = None
158        with zipfile.ZipFile(temp_file, "r") as zip_ref:
159            zip_contents = zip_ref.namelist()
160
161            # Get all metadata files and determine primary dataset
162            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
163            if not metadata_files:
164                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
165                return
166
167            # Get the primary dataset
168            primary_dataset_keys = set()
169            datasets = []
170            parent_child_mapping = {}
171            for file in metadata_files:
172                with zip_ref.open(file) as f:
173                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
174                    metadata = json.loads(content)
175                    if not metadata.get("key_parent"):
176                        primary_dataset_keys.add(metadata.get("key"))
177                        datasets.append(metadata)
178                    else:
179                        # Store the mapping of parent to child datasets
180                        parent_key = metadata.get("key_parent")
181                        if parent_key not in parent_child_mapping:
182                            parent_child_mapping[parent_key] = []
183                        parent_child_mapping[parent_key].append(metadata)
184
185            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
186            if len(primary_dataset_keys) != 1:
187                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
188                return
189
190            # Import datasets
191            while datasets:
192                self.halt_and_catch_fire()
193
194                # Create the datasets
195                metadata = datasets.pop(0)
196                dataset_key = metadata.get("key")
197                processed_metadata = self.process_metadata(metadata)
198                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
199                processed_files += 1
200
201                # Copy the log file
202                self.halt_and_catch_fire()
203                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
204                if log_filename in zip_contents:
205                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
206                    with zip_ref.open(log_filename) as f:
207                        content = f.read().decode('utf-8')
208                        if new_dataset.key == self.dataset.key:
209                            # Hold the original log for the primary dataset and add at the end
210                            primary_dataset_original_log = content
211                        else:
212                            new_dataset.log("Original dataset log included below:")
213                            with new_dataset.get_log_path().open("a") as outfile:
214                                outfile.write(content)
215                    processed_files += 1
216                else:
217                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
218
219                # Copy the results
220                self.halt_and_catch_fire()
221                results_filename = metadata["result_file"]
222                if results_filename in zip_contents:
223                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
224                    with zip_ref.open(results_filename) as f:
225                        with new_dataset.get_results_path().open("wb") as outfile:
226                            outfile.write(f.read())
227                    processed_files += 1
228
229                    if not imported:
230                        # first dataset - use num rows as 'overall'
231                        num_rows = metadata["num_rows"]
232                else:
233                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
234                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
235                    failed_imports.append(dataset_key)
236                    continue
237
238                # finally, the kids
239                self.halt_and_catch_fire()
240                if dataset_key in parent_child_mapping:
241                    datasets.extend(parent_child_mapping[dataset_key])
242                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
243
244                # done - remember that we've imported this one
245                imported.append(new_dataset)
246                new_dataset.update_status(metadata["status"])
247
248                if new_dataset.key != self.dataset.key:
249                    # only finish if this is not the 'main' dataset, or the user
250                    # will think the whole import is done
251                    new_dataset.finish(metadata["num_rows"])
252
253            # Check that all files were processed
254            missed_files = []
255            if len(zip_contents) != processed_files:
256                for file in zip_contents:
257                    if file not in processed_files:
258                        missed_files.append(file)
259
260            # todo: this part needs updating if/when we support importing multiple datasets!
261            if failed_imports:
262                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
263                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
264                                           f"dataset log file for details.", is_final=True)
265            elif missed_files:
266                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
267                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
268                                           f"{len(missed_files)} files were not successfully imported. Check the "
269                                           f"dataset log file for details.", is_final=True)
270            else:
271                self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
272                                           is_final=True)
273
274            if not self.dataset.is_finished():
275                # now all related datasets are imported, we can finish the 'main'
276                # dataset, and the user will be alerted that the full import is
277                # complete
278                self.dataset.finish(num_rows)
279
280            # Add the original log for the primary dataset
281            if primary_dataset_original_log:
282                self.dataset.log("Original dataset log included below:\n")
283                with self.dataset.get_log_path().open("a") as outfile:
284                    outfile.write(primary_dataset_original_log)
285
286
287    @staticmethod
288    def process_metadata(metadata):
289        """
290        Process metadata for import
291
292        :param dict metadata:  Metadata of import
293        :return:  Relevant metadata, or `None` if parsing error
294        """
295        # get rid of some keys that are server-specific and don't need to
296        # be stored (or don't correspond to database columns)
297        try:
298            metadata.pop("current_4CAT_version")
299            metadata.pop("id")
300            metadata.pop("job")
301            metadata.pop("is_private")
302            metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
303
304            # extra params are stored as JSON...
305            metadata["parameters"] = json.loads(metadata["parameters"])
306            if "copied_from" in metadata["parameters"]:
307                metadata["parameters"].pop("copied_from")
308            metadata["parameters"] = json.dumps(metadata["parameters"])
309
310        except (ValueError, KeyError):
311            # we don't need all this metadata but it still should be present,
312            # otherwise something is wrong with the data format
313            return None
314
315        return metadata
316
    def create_dataset(self, metadata, original_key, primary=False):
        """
        Create a new dataset

        The primary dataset re-uses the processor's own dataset, which is
        updated with the imported metadata. Other datasets are created anew,
        with the imported key if that is not in use on this server yet, and
        with a newly generated key otherwise. Changed keys are stored in
        `self.remapped_keys` so references from child datasets can be updated.

        Note that the `key` item is removed from the given metadata when an
        existing database record is updated rather than inserted.

        :param dict metadata:  Metadata of the dataset, as returned by
          `process_metadata()`; used as the data for the `datasets` table
        :param str original_key:  Key the dataset had on the server it was
          exported from
        :param bool primary:  Whether this is the primary dataset, i.e. the
          one that replaces the processor's own dataset
        :return DataSet:  The created dataset
        """
        if primary:
            self.dataset.update_status(f"Importing primary dataset {original_key}.")
            # if this is the first dataset we're importing, make it the
            # processor's "own" dataset. the key has already been set to
            # the imported dataset's key via ensure_key() (or a new unique
            # key if it already existed on this server)
            # by making it the "own" dataset, the user initiating the
            # import will see the imported dataset as the "result" of their
            # import query in the interface, similar to the workflow for
            # other data sources
            new_dataset = self.dataset

            # Update metadata and file
            metadata.pop("key")  # key already OK (see above)
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        else:
            self.dataset.update_status(f"Importing child dataset {original_key}.")
            # supernumerary datasets - handle on their own
            # these include any children of imported datasets
            try:
                DataSet(key=metadata["key"], db=self.db, modules=self.modules)

                # if we *haven't* thrown a DatasetException now, then the
                # key is already in use, so create a "dummy" dataset and
                # overwrite it with the metadata we have (except for the
                # key). this ensures that a new unique key will be
                # generated.
                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
                metadata.pop("key")
                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

            except DataSetException:
                # this is *good* since it means the key doesn't exist, so
                # we can re-use the key of the imported dataset
                self.db.insert("datasets", data=metadata)
                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

        if new_dataset.key != original_key:
            # could not use original key because it was already in use
            # so update any references to use the new key
            self.remapped_keys[original_key] = new_dataset.key
            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                      f"{new_dataset.key} instead of key {original_key}!")

        # refresh object, make sure it's in sync with the database
        self.created_datasets.add(new_dataset.key)
        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
        current_log = None
        if new_dataset.key == self.dataset.key:
            # this ensures that the first imported dataset becomes the
            # processor's "own" dataset, and that the import logs go to
            # that dataset's log file. For later imports, this evaluates to
            # False.

            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
            current_log = self.dataset.get_log_path().read_text()
            # Update the dataset
            self.dataset = new_dataset

        # if the key of the parent dataset was changed, change the
        # reference to it that the child dataset has
        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

        # update some attributes that should come from the new server, not
        # the old
        new_dataset.creator = self.dataset_owner
        new_dataset.original_timestamp = new_dataset.timestamp
        new_dataset.imported = True
        new_dataset.timestamp = int(time.time())
        new_dataset.db.commit()

        # make sure the dataset path uses the new key and local dataset
        # path settings. this also makes sure the log file is created in
        # the right place (since it is derived from the results file path)
        extension = metadata["result_file"].split(".")[-1]
        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
        if not updated:
            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

        if current_log:
            # Add the current log to the new dataset
            with new_dataset.get_log_path().open("a") as outfile:
                outfile.write(current_log)

        return new_dataset
408
409
410    def process_urls(self):
411        """
412        Import 4CAT dataset from another 4CAT server
413
414        Interfaces with another 4CAT server to transfer a dataset's metadata,
415        data files and child datasets.
416        """
417        urls = [url.strip() for url in self.parameters.get("url").split(",")]
418        self.base = urls[0].split("/results/")[0]
419        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
420        api_key = self.parameters.get("api-key")
421
422        imported = []  # successfully imported datasets
423        failed_imports = []  # keys that failed to import
424        num_rows = 0  # will be used later
425
426        # we can add support for multiple datasets later by removing
427        # this part!
428        keys = [keys[0]]
429
430        while keys:
431            dataset_key = keys.pop(0)
432
433            self.halt_and_catch_fire()
434            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
435
436            # first, metadata!
437            try:
438                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
439                metadata = metadata.json()
440            except FourcatImportException as e:
441                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
442                continue
443            except ValueError:
444                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
445                continue
446
447            # copying empty datasets doesn't really make sense
448            if metadata["num_rows"] == 0:
449                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
450                failed_imports.append(dataset_key)
451                continue
452
453            metadata = self.process_metadata(metadata)
454            if metadata is None:
455                self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
456                failed_imports.append(dataset_key)
457                continue
458
459            # create the new dataset
460            new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)
461
462            # then, the log
463            self.halt_and_catch_fire()
464            try:
465                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
466                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
467                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
468                logpath = new_dataset.get_log_path()
469                new_dataset.log("Original dataset log included below:")
470                with logpath.open("a") as outfile:
471                    outfile.write(log.text)
472            except FourcatImportException as e:
473                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
474                failed_imports.append(dataset_key)
475                continue
476            except ValueError:
477                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
478                failed_imports.append(dataset_key)
479                continue
480
481            # then, the results
482            self.halt_and_catch_fire()
483            try:
484                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
485                datapath = new_dataset.get_results_path()
486                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
487
488                if not imported:
489                    # first dataset - use num rows as 'overall'
490                    num_rows = metadata["num_rows"]
491
492            except FourcatImportException as e:
493                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
494                if new_dataset.key != self.dataset.key:
495                    new_dataset.delete()
496                continue
497
498            except ValueError:
499                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
500                failed_imports.append(dataset_key)
501                continue
502
503            # finally, the kids
504            self.halt_and_catch_fire()
505            try:
506                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
507                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
508                children = children.json()
509            except FourcatImportException as e:
510                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
511                failed_imports.append(dataset_key)
512                continue
513            except ValueError:
514                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
515                failed_imports.append(dataset_key)
516                continue
517
518            for child in children:
519                keys.append(child)
520                self.dataset.log(f"Adding child dataset {child} to import queue")
521
522            # done - remember that we've imported this one
523            imported.append(new_dataset)
524            new_dataset.update_status(metadata["status"])
525
526            if new_dataset.key != self.dataset.key:
527                # only finish if this is not the 'main' dataset, or the user
528                # will think the whole import is done
529                new_dataset.finish(metadata["num_rows"])
530
531        # todo: this part needs updating if/when we support importing multiple datasets!
532        if failed_imports:
533            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
534                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
535                                       f"dataset log file for details.", is_final=True)
536        else:
537            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.",
538                                       is_final=True)
539
540        if not self.dataset.is_finished():
541            # now all related datasets are imported, we can finish the 'main'
542            # dataset, and the user will be alerted that the full import is
543            # complete
544            self.dataset.finish(num_rows)
545
546    def halt_and_catch_fire(self):
547        """
548        Clean up on interrupt
549
550        There are multiple places in the code where we can bail out on an
551        interrupt, so abstract that away in its own function.
552        :return:
553        """
554        if self.interrupted:
555            # resuming is impossible because the original dataset (which
556            # has the list of URLs to import) has probably been
557            # overwritten by this point
558            deletables = [k for k in self.created_datasets if k != self.dataset.key]
559            for deletable in deletables:
560                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
561
562            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
563                                           f"will need to initiate the import again.")
564
565            raise ProcessorInterruptedException()
566
567    @staticmethod
568    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
569        """
570        Get dataset component from 4CAT export API
571
572        :param str base:  Server URL base to import from
573        :param str dataset_key:  Key of dataset to import
574        :param str api_key:  API authentication token
575        :param str component:  Component to retrieve
576        :return:  HTTP response object
577        """
578        try:
579            if component == "data" and datapath:
580                # Stream data
581                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
582                                  headers={
583                                            "User-Agent": "4cat/import",
584                                            "Authentication": api_key
585                                        }) as r:
586                    r.raise_for_status()
587                    with datapath.open("wb") as outfile:
588                        for chunk in r.iter_content(chunk_size=8192):
589                            outfile.write(chunk)
590                return r
591            else:
592                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
593                    "User-Agent": "4cat/import",
594                    "Authentication": api_key
595                })
596        except requests.Timeout:
597            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
598                                         f"accessible to external connections and try again.")
599        except requests.RequestException as e:
600            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
601                                         f"accessible to external connections and try again.")
602
603        if response.status_code == 404:
604            raise FourcatImportException(
605                f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to "
606                f"a valid dataset.")
607        elif response.status_code in (401, 403):
608            raise FourcatImportException(
609                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
610                f"dataset and are using the correct API key.")
611        elif response.status_code != 200:
612            raise FourcatImportException(
613                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
614
615        return response
616
617    @staticmethod
618    def validate_query(query, request, config):
619        """
620        Validate custom data input
621
622        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
623        some metadata.
624
625        :param dict query:  Query parameters, from client-side.
626        :param request:  Flask request
627        :param ConfigManager|None config:  Configuration reader (context-aware)
628        :return dict:  Safe query parameters
629        """
630        if query.get("method") == "zip":
631            filename = ""
632            if "option-data_upload-entries" in request.form:
633                # First pass sends list of files in the zip
634                pass
635            elif "option-data_upload" in request.files:
636                # Second pass sends the actual file
637                file = request.files["option-data_upload"]
638                if not file:
639                    raise QueryParametersException("No file uploaded.")
640
641                if not file.filename.endswith(".zip"):
642                    raise QueryParametersException("Uploaded file must be a ZIP file.")
643
644                filename = file.filename
645            else:
646                raise QueryParametersException("No file was offered for upload.")
647
648            return {
649                "method": "zip",
650                "filename": filename
651            }
652        elif query.get("method") == "url":
653            urls = query.get("url")
654            if not urls:
655                raise QueryParametersException("Provide at least one dataset URL.")
656
657            urls = urls.split(",")
658            bases = set([url.split("/results/")[0].lower() for url in urls])
659            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
660
661            if len(keys) != 1:
662                # todo: change this to < 1 if we allow multiple datasets
663                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
664
665            if len(bases) != 1:
666                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
667                                                "one 4CAT server at a time.")
668
669            base = urls[0].split("/results/")[0]
670            try:
671                # test if API key is valid and server is reachable
672                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
673            except FourcatImportException as e:
674                raise QueryParametersException(str(e))
675
676            try:
677                # test if we get a response we can parse
678                metadata = test.json()
679            except ValueError:
680                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
681
682            version = get_software_version()
683
684            if metadata.get("current_4CAT_version") != version:
685                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
686                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
687                                               "sure both are running the same version of 4CAT and try again.")
688
689            # OK, we can import at least one dataset
690            return {
691                "url": ",".join(urls),
692                "api-key": query.get("api-key")
693            }
694        else:
695            raise QueryParametersException("Import method not yet implemented.")
696
697    @staticmethod
698    def get_keys_from_urls(urls):
699        """
700        Get dataset keys from 4CAT URLs
701
702        :param list urls:  List of URLs
703        :return list:  List of keys
704        """
705        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
706
707    @staticmethod
708    def ensure_key(query):
709        """
710        Determine key for dataset generated by this processor
711
712        When importing datasets, it's necessary to determine the key of the
713        dataset that is created before it is actually created, because we want
714        to keep the original key of the imported dataset if possible. Luckily,
715        we can deduce it from the URL we're importing the dataset from.
716
717        :param dict query:  Input from the user, through the front-end
718        :return str:  Desired dataset key
719        """
720        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
721        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
722        urls = query.get("url", "").split(",")
723        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
724        return keys[0]
class FourcatImportException(common.lib.exceptions.FourcatException):
class FourcatImportException(FourcatException):
    """
    Raised when a dataset component cannot be retrieved from another 4CAT server

    Thrown by `SearchImportFromFourcat.fetch_from_4cat()` and caught by the
    validation and import code, which turn it into a user-facing error or a
    skipped dataset.
    """
    pass

Base 4CAT exception class

class SearchImportFromFourcat(backend.lib.processor.BasicProcessor):
 23class SearchImportFromFourcat(BasicProcessor):
    type = "import_4cat-search"  # job ID
    category = "Search"  # category
    title = "Import 4CAT dataset and analyses"  # title displayed in UI
    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1  # this cannot be more than 1, else things get VERY messy

    # state of a running import; all of these are (re)initialised while processing
    created_datasets = None  # set of keys of datasets created so far, set in process()
    base = None  # URL base of the server imported from, set in process_urls()
    remapped_keys = None  # original key -> new key, for keys already in use on this server
    dataset_owner = None  # first owner of the import dataset; becomes creator of imported datasets
 37
 38    @classmethod
 39    def get_options(cls, parent_dataset=None, config=None) -> dict:
 40        """
 41        Get processor options
 42
 43        :param DataSet parent_dataset:  An object representing the dataset that
 44          the processor would or was run on
 45        :param ConfigManager|None config:  Configuration reader (context-aware)
 46        """
 47        return {
 48            "intro": {
 49                "type": UserInput.OPTION_INFO,
 50                "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
 51                        "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
 52                        "You can find the current version in the footer at the bottom of the interface."
 53            },
 54            "method": {
 55                "type": UserInput.OPTION_CHOICE,
 56                "help": "Import Type",
 57                "options": {
 58                    "zip": "Zip File",
 59                    "url": "4CAT URL",
 60                },
 61                "default": "url"
 62            },
 63            "url": {
 64                "type": UserInput.OPTION_TEXT,
 65                "help": "Dataset URL",
 66                "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
 67                "requires": "method^=url"
 68            },
 69            "intro2": {
 70                "type": UserInput.OPTION_INFO,
 71                "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
 72                        "an API key from **the server you are importing from**, not the one you are looking at right now. "
 73                        "Additionally, you need to have owner access to the dataset you want to import.",
 74                "requires": "method^=url"
 75            },
 76            "api-key": {
 77                "type": UserInput.OPTION_TEXT,
 78                "help": "4CAT API Key",
 79                "sensitive": True,
 80                "cache": True,
 81                "requires": "method^=url"
 82            },
 83            "data_upload": {
 84                "type": UserInput.OPTION_FILE,
 85                "help": "File",
 86                "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
 87                "requires": "method^=zip"
 88            },
 89
 90        }
 91
 92    def process(self):
 93        """
 94        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 95        """
 96        self.created_datasets = set()  # keys of created datasets - may not be successful!
 97        self.remapped_keys = {}  # changed dataset keys
 98        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 99        try:
100            if self.parameters.get("method") == "zip":
101                self.process_zip()
102            else:
103                self.process_urls()
104        except Exception as e:
105            # Catch all exceptions and finish the job with an error
106            # Resuming is impossible because this dataset was overwritten with the importing dataset
107            # halt_and_catch_fire() will clean up and delete the datasets that were created
108            self.interrupted = True
109            try:
110                self.halt_and_catch_fire()
111            except ProcessorInterruptedException:
112                pass
113            except InFailedSqlTransaction:
114                # Catch database issue and retry
115                self.db.rollback()
116                try:
117                    self.halt_and_catch_fire()
118                except ProcessorInterruptedException:
119                    pass
120            # Reraise the original exception for logging
121            raise e
122
123    def after_create(query, dataset, request):
124        """
125        Hook to execute after the dataset for this source has been created
126
127        In this case, put the file in a temporary location so it can be
128        processed properly by the related Job later.
129
130        :param dict query:  Sanitised query parameters
131        :param DataSet dataset:  Dataset created for this query
132        :param request:  Flask request submitted for its creation
133        """
134        if query.get("method") == "zip":
135            file = request.files["option-data_upload"]
136            file.seek(0)
137            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
138                while True:
139                    chunk = file.read(1024)
140                    if len(chunk) == 0:
141                        break
142                    outfile.write(chunk)
143        else:
144            # nothing to do for URLs
145            pass
146
147
148    def process_zip(self):
149        """
150        Import 4CAT dataset from a ZIP file
151        """
152        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
153        temp_file = self.dataset.get_results_path().with_suffix(".importing")
154
155        imported = []
156        processed_files = 1 # take into account the export.log file
157        failed_imports = []
158        primary_dataset_original_log = None
159        with zipfile.ZipFile(temp_file, "r") as zip_ref:
160            zip_contents = zip_ref.namelist()
161
162            # Get all metadata files and determine primary dataset
163            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
164            if not metadata_files:
165                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
166                return
167
168            # Get the primary dataset
169            primary_dataset_keys = set()
170            datasets = []
171            parent_child_mapping = {}
172            for file in metadata_files:
173                with zip_ref.open(file) as f:
174                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
175                    metadata = json.loads(content)
176                    if not metadata.get("key_parent"):
177                        primary_dataset_keys.add(metadata.get("key"))
178                        datasets.append(metadata)
179                    else:
180                        # Store the mapping of parent to child datasets
181                        parent_key = metadata.get("key_parent")
182                        if parent_key not in parent_child_mapping:
183                            parent_child_mapping[parent_key] = []
184                        parent_child_mapping[parent_key].append(metadata)
185
186            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
187            if len(primary_dataset_keys) != 1:
188                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
189                return
190
191            # Import datasets
192            while datasets:
193                self.halt_and_catch_fire()
194
195                # Create the datasets
196                metadata = datasets.pop(0)
197                dataset_key = metadata.get("key")
198                processed_metadata = self.process_metadata(metadata)
199                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
200                processed_files += 1
201
202                # Copy the log file
203                self.halt_and_catch_fire()
204                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
205                if log_filename in zip_contents:
206                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
207                    with zip_ref.open(log_filename) as f:
208                        content = f.read().decode('utf-8')
209                        if new_dataset.key == self.dataset.key:
210                            # Hold the original log for the primary dataset and add at the end
211                            primary_dataset_original_log = content
212                        else:
213                            new_dataset.log("Original dataset log included below:")
214                            with new_dataset.get_log_path().open("a") as outfile:
215                                outfile.write(content)
216                    processed_files += 1
217                else:
218                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
219
220                # Copy the results
221                self.halt_and_catch_fire()
222                results_filename = metadata["result_file"]
223                if results_filename in zip_contents:
224                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
225                    with zip_ref.open(results_filename) as f:
226                        with new_dataset.get_results_path().open("wb") as outfile:
227                            outfile.write(f.read())
228                    processed_files += 1
229
230                    if not imported:
231                        # first dataset - use num rows as 'overall'
232                        num_rows = metadata["num_rows"]
233                else:
234                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
235                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
236                    failed_imports.append(dataset_key)
237                    continue
238
239                # finally, the kids
240                self.halt_and_catch_fire()
241                if dataset_key in parent_child_mapping:
242                    datasets.extend(parent_child_mapping[dataset_key])
243                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
244
245                # done - remember that we've imported this one
246                imported.append(new_dataset)
247                new_dataset.update_status(metadata["status"])
248
249                if new_dataset.key != self.dataset.key:
250                    # only finish if this is not the 'main' dataset, or the user
251                    # will think the whole import is done
252                    new_dataset.finish(metadata["num_rows"])
253
254            # Check that all files were processed
255            missed_files = []
256            if len(zip_contents) != processed_files:
257                for file in zip_contents:
258                    if file not in processed_files:
259                        missed_files.append(file)
260
261            # todo: this part needs updating if/when we support importing multiple datasets!
262            if failed_imports:
263                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
264                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
265                                           f"dataset log file for details.", is_final=True)
266            elif missed_files:
267                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
268                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
269                                           f"{len(missed_files)} files were not successfully imported. Check the "
270                                           f"dataset log file for details.", is_final=True)
271            else:
272                self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
273                                           is_final=True)
274
275            if not self.dataset.is_finished():
276                # now all related datasets are imported, we can finish the 'main'
277                # dataset, and the user will be alerted that the full import is
278                # complete
279                self.dataset.finish(num_rows)
280
281            # Add the original log for the primary dataset
282            if primary_dataset_original_log:
283                self.dataset.log("Original dataset log included below:\n")
284                with self.dataset.get_log_path().open("a") as outfile:
285                    outfile.write(primary_dataset_original_log)
286
287
288    @staticmethod
289    def process_metadata(metadata):
290        """
291        Process metadata for import
292
293        :param dict metadata:  Metadata of import
294        :return:  Relevant metadata, or `None` if parsing error
295        """
296        # get rid of some keys that are server-specific and don't need to
297        # be stored (or don't correspond to database columns)
298        try:
299            metadata.pop("current_4CAT_version")
300            metadata.pop("id")
301            metadata.pop("job")
302            metadata.pop("is_private")
303            metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
304
305            # extra params are stored as JSON...
306            metadata["parameters"] = json.loads(metadata["parameters"])
307            if "copied_from" in metadata["parameters"]:
308                metadata["parameters"].pop("copied_from")
309            metadata["parameters"] = json.dumps(metadata["parameters"])
310
311        except (ValueError, KeyError):
312            # we don't need all this metadata but it still should be present,
313            # otherwise something is wrong with the data format
314            return None
315
316        return metadata
317
    def create_dataset(self, metadata, original_key, primary=False):
        """
        Create a new dataset

        Stores the imported metadata in the `datasets` table, either by
        overwriting this processor's own dataset (for the primary dataset) or
        by creating a separate dataset. If the original key is already in use
        on this server a new key is used and the change is recorded in
        `self.remapped_keys`, so references from child datasets can be updated.

        Note that `metadata` is changed in place: its `key` is removed when an
        existing dataset record is overwritten.

        :param dict metadata:  Processed metadata of the dataset to create
        :param str original_key:  Key the dataset had on the exporting server
        :param bool primary:  Whether this is the primary dataset, i.e. the
          one that replaces this processor's own dataset
        :return DataSet:  The created (or overwritten) dataset
        """
        if primary:
            self.dataset.update_status(f"Importing primary dataset {original_key}.")
            # if this is the first dataset we're importing, make it the
            # processor's "own" dataset. the key has already been set to
            # the imported dataset's key via ensure_key() (or a new unique
            # key if it already existed on this server)
            # by making it the "own" dataset, the user initiating the
            # import will see the imported dataset as the "result" of their
            # import query in the interface, similar to the workflow for
            # other data sources
            new_dataset = self.dataset

            # Update metadata and file
            metadata.pop("key")  # key already OK (see above)
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        else:
            self.dataset.update_status(f"Importing child dataset {original_key}.")
            # supernumerary datasets - handle on their own
            # these include any children of imported datasets
            try:
                DataSet(key=metadata["key"], db=self.db, modules=self.modules)

                # if we *haven't* thrown a DatasetException now, then the
                # key is already in use, so create a "dummy" dataset and
                # overwrite it with the metadata we have (except for the
                # key). this ensures that a new unique key will be
                # generated.
                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
                metadata.pop("key")
                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

            except DataSetException:
                # this is *good* since it means the key doesn't exist, so
                # we can re-use the key of the imported dataset
                self.db.insert("datasets", data=metadata)
                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

        if new_dataset.key != original_key:
            # could not use original key because it was already in use
            # so update any references to use the new key
            self.remapped_keys[original_key] = new_dataset.key
            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                      f"{new_dataset.key} instead of key {original_key}!")

        # refresh object, make sure it's in sync with the database
        self.created_datasets.add(new_dataset.key)
        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
        current_log = None
        if new_dataset.key == self.dataset.key:
            # this ensures that the first imported dataset becomes the
            # processor's "own" dataset, and that the import logs go to
            # that dataset's log file. For later imports, this evaluates to
            # False.

            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
            current_log = self.dataset.get_log_path().read_text()
            # Update the dataset
            self.dataset = new_dataset

        # if the key of the parent dataset was changed, change the
        # reference to it that the child dataset has
        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

        # update some attributes that should come from the new server, not
        # the old
        new_dataset.creator = self.dataset_owner
        new_dataset.original_timestamp = new_dataset.timestamp
        new_dataset.imported = True
        new_dataset.timestamp = int(time.time())
        new_dataset.db.commit()

        # make sure the dataset path uses the new key and local dataset
        # path settings. this also makes sure the log file is created in
        # the right place (since it is derived from the results file path)
        extension = metadata["result_file"].split(".")[-1]
        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
        if not updated:
            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

        if current_log:
            # Add the current log to the new dataset; it was read from the
            # log path in use before the result file was re-reserved
            with new_dataset.get_log_path().open("a") as outfile:
                outfile.write(current_log)

        return new_dataset
409
410
411    def process_urls(self):
412        """
413        Import 4CAT dataset from another 4CAT server
414
415        Interfaces with another 4CAT server to transfer a dataset's metadata,
416        data files and child datasets.
417        """
418        urls = [url.strip() for url in self.parameters.get("url").split(",")]
419        self.base = urls[0].split("/results/")[0]
420        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
421        api_key = self.parameters.get("api-key")
422
423        imported = []  # successfully imported datasets
424        failed_imports = []  # keys that failed to import
425        num_rows = 0  # will be used later
426
427        # we can add support for multiple datasets later by removing
428        # this part!
429        keys = [keys[0]]
430
431        while keys:
432            dataset_key = keys.pop(0)
433
434            self.halt_and_catch_fire()
435            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
436
437            # first, metadata!
438            try:
439                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
440                metadata = metadata.json()
441            except FourcatImportException as e:
442                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
443                continue
444            except ValueError:
445                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
446                continue
447
448            # copying empty datasets doesn't really make sense
449            if metadata["num_rows"] == 0:
450                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
451                failed_imports.append(dataset_key)
452                continue
453
454            metadata = self.process_metadata(metadata)
455            if metadata is None:
456                self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
457                failed_imports.append(dataset_key)
458                continue
459
460            # create the new dataset
461            new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)
462
463            # then, the log
464            self.halt_and_catch_fire()
465            try:
466                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
467                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
468                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
469                logpath = new_dataset.get_log_path()
470                new_dataset.log("Original dataset log included below:")
471                with logpath.open("a") as outfile:
472                    outfile.write(log.text)
473            except FourcatImportException as e:
474                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
475                failed_imports.append(dataset_key)
476                continue
477            except ValueError:
478                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
479                failed_imports.append(dataset_key)
480                continue
481
482            # then, the results
483            self.halt_and_catch_fire()
484            try:
485                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
486                datapath = new_dataset.get_results_path()
487                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
488
489                if not imported:
490                    # first dataset - use num rows as 'overall'
491                    num_rows = metadata["num_rows"]
492
493            except FourcatImportException as e:
494                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
495                if new_dataset.key != self.dataset.key:
496                    new_dataset.delete()
497                continue
498
499            except ValueError:
500                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
501                failed_imports.append(dataset_key)
502                continue
503
504            # finally, the kids
505            self.halt_and_catch_fire()
506            try:
507                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
508                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
509                children = children.json()
510            except FourcatImportException as e:
511                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
512                failed_imports.append(dataset_key)
513                continue
514            except ValueError:
515                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
516                failed_imports.append(dataset_key)
517                continue
518
519            for child in children:
520                keys.append(child)
521                self.dataset.log(f"Adding child dataset {child} to import queue")
522
523            # done - remember that we've imported this one
524            imported.append(new_dataset)
525            new_dataset.update_status(metadata["status"])
526
527            if new_dataset.key != self.dataset.key:
528                # only finish if this is not the 'main' dataset, or the user
529                # will think the whole import is done
530                new_dataset.finish(metadata["num_rows"])
531
532        # todo: this part needs updating if/when we support importing multiple datasets!
533        if failed_imports:
534            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
535                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
536                                       f"dataset log file for details.", is_final=True)
537        else:
538            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.",
539                                       is_final=True)
540
541        if not self.dataset.is_finished():
542            # now all related datasets are imported, we can finish the 'main'
543            # dataset, and the user will be alerted that the full import is
544            # complete
545            self.dataset.finish(num_rows)
546
547    def halt_and_catch_fire(self):
548        """
549        Clean up on interrupt
550
551        There are multiple places in the code where we can bail out on an
552        interrupt, so abstract that away in its own function.
553        :return:
554        """
555        if self.interrupted:
556            # resuming is impossible because the original dataset (which
557            # has the list of URLs to import) has probably been
558            # overwritten by this point
559            deletables = [k for k in self.created_datasets if k != self.dataset.key]
560            for deletable in deletables:
561                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
562
563            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
564                                           f"will need to initiate the import again.")
565
566            raise ProcessorInterruptedException()
567
568    @staticmethod
569    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
570        """
571        Get dataset component from 4CAT export API
572
573        :param str base:  Server URL base to import from
574        :param str dataset_key:  Key of dataset to import
575        :param str api_key:  API authentication token
576        :param str component:  Component to retrieve
577        :return:  HTTP response object
578        """
579        try:
580            if component == "data" and datapath:
581                # Stream data
582                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
583                                  headers={
584                                            "User-Agent": "4cat/import",
585                                            "Authentication": api_key
586                                        }) as r:
587                    r.raise_for_status()
588                    with datapath.open("wb") as outfile:
589                        for chunk in r.iter_content(chunk_size=8192):
590                            outfile.write(chunk)
591                return r
592            else:
593                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
594                    "User-Agent": "4cat/import",
595                    "Authentication": api_key
596                })
597        except requests.Timeout:
598            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
599                                         f"accessible to external connections and try again.")
600        except requests.RequestException as e:
601            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
602                                         f"accessible to external connections and try again.")
603
604        if response.status_code == 404:
605            raise FourcatImportException(
606                f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to "
607                f"a valid dataset.")
608        elif response.status_code in (401, 403):
609            raise FourcatImportException(
610                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
611                f"dataset and are using the correct API key.")
612        elif response.status_code != 200:
613            raise FourcatImportException(
614                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
615
616        return response
617
618    @staticmethod
619    def validate_query(query, request, config):
620        """
621        Validate custom data input
622
623        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
624        some metadata.
625
626        :param dict query:  Query parameters, from client-side.
627        :param request:  Flask request
628        :param ConfigManager|None config:  Configuration reader (context-aware)
629        :return dict:  Safe query parameters
630        """
631        if query.get("method") == "zip":
632            filename = ""
633            if "option-data_upload-entries" in request.form:
634                # First pass sends list of files in the zip
635                pass
636            elif "option-data_upload" in request.files:
637                # Second pass sends the actual file
638                file = request.files["option-data_upload"]
639                if not file:
640                    raise QueryParametersException("No file uploaded.")
641
642                if not file.filename.endswith(".zip"):
643                    raise QueryParametersException("Uploaded file must be a ZIP file.")
644
645                filename = file.filename
646            else:
647                raise QueryParametersException("No file was offered for upload.")
648
649            return {
650                "method": "zip",
651                "filename": filename
652            }
653        elif query.get("method") == "url":
654            urls = query.get("url")
655            if not urls:
656                raise QueryParametersException("Provide at least one dataset URL.")
657
658            urls = urls.split(",")
659            bases = set([url.split("/results/")[0].lower() for url in urls])
660            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
661
662            if len(keys) != 1:
663                # todo: change this to < 1 if we allow multiple datasets
664                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
665
666            if len(bases) != 1:
667                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
668                                                "one 4CAT server at a time.")
669
670            base = urls[0].split("/results/")[0]
671            try:
672                # test if API key is valid and server is reachable
673                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
674            except FourcatImportException as e:
675                raise QueryParametersException(str(e))
676
677            try:
678                # test if we get a response we can parse
679                metadata = test.json()
680            except ValueError:
681                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
682
683            version = get_software_version()
684
685            if metadata.get("current_4CAT_version") != version:
686                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
687                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
688                                               "sure both are running the same version of 4CAT and try again.")
689
690            # OK, we can import at least one dataset
691            return {
692                "url": ",".join(urls),
693                "api-key": query.get("api-key")
694            }
695        else:
696            raise QueryParametersException("Import method not yet implemented.")
697
698    @staticmethod
699    def get_keys_from_urls(urls):
700        """
701        Get dataset keys from 4CAT URLs
702
703        :param list urls:  List of URLs
704        :return list:  List of keys
705        """
706        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
707
708    @staticmethod
709    def ensure_key(query):
710        """
711        Determine key for dataset generated by this processor
712
713        When importing datasets, it's necessary to determine the key of the
714        dataset that is created before it is actually created, because we want
715        to keep the original key of the imported dataset if possible. Luckily,
716        we can deduce it from the URL we're importing the dataset from.
717
718        :param dict query:  Input from the user, through the front-end
719        :return str:  Desired dataset key
720        """
721        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
722        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
723        urls = query.get("url", "").split(",")
724        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
725        return keys[0]

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an `is_compatible_with(cls, module=None, config=None) -> bool` class method which takes a dataset as argument and returns a bool that determines if this processor is considered compatible with that dataset. For example:

@classmethod
def is_compatible_with(cls, module=None, config=None):
    return module.type == "linguistic-features"
type = 'import_4cat-search'
category = 'Search'
title = 'Import 4CAT dataset and analyses'
description = 'Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)'
is_local = False
is_static = False
max_workers = 1
created_datasets = None
base = None
remapped_keys = None
dataset_owner = None
@classmethod
def get_options(cls, parent_dataset=None, config=None) -> dict:
38    @classmethod
39    def get_options(cls, parent_dataset=None, config=None) -> dict:
40        """
41        Get processor options
42
43        :param DataSet parent_dataset:  An object representing the dataset that
44          the processor would or was run on
45        :param ConfigManager|None config:  Configuration reader (context-aware)
46        """
47        return {
48            "intro": {
49                "type": UserInput.OPTION_INFO,
50                "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
51                        "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
52                        "You can find the current version in the footer at the bottom of the interface."
53            },
54            "method": {
55                "type": UserInput.OPTION_CHOICE,
56                "help": "Import Type",
57                "options": {
58                    "zip": "Zip File",
59                    "url": "4CAT URL",
60                },
61                "default": "url"
62            },
63            "url": {
64                "type": UserInput.OPTION_TEXT,
65                "help": "Dataset URL",
66                "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
67                "requires": "method^=url"
68            },
69            "intro2": {
70                "type": UserInput.OPTION_INFO,
71                "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
72                        "an API key from **the server you are importing from**, not the one you are looking at right now. "
73                        "Additionally, you need to have owner access to the dataset you want to import.",
74                "requires": "method^=url"
75            },
76            "api-key": {
77                "type": UserInput.OPTION_TEXT,
78                "help": "4CAT API Key",
79                "sensitive": True,
80                "cache": True,
81                "requires": "method^=url"
82            },
83            "data_upload": {
84                "type": UserInput.OPTION_FILE,
85                "help": "File",
86                "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
87                "requires": "method^=zip"
88            },
89
90        }

Get processor options

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would or was run on
  • ConfigManager|None config: Configuration reader (context-aware)
def process(self):
 92    def process(self):
 93        """
 94        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 95        """
 96        self.created_datasets = set()  # keys of created datasets - may not be successful!
 97        self.remapped_keys = {}  # changed dataset keys
 98        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 99        try:
100            if self.parameters.get("method") == "zip":
101                self.process_zip()
102            else:
103                self.process_urls()
104        except Exception as e:
105            # Catch all exceptions and finish the job with an error
106            # Resuming is impossible because this dataset was overwritten with the importing dataset
107            # halt_and_catch_fire() will clean up and delete the datasets that were created
108            self.interrupted = True
109            try:
110                self.halt_and_catch_fire()
111            except ProcessorInterruptedException:
112                pass
113            except InFailedSqlTransaction:
114                # Catch database issue and retry
115                self.db.rollback()
116                try:
117                    self.halt_and_catch_fire()
118                except ProcessorInterruptedException:
119                    pass
120            # Reraise the original exception for logging
121            raise e

Import 4CAT dataset either from another 4CAT server or from the uploaded zip file

def after_create(query, dataset, request):
123    def after_create(query, dataset, request):
124        """
125        Hook to execute after the dataset for this source has been created
126
127        In this case, put the file in a temporary location so it can be
128        processed properly by the related Job later.
129
130        :param dict query:  Sanitised query parameters
131        :param DataSet dataset:  Dataset created for this query
132        :param request:  Flask request submitted for its creation
133        """
134        if query.get("method") == "zip":
135            file = request.files["option-data_upload"]
136            file.seek(0)
137            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
138                while True:
139                    chunk = file.read(1024)
140                    if len(chunk) == 0:
141                        break
142                    outfile.write(chunk)
143        else:
144            # nothing to do for URLs
145            pass

Hook to execute after the dataset for this source has been created

In this case, put the file in a temporary location so it can be processed properly by the related Job later.

Parameters
  • dict query: Sanitised query parameters
  • DataSet dataset: Dataset created for this query
  • request: Flask request submitted for its creation
def process_zip(self):
148    def process_zip(self):
149        """
150        Import 4CAT dataset from a ZIP file
151        """
152        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
153        temp_file = self.dataset.get_results_path().with_suffix(".importing")
154
155        imported = []
156        processed_files = 1 # take into account the export.log file
157        failed_imports = []
158        primary_dataset_original_log = None
159        with zipfile.ZipFile(temp_file, "r") as zip_ref:
160            zip_contents = zip_ref.namelist()
161
162            # Get all metadata files and determine primary dataset
163            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
164            if not metadata_files:
165                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
166                return
167
168            # Get the primary dataset
169            primary_dataset_keys = set()
170            datasets = []
171            parent_child_mapping = {}
172            for file in metadata_files:
173                with zip_ref.open(file) as f:
174                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
175                    metadata = json.loads(content)
176                    if not metadata.get("key_parent"):
177                        primary_dataset_keys.add(metadata.get("key"))
178                        datasets.append(metadata)
179                    else:
180                        # Store the mapping of parent to child datasets
181                        parent_key = metadata.get("key_parent")
182                        if parent_key not in parent_child_mapping:
183                            parent_child_mapping[parent_key] = []
184                        parent_child_mapping[parent_key].append(metadata)
185
186            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
187            if len(primary_dataset_keys) != 1:
188                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
189                return
190
191            # Import datasets
192            while datasets:
193                self.halt_and_catch_fire()
194
195                # Create the datasets
196                metadata = datasets.pop(0)
197                dataset_key = metadata.get("key")
198                processed_metadata = self.process_metadata(metadata)
199                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
200                processed_files += 1
201
202                # Copy the log file
203                self.halt_and_catch_fire()
204                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
205                if log_filename in zip_contents:
206                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
207                    with zip_ref.open(log_filename) as f:
208                        content = f.read().decode('utf-8')
209                        if new_dataset.key == self.dataset.key:
210                            # Hold the original log for the primary dataset and add at the end
211                            primary_dataset_original_log = content
212                        else:
213                            new_dataset.log("Original dataset log included below:")
214                            with new_dataset.get_log_path().open("a") as outfile:
215                                outfile.write(content)
216                    processed_files += 1
217                else:
218                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
219
220                # Copy the results
221                self.halt_and_catch_fire()
222                results_filename = metadata["result_file"]
223                if results_filename in zip_contents:
224                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
225                    with zip_ref.open(results_filename) as f:
226                        with new_dataset.get_results_path().open("wb") as outfile:
227                            outfile.write(f.read())
228                    processed_files += 1
229
230                    if not imported:
231                        # first dataset - use num rows as 'overall'
232                        num_rows = metadata["num_rows"]
233                else:
234                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
235                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
236                    failed_imports.append(dataset_key)
237                    continue
238
239                # finally, the kids
240                self.halt_and_catch_fire()
241                if dataset_key in parent_child_mapping:
242                    datasets.extend(parent_child_mapping[dataset_key])
243                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
244
245                # done - remember that we've imported this one
246                imported.append(new_dataset)
247                new_dataset.update_status(metadata["status"])
248
249                if new_dataset.key != self.dataset.key:
250                    # only finish if this is not the 'main' dataset, or the user
251                    # will think the whole import is done
252                    new_dataset.finish(metadata["num_rows"])
253
254            # Check that all files were processed
255            missed_files = []
256            if len(zip_contents) != processed_files:
257                for file in zip_contents:
258                    if file not in processed_files:
259                        missed_files.append(file)
260
261            # todo: this part needs updating if/when we support importing multiple datasets!
262            if failed_imports:
263                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
264                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
265                                           f"dataset log file for details.", is_final=True)
266            elif missed_files:
267                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
268                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
269                                           f"{len(missed_files)} files were not successfully imported. Check the "
270                                           f"dataset log file for details.", is_final=True)
271            else:
272                self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
273                                           is_final=True)
274
275            if not self.dataset.is_finished():
276                # now all related datasets are imported, we can finish the 'main'
277                # dataset, and the user will be alerted that the full import is
278                # complete
279                self.dataset.finish(num_rows)
280
281            # Add the original log for the primary dataset
282            if primary_dataset_original_log:
283                self.dataset.log("Original dataset log included below:\n")
284                with self.dataset.get_log_path().open("a") as outfile:
285                    outfile.write(primary_dataset_original_log)

Import 4CAT dataset from a ZIP file

@staticmethod
def process_metadata(metadata):
288    @staticmethod
289    def process_metadata(metadata):
290        """
291        Process metadata for import
292
293        :param dict metadata:  Metadata of import
294        :return:  Relevant metadata, or `None` if parsing error
295        """
296        # get rid of some keys that are server-specific and don't need to
297        # be stored (or don't correspond to database columns)
298        try:
299            metadata.pop("current_4CAT_version")
300            metadata.pop("id")
301            metadata.pop("job")
302            metadata.pop("is_private")
303            metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
304
305            # extra params are stored as JSON...
306            metadata["parameters"] = json.loads(metadata["parameters"])
307            if "copied_from" in metadata["parameters"]:
308                metadata["parameters"].pop("copied_from")
309            metadata["parameters"] = json.dumps(metadata["parameters"])
310
311        except (ValueError, KeyError):
312            # we don't need all this metadata but it still should be present,
313            # otherwise something is wrong with the data format
314            return None
315
316        return metadata

Process metadata for import

Parameters
  • dict metadata: Metadata of import
Returns

Relevant metadata, or None if parsing error

def create_dataset(self, metadata, original_key, primary=False):
    def create_dataset(self, metadata, original_key, primary=False):
        """
        Create a new dataset

        For the primary dataset, the processor's own dataset is re-used and
        its database record is updated with the imported metadata. For any
        other dataset a new record is made: with the original key if that key
        is not in use yet, or with a newly generated key otherwise. Changed
        keys are remembered in `self.remapped_keys`, and every created dataset
        is registered in `self.created_datasets`.

        :param dict metadata:  Dataset metadata, as returned by
          `process_metadata()`. The `key` item may be removed from it.
        :param str original_key:  Key the dataset had on the server it was
          exported from
        :param bool primary:  Whether this dataset should replace the
          processor's own dataset
        :return DataSet:  The created (or updated) dataset
        """
        if primary:
            self.dataset.update_status(f"Importing primary dataset {original_key}.")
            # if this is the first dataset we're importing, make it the
            # processor's "own" dataset. the key has already been set to
            # the imported dataset's key via ensure_key() (or a new unique
            # key if it already existed on this server)
            # by making it the "own" dataset, the user initiating the
            # import will see the imported dataset as the "result" of their
            # import query in the interface, similar to the workflow for
            # other data sources
            new_dataset = self.dataset

            # Update metadata and file
            metadata.pop("key")  # key already OK (see above)
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        else:
            self.dataset.update_status(f"Importing child dataset {original_key}.")
            # supernumerary datasets - handle on their own
            # these include any children of imported datasets
            try:
                DataSet(key=metadata["key"], db=self.db, modules=self.modules)

                # if we *haven't* thrown a DatasetException now, then the
                # key is already in use, so create a "dummy" dataset and
                # overwrite it with the metadata we have (except for the
                # key). this ensures that a new unique key will be
                # generated.
                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
                metadata.pop("key")
                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

            except DataSetException:
                # this is *good* since it means the key doesn't exist, so
                # we can re-use the key of the imported dataset
                self.db.insert("datasets", data=metadata)
                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

        if new_dataset.key != original_key:
            # could not use original key because it was already in use
            # so update any references to use the new key
            self.remapped_keys[original_key] = new_dataset.key
            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                      f"{new_dataset.key} instead of key {original_key}!")

        # refresh object, make sure it's in sync with the database
        self.created_datasets.add(new_dataset.key)
        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
        current_log = None
        if new_dataset.key == self.dataset.key:
            # this ensures that the first imported dataset becomes the
            # processor's "own" dataset, and that the import logs go to
            # that dataset's log file. For later imports, this evaluates to
            # False.

            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
            current_log = self.dataset.get_log_path().read_text()
            # Update the dataset
            self.dataset = new_dataset

        # if the key of the parent dataset was changed, change the
        # reference to it that the child dataset has
        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

        # update some attributes that should come from the new server, not
        # the old
        new_dataset.creator = self.dataset_owner
        new_dataset.original_timestamp = new_dataset.timestamp
        new_dataset.imported = True
        new_dataset.timestamp = int(time.time())
        new_dataset.db.commit()

        # make sure the dataset path uses the new key and local dataset
        # path settings. this also makes sure the log file is created in
        # the right place (since it is derived from the results file path)
        extension = metadata["result_file"].split(".")[-1]
        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
        if not updated:
            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

        if current_log:
            # Add the current log to the new dataset
            with new_dataset.get_log_path().open("a") as outfile:
                outfile.write(current_log)

        return new_dataset

Create a new dataset

def process_urls(self):
411    def process_urls(self):
412        """
413        Import 4CAT dataset from another 4CAT server
414
415        Interfaces with another 4CAT server to transfer a dataset's metadata,
416        data files and child datasets.
417        """
418        urls = [url.strip() for url in self.parameters.get("url").split(",")]
419        self.base = urls[0].split("/results/")[0]
420        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
421        api_key = self.parameters.get("api-key")
422
423        imported = []  # successfully imported datasets
424        failed_imports = []  # keys that failed to import
425        num_rows = 0  # will be used later
426
427        # we can add support for multiple datasets later by removing
428        # this part!
429        keys = [keys[0]]
430
431        while keys:
432            dataset_key = keys.pop(0)
433
434            self.halt_and_catch_fire()
435            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
436
437            # first, metadata!
438            try:
439                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
440                metadata = metadata.json()
441            except FourcatImportException as e:
442                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
443                continue
444            except ValueError:
445                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
446                continue
447
448            # copying empty datasets doesn't really make sense
449            if metadata["num_rows"] == 0:
450                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
451                failed_imports.append(dataset_key)
452                continue
453
454            metadata = self.process_metadata(metadata)
455            if metadata is None:
456                self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
457                failed_imports.append(dataset_key)
458                continue
459
460            # create the new dataset
461            new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)
462
463            # then, the log
464            self.halt_and_catch_fire()
465            try:
466                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
467                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
468                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
469                logpath = new_dataset.get_log_path()
470                new_dataset.log("Original dataset log included below:")
471                with logpath.open("a") as outfile:
472                    outfile.write(log.text)
473            except FourcatImportException as e:
474                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
475                failed_imports.append(dataset_key)
476                continue
477            except ValueError:
478                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
479                failed_imports.append(dataset_key)
480                continue
481
482            # then, the results
483            self.halt_and_catch_fire()
484            try:
485                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
486                datapath = new_dataset.get_results_path()
487                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
488
489                if not imported:
490                    # first dataset - use num rows as 'overall'
491                    num_rows = metadata["num_rows"]
492
493            except FourcatImportException as e:
494                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
495                if new_dataset.key != self.dataset.key:
496                    new_dataset.delete()
497                continue
498
499            except ValueError:
500                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
501                failed_imports.append(dataset_key)
502                continue
503
504            # finally, the kids
505            self.halt_and_catch_fire()
506            try:
507                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
508                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
509                children = children.json()
510            except FourcatImportException as e:
511                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
512                failed_imports.append(dataset_key)
513                continue
514            except ValueError:
515                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
516                failed_imports.append(dataset_key)
517                continue
518
519            for child in children:
520                keys.append(child)
521                self.dataset.log(f"Adding child dataset {child} to import queue")
522
523            # done - remember that we've imported this one
524            imported.append(new_dataset)
525            new_dataset.update_status(metadata["status"])
526
527            if new_dataset.key != self.dataset.key:
528                # only finish if this is not the 'main' dataset, or the user
529                # will think the whole import is done
530                new_dataset.finish(metadata["num_rows"])
531
532        # todo: this part needs updating if/when we support importing multiple datasets!
533        if failed_imports:
534            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
535                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
536                                       f"dataset log file for details.", is_final=True)
537        else:
538            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.",
539                                       is_final=True)
540
541        if not self.dataset.is_finished():
542            # now all related datasets are imported, we can finish the 'main'
543            # dataset, and the user will be alerted that the full import is
544            # complete
545            self.dataset.finish(num_rows)

Import 4CAT dataset from another 4CAT server

Interfaces with another 4CAT server to transfer a dataset's metadata, data files and child datasets.

def halt_and_catch_fire(self):
547    def halt_and_catch_fire(self):
548        """
549        Clean up on interrupt
550
551        There are multiple places in the code where we can bail out on an
552        interrupt, so abstract that away in its own function.
553        :return:
554        """
555        if self.interrupted:
556            # resuming is impossible because the original dataset (which
557            # has the list of URLs to import) has probably been
558            # overwritten by this point
559            deletables = [k for k in self.created_datasets if k != self.dataset.key]
560            for deletable in deletables:
561                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
562
563            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
564                                           f"will need to initiate the import again.")
565
566            raise ProcessorInterruptedException()

Clean up on interrupt

There are multiple places in the code where we can bail out on an interrupt, so abstract that away in its own function.

Returns
@staticmethod
def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
568    @staticmethod
569    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
570        """
571        Get dataset component from 4CAT export API
572
573        :param str base:  Server URL base to import from
574        :param str dataset_key:  Key of dataset to import
575        :param str api_key:  API authentication token
576        :param str component:  Component to retrieve
577        :return:  HTTP response object
578        """
579        try:
580            if component == "data" and datapath:
581                # Stream data
582                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
583                                  headers={
584                                            "User-Agent": "4cat/import",
585                                            "Authentication": api_key
586                                        }) as r:
587                    r.raise_for_status()
588                    with datapath.open("wb") as outfile:
589                        for chunk in r.iter_content(chunk_size=8192):
590                            outfile.write(chunk)
591                return r
592            else:
593                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
594                    "User-Agent": "4cat/import",
595                    "Authentication": api_key
596                })
597        except requests.Timeout:
598            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
599                                         f"accessible to external connections and try again.")
600        except requests.RequestException as e:
601            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
602                                         f"accessible to external connections and try again.")
603
604        if response.status_code == 404:
605            raise FourcatImportException(
606                f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to "
607                f"a valid dataset.")
608        elif response.status_code in (401, 403):
609            raise FourcatImportException(
610                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
611                f"dataset and are using the correct API key.")
612        elif response.status_code != 200:
613            raise FourcatImportException(
614                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
615
616        return response

Get dataset component from 4CAT export API

Parameters
  • str base: Server URL base to import from
  • str dataset_key: Key of dataset to import
  • str api_key: API authentication token
  • str component: Component to retrieve
Returns

HTTP response object

@staticmethod
def validate_query(query, request, config):
618    @staticmethod
619    def validate_query(query, request, config):
620        """
621        Validate custom data input
622
623        Confirms that the uploaded file is a valid CSV or tab file and, if so, returns
624        some metadata.
625
626        :param dict query:  Query parameters, from client-side.
627        :param request:  Flask request
628        :param ConfigManager|None config:  Configuration reader (context-aware)
629        :return dict:  Safe query parameters
630        """
631        if query.get("method") == "zip":
632            filename = ""
633            if "option-data_upload-entries" in request.form:
634                # First pass sends list of files in the zip
635                pass
636            elif "option-data_upload" in request.files:
637                # Second pass sends the actual file
638                file = request.files["option-data_upload"]
639                if not file:
640                    raise QueryParametersException("No file uploaded.")
641
642                if not file.filename.endswith(".zip"):
643                    raise QueryParametersException("Uploaded file must be a ZIP file.")
644
645                filename = file.filename
646            else:
647                raise QueryParametersException("No file was offered for upload.")
648
649            return {
650                "method": "zip",
651                "filename": filename
652            }
653        elif query.get("method") == "url":
654            urls = query.get("url")
655            if not urls:
656                raise QueryParametersException("Provide at least one dataset URL.")
657
658            urls = urls.split(",")
659            bases = set([url.split("/results/")[0].lower() for url in urls])
660            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
661
662            if len(keys) != 1:
663                # todo: change this to < 1 if we allow multiple datasets
664                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
665
666            if len(bases) != 1:
667                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
668                                                "one 4CAT server at a time.")
669
670            base = urls[0].split("/results/")[0]
671            try:
672                # test if API key is valid and server is reachable
673                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
674            except FourcatImportException as e:
675                raise QueryParametersException(str(e))
676
677            try:
678                # test if we get a response we can parse
679                metadata = test.json()
680            except ValueError:
681                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
682
683            version = get_software_version()
684
685            if metadata.get("current_4CAT_version") != version:
686                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
687                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
688                                               "sure both are running the same version of 4CAT and try again.")
689
690            # OK, we can import at least one dataset
691            return {
692                "url": ",".join(urls),
693                "api-key": query.get("api-key")
694            }
695        else:
696            raise QueryParametersException("Import method not yet implemented.")

Validate custom data input

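Checks the import request. For the zip method, confirms that a file with a .zip extension was offered for upload. For the url method, confirms that exactly one dataset URL was given, that the source server can be reached with the supplied API key, and that it runs the same version of 4CAT as this server.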

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Safe query parameters

@staticmethod
def get_keys_from_urls(urls):
698    @staticmethod
699    def get_keys_from_urls(urls):
700        """
701        Get dataset keys from 4CAT URLs
702
703        :param list urls:  List of URLs
704        :return list:  List of keys
705        """
706        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]

Get dataset keys from 4CAT URLs

Parameters
  • list urls: List of URLs
Returns

List of keys

@staticmethod
def ensure_key(query):
708    @staticmethod
709    def ensure_key(query):
710        """
711        Determine key for dataset generated by this processor
712
713        When importing datasets, it's necessary to determine the key of the
714        dataset that is created before it is actually created, because we want
715        to keep the original key of the imported dataset if possible. Luckily,
716        we can deduce it from the URL we're importing the dataset from.
717
718        :param dict query:  Input from the user, through the front-end
719        :return str:  Desired dataset key
720        """
721        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
722        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
723        urls = query.get("url", "").split(",")
724        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
725        return keys[0]

Determine key for dataset generated by this processor

When importing datasets, it's necessary to determine the key of the dataset that is created before it is actually created, because we want to keep the original key of the imported dataset if possible. Luckily, we can deduce it from the URL we're importing the dataset from.

Parameters
  • dict query: Input from the user, through the front-end
Returns

Desired dataset key