
datasources.fourcat_import.import_4cat

Import datasets from other 4CATs
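
For orientation, below is a minimal sketch of the export API call this datasource relies on (see fetch_from_4cat in the source further down). The server URL, dataset key and API key are placeholders, not real values; the endpoint path and headers are taken from the code below.

    # hypothetical values - replace with a real source server, dataset key and API key
    import requests

    base = "https://4cat.example"
    dataset_key = "28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49"
    api_key = "YOUR-API-KEY"  # created via 'API Access' on the *source* server

    # fetch the dataset's metadata, the same payload that process_urls() imports first
    response = requests.get(
        f"{base}/api/export-packed-dataset/{dataset_key}/metadata/",
        timeout=5,
        headers={"User-Agent": "4cat/import", "Authentication": api_key},
    )
    response.raise_for_status()
    metadata = response.json()
    print(metadata.get("current_4CAT_version"), metadata.get("num_rows"))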

  1"""
  2Import datasets from other 4CATs
  3"""
  4import requests
  5import json
  6import time
  7import zipfile
  8from pathlib import Path
  9
 10from backend.lib.processor import BasicProcessor
 11from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException,
 12                                   DataSetException)
 13from common.lib.helpers import UserInput, get_software_version
 14from common.lib.dataset import DataSet
 15
 16
 17class FourcatImportException(FourcatException):
 18    pass
 19
 20
 21class SearchImportFromFourcat(BasicProcessor):
 22    type = "import_4cat-search"  # job ID
 23    category = "Search"  # category
 24    title = "Import 4CAT dataset and analyses"  # title displayed in UI
 25    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
 26    is_local = False  # Whether this datasource is locally scraped
 27    is_static = False  # Whether this datasource is still updated
 28
 29    max_workers = 1  # this cannot be more than 1, else things get VERY messy
 30
 31    options = {
 32        "intro": {
 33            "type": UserInput.OPTION_INFO,
 34            "help": "Provide the URL of a dataset on another 4CAT server that you would like to copy to this one. "
 35                    "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
 36                    "You can find the current version in the footer at the bottom of the interface."
 37        },
 38        "method": {
 39            "type": UserInput.OPTION_CHOICE,
 40            "help": "Import Type",
 41            "options": {
 42                "zip": "Zip File",
 43                "url": "4CAT URL",
 44            },
 45            "default": "url"
 46        },
 47        "url": {
 48            "type": UserInput.OPTION_TEXT,
 49            "help": "Dataset URL",
 50            "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
 51            "requires": "method^=url"
 52        },
 53        "intro2": {
 54            "type": UserInput.OPTION_INFO,
 55            "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
 56                    "an API key from **the server you are importing from**, not the one you are looking at right now. "
 57                    "Additionally, you need to have owner access to the dataset you want to import.",
 58            "requires": "method^=url"
 59        },
 60        "api-key": {
 61            "type": UserInput.OPTION_TEXT,
 62            "help": "4CAT API Key",
 63            "sensitive": True,
 64            "cache": True,
 65            "requires": "method^=url"
 66        },
 67        "data_upload": {
 68            "type": UserInput.OPTION_FILE,
 69            "help": "File",
 70            "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
 71            "requires": "method^=zip"
 72        },
 73
 74    }
 75
 76    created_datasets = None
 77    base = None
 78    remapped_keys = None
 79    dataset_owner = None
 80
 81    def process(self):
 82        """
 83        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 84        """
 85        self.created_datasets = set()  # keys of created datasets - may not be successful!
 86        self.remapped_keys = {}  # changed dataset keys
 87        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 88        try:
 89            if self.parameters.get("method") == "zip":
 90                self.process_zip()
 91            else:
 92                self.process_urls()
 93        except Exception as e:
 94            # Catch all exceptions and finish the job with an error
 95            # Resuming is impossible because this dataset was overwritten with the importing dataset
 96            # halt_and_catch_fire() will clean up and delete the datasets that were created
 97            self.interrupted = True
 98            try:
 99                self.halt_and_catch_fire()
100            except ProcessorInterruptedException:
101                pass
102            # Reraise the original exception for logging
103            raise e
104
105    def after_create(query, dataset, request):
106        """
107        Hook to execute after the dataset for this source has been created
108
109        In this case, put the file in a temporary location so it can be
110        processed properly by the related Job later.
111
112        :param dict query:  Sanitised query parameters
113        :param DataSet dataset:  Dataset created for this query
114        :param request:  Flask request submitted for its creation
115        """
116        if query.get("method") == "zip":
117            file = request.files["option-data_upload"]
118            file.seek(0)
119            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
120                while True:
121                    chunk = file.read(1024)
122                    if len(chunk) == 0:
123                        break
124                    outfile.write(chunk)
125        else:
126            # nothing to do for URLs
127            pass
128
129
130    def process_zip(self):
131        """
132        Import 4CAT dataset from a ZIP file
133        """
134        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
135        temp_file = self.dataset.get_results_path().with_suffix(".importing")
136
137        imported = []
138        processed_files = 1 # take into account the export.log file
139        failed_imports = []
140        primary_dataset_original_log = None
141        with zipfile.ZipFile(temp_file, "r") as zip_ref:
142            zip_contents = zip_ref.namelist()
143
144            # Get all metadata files and determine primary dataset
145            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
146            if not metadata_files:
147                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
148                return
149
150            # Get the primary dataset
151            primary_dataset_keys = set()
152            datasets = []
153            parent_child_mapping = {}
154            for file in metadata_files:
155                with zip_ref.open(file) as f:
156                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
157                    metadata = json.loads(content)
158                    if not metadata.get("key_parent"):
159                        primary_dataset_keys.add(metadata.get("key"))
160                        datasets.append(metadata)
161                    else:
162                        # Store the mapping of parent to child datasets
163                        parent_key = metadata.get("key_parent")
164                        if parent_key not in parent_child_mapping:
165                            parent_child_mapping[parent_key] = []
166                        parent_child_mapping[parent_key].append(metadata)
167
168            # The primary dataset will overwrite this processor's own dataset; supporting multiple primary datasets would require addressing this
169            if len(primary_dataset_keys) != 1:
170                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
171                return
172
173            # Import datasets
174            while datasets:
175                self.halt_and_catch_fire()
176
177                # Create the datasets
178                metadata = datasets.pop(0)
179                dataset_key = metadata.get("key")
180                processed_metadata = self.process_metadata(metadata)
181                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
182                processed_files += 1
183
184                # Copy the log file
185                self.halt_and_catch_fire()
186                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
187                if log_filename in zip_contents:
188                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
189                    with zip_ref.open(log_filename) as f:
190                        content = f.read().decode('utf-8')
191                        if new_dataset.key == self.dataset.key:
192                            # Hold the original log for the primary dataset and add at the end
193                            primary_dataset_original_log = content
194                        else:
195                            new_dataset.log("Original dataset log included below:")
196                            with new_dataset.get_log_path().open("a") as outfile:
197                                outfile.write(content)
198                    processed_files += 1
199                else:
200                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
201
202                # Copy the results
203                self.halt_and_catch_fire()
204                results_filename = metadata["result_file"]
205                if results_filename in zip_contents:
206                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
207                    with zip_ref.open(results_filename) as f:
208                        with new_dataset.get_results_path().open("wb") as outfile:
209                            outfile.write(f.read())
210                    processed_files += 1
211
212                    if not imported:
213                        # first dataset - use num rows as 'overall'
214                        num_rows = metadata["num_rows"]
215                else:
216                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
217                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
218                    failed_imports.append(dataset_key)
219                    continue
220
221                # finally, the kids
222                self.halt_and_catch_fire()
223                if dataset_key in parent_child_mapping:
224                    datasets.extend(parent_child_mapping[dataset_key])
225                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
226
227                # done - remember that we've imported this one
228                imported.append(new_dataset)
229                new_dataset.update_status(metadata["status"])
230
231                if new_dataset.key != self.dataset.key:
232                    # only finish if this is not the 'main' dataset, or the user
233                    # will think the whole import is done
234                    new_dataset.finish(metadata["num_rows"])
235
236            # Check whether all files in the ZIP were processed; only a count of
237            # processed files is kept, so we can report how many files went
238            # unprocessed, but not which ones
239            missed_files = 0
240            if len(zip_contents) > processed_files:
241                missed_files = len(zip_contents) - processed_files
242
243            # todo: this part needs updating if/when we support importing multiple datasets!
244            if failed_imports:
245                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
246                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
247                                           f"dataset log file for details.", is_final=True)
248            elif missed_files:
249                self.dataset.log(f"ZIP file contained {missed_files} file(s) that were not processed.")
250                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
251                                           f"{missed_files} file(s) were not successfully imported. Check the "
252                                           f"dataset log file for details.", is_final=True)
253            else:
254                self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported.",
255                                           is_final=True)
256
257            if not self.dataset.is_finished():
258                # now all related datasets are imported, we can finish the 'main'
259                # dataset, and the user will be alerted that the full import is
260                # complete
261                self.dataset.finish(num_rows)
262
263            # Add the original log for the primary dataset
264            if primary_dataset_original_log:
265                self.dataset.log("Original dataset log included below:\n")
266                with self.dataset.get_log_path().open("a") as outfile:
267                    outfile.write(primary_dataset_original_log)
268
269
270    @staticmethod
271    def process_metadata(metadata):
272        """
273        Process metadata for import
274        """
275        # get rid of some keys that are server-specific and don't need to
276        # be stored (or don't correspond to database columns)
277        metadata.pop("current_4CAT_version")
278        metadata.pop("id")
279        metadata.pop("job")
280        metadata.pop("is_private")
281        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
282
283        # extra params are stored as JSON...
284        metadata["parameters"] = json.loads(metadata["parameters"])
285        if "copied_from" in metadata["parameters"]:
286            metadata["parameters"].pop("copied_from")
287        metadata["parameters"] = json.dumps(metadata["parameters"])
288
289        return metadata
290
291    def create_dataset(self, metadata, original_key, primary=False):
292        """
293        Create a new dataset
294        """
295        if primary:
296            self.dataset.update_status(f"Importing primary dataset {original_key}.")
297            # if this is the first dataset we're importing, make it the
298            # processor's "own" dataset. the key has already been set to
299            # the imported dataset's key via ensure_key() (or a new unique
300            # key if it already existed on this server)
301            # by making it the "own" dataset, the user initiating the
302            # import will see the imported dataset as the "result" of their
303            # import query in the interface, similar to the workflow for
304            # other data sources
305            new_dataset = self.dataset
306
307            # Update metadata and file
308            metadata.pop("key")  # key already OK (see above)
309            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
310
311        else:
312            self.dataset.update_status(f"Importing child dataset {original_key}.")
313            # supernumerary datasets - handle on their own
314            # these include any children of imported datasets
315            try:
316                key_exists = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
317
318                # if we *haven't* thrown a DataSetException by now, then the
319                # key is already in use, so create a "dummy" dataset and
320                # overwrite it with the metadata we have (except for the
321                # key). this ensures that a new unique key will be
322                # generated.
323                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
324                metadata.pop("key")
325                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
326
327            except DataSetException:
328                # this is *good* since it means the key doesn't exist, so
329                # we can re-use the key of the imported dataset
330                self.db.insert("datasets", data=metadata)
331                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
332
333        if new_dataset.key != original_key:
334            # could not use original key because it was already in use
335            # so update any references to use the new key
336            self.remapped_keys[original_key] = new_dataset.key
337            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
338                                      f"{new_dataset.key} instead of key {original_key}!")
339
340        # refresh object, make sure it's in sync with the database
341        self.created_datasets.add(new_dataset.key)
342        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
343        current_log = None
344        if new_dataset.key == self.dataset.key:
345            # this ensures that the first imported dataset becomes the
346            # processor's "own" dataset, and that the import logs go to
347            # that dataset's log file. For later imports, this evaluates to
348            # False.
349
350            # Read and store the current log; it is written back only after result_file is updated (since the log file path is derived from it)
351            current_log = self.dataset.get_log_path().read_text()
352            # Update the dataset
353            self.dataset = new_dataset
354
355        # if the key of the parent dataset was changed, change the
356        # reference to it that the child dataset has
357        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
358            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]
359
360        # update some attributes that should come from the new server, not
361        # the old
362        new_dataset.creator = self.dataset_owner
363        new_dataset.original_timestamp = new_dataset.timestamp
364        new_dataset.imported = True
365        new_dataset.timestamp = int(time.time())
366        new_dataset.db.commit()
367
368        # make sure the dataset path uses the new key and local dataset
369        # path settings. this also makes sure the log file is created in
370        # the right place (since it is derived from the results file path)
371        extension = metadata["result_file"].split(".")[-1]
372        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
373        if not updated:
374            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")
375
376        if current_log:
377            # Add the current log to the new dataset
378            with new_dataset.get_log_path().open("a") as outfile:
379                outfile.write(current_log)
380
381        return new_dataset
382
383
384    def process_urls(self):
385        """
386        Import 4CAT dataset from another 4CAT server
387
388        Interfaces with another 4CAT server to transfer a dataset's metadata,
389        data files and child datasets.
390        """
391        urls = [url.strip() for url in self.parameters.get("url").split(",")]
392        self.base = urls[0].split("/results/")[0]
393        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
394        api_key = self.parameters.get("api-key")
395
396        imported = []  # successfully imported datasets
397        failed_imports = []  # keys that failed to import
398        num_rows = 0  # will be used later
399
400        # we can add support for multiple datasets later by removing
401        # this part!
402        keys = [keys[0]]
403
404        while keys:
405            dataset_key = keys.pop(0)
406
407            self.halt_and_catch_fire()
408            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
409
410            # first, metadata!
411            try:
412                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
413                metadata = metadata.json()
414            except FourcatImportException as e:
415                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
416                continue
417            except ValueError:
418                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
419                continue
420
421            # copying empty datasets doesn't really make sense
422            if metadata["num_rows"] == 0:
423                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
424                failed_imports.append(dataset_key)
425                continue
426
427            metadata = self.process_metadata(metadata)
428
429            # create the new dataset
430            new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)
431
432            # then, the log
433            self.halt_and_catch_fire()
434            try:
435                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
436                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
437                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
438                logpath = new_dataset.get_log_path()
439                new_dataset.log("Original dataset log included below:")
440                with logpath.open("a") as outfile:
441                    outfile.write(log.text)
442            except FourcatImportException as e:
443                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
444                failed_imports.append(dataset_key)
445                continue
446            except ValueError:
447                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
448                failed_imports.append(dataset_key)
449                continue
450
451            # then, the results
452            self.halt_and_catch_fire()
453            try:
454                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
455                datapath = new_dataset.get_results_path()
456                data = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
457
458                if not imported:
459                    # first dataset - use num rows as 'overall'
460                    num_rows = metadata["num_rows"]
461
462            except FourcatImportException as e:
463                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
464                if new_dataset.key != self.dataset.key:
465                    new_dataset.delete()
466                continue
467
468            except ValueError:
469                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
470                failed_imports.append(dataset_key)
471                continue
472
473            # finally, the kids
474            self.halt_and_catch_fire()
475            try:
476                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
477                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
478                children = children.json()
479            except FourcatImportException as e:
480                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
481                failed_imports.append(dataset_key)
482                continue
483            except ValueError:
484                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
485                failed_imports.append(dataset_key)
486                continue
487
488            for child in children:
489                keys.append(child)
490                self.dataset.log(f"Adding child dataset {child} to import queue")
491
492            # done - remember that we've imported this one
493            imported.append(new_dataset)
494            new_dataset.update_status(metadata["status"])
495
496            if new_dataset.key != self.dataset.key:
497                # only finish if this is not the 'main' dataset, or the user
498                # will think the whole import is done
499                new_dataset.finish(metadata["num_rows"])
500
501        # todo: this part needs updating if/when we support importing multiple datasets!
502        if failed_imports:
503            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
504                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
505                                       f"dataset log file for details.", is_final=True)
506        else:
507            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
508                                       is_final=True)
509
510        if not self.dataset.is_finished():
511            # now all related datasets are imported, we can finish the 'main'
512            # dataset, and the user will be alerted that the full import is
513            # complete
514            self.dataset.finish(num_rows)
515
516    def halt_and_catch_fire(self):
517        """
518        Clean up on interrupt
519
520        There are multiple places in the code where we can bail out on an
521        interrupt, so abstract that away in its own function.
522        :return:
523        """
524        if self.interrupted:
525            # resuming is impossible because the original dataset (which
526            # has the list of URLs to import) has probably been
527            # overwritten by this point
528            deletables = [k for k in self.created_datasets if k != self.dataset.key]
529            for deletable in deletables:
530                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
531
532            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
533                                           f"will need to initiate the import again.")
534
535            raise ProcessorInterruptedException()
536
537    @staticmethod
538    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
539        """
540        Get dataset component from 4CAT export API
541
542        :param str base:  Server URL base to import from
543        :param str dataset_key:  Key of dataset to import
544        :param str api_key:  API authentication token
545        :param str component:  Component to retrieve
546        :return:  HTTP response object
547        """
548        try:
549            if component == "data" and datapath:
550                # Stream data
551                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
552                                  headers={
553                                            "User-Agent": "4cat/import",
554                                            "Authentication": api_key
555                                        }) as r:
556                    r.raise_for_status()
557                    with datapath.open("wb") as outfile:
558                        for chunk in r.iter_content(chunk_size=8192):
559                            outfile.write(chunk)
560                return r
561            else:
562                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
563                    "User-Agent": "4cat/import",
564                    "Authentication": api_key
565                })
566        except requests.Timeout:
567            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
568                                         f"accessible to external connections and try again.")
569        except requests.RequestException as e:
570            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
571                                         f"accessible to external connections and try again.")
572
573        if response.status_code == 404:
574            raise FourcatImportException(
575                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
576                f"a valid dataset.")
577        elif response.status_code in (401, 403):
578            raise FourcatImportException(
579                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
580                f"dataset and are using the correct API key.")
581        elif response.status_code != 200:
582            raise FourcatImportException(
583                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
584
585        return response
586
587    @staticmethod
588    def validate_query(query, request, user):
589        """
590        Validate custom data input
591
592        Confirms that the given dataset URL or uploaded ZIP file points to a valid
593        4CAT dataset export and, if so, returns some metadata.
594
595        :param dict query:  Query parameters, from client-side.
596        :param request:  Flask request
597        :param User user:  User object of user who has submitted the query
598        :return dict:  Safe query parameters
599        """
600        if query.get("method") == "zip":
601            filename = ""
602            if "option-data_upload-entries" in request.form:
603                # First pass sends list of files in the zip
604                pass
605            elif "option-data_upload" in request.files:
606                # Second pass sends the actual file
607                file = request.files["option-data_upload"]
608                if not file:
609                    raise QueryParametersException("No file uploaded.")
610
611                if not file.filename.endswith(".zip"):
612                    raise QueryParametersException("Uploaded file must be a ZIP file.")
613
614                filename = file.filename
615            else:
616                raise QueryParametersException("No file was offered for upload.")
617
618            return {
619                "method": "zip",
620                "filename": filename
621            }
622        elif query.get("method") == "url":
623            urls = query.get("url")
624            if not urls:
625                raise QueryParametersException("Provide at least one dataset URL.")
626
627            urls = urls.split(",")
628            bases = set([url.split("/results/")[0].lower() for url in urls])
629            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
630
631            if len(keys) != 1:
632                # todo: change this to < 1 if we allow multiple datasets
633                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
634
635            if len(bases) != 1:
636                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
637                                                "one 4CAT server at a time.")
638
639            base = urls[0].split("/results/")[0]
640            try:
641                # test if API key is valid and server is reachable
642                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
643            except FourcatImportException as e:
644                raise QueryParametersException(str(e))
645
646            try:
647                # test if we get a response we can parse
648                metadata = test.json()
649            except ValueError:
650                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
651
652            version = get_software_version()
653
654            if metadata.get("current_4CAT_version") != version:
655                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
656                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
657                                               "sure both are running the same version of 4CAT and try again.")
658
659            # OK, we can import at least one dataset
660            return {
661                "url": ",".join(urls),
662                "api-key": query.get("api-key")
663            }
664        else:
665            raise QueryParametersException("Import method not yet implemented.")
666
667    @staticmethod
668    def get_keys_from_urls(urls):
669        """
670        Get dataset keys from 4CAT URLs
671
672        :param list urls:  List of URLs
673        :return list:  List of keys
674        """
675        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
676
677    @staticmethod
678    def ensure_key(query):
679        """
680        Determine key for dataset generated by this processor
681
682        When importing datasets, it's necessary to determine the key of the
683        dataset that is created before it is actually created, because we want
684        to keep the original key of the imported dataset if possible. Luckily,
685        we can deduce it from the URL we're importing the dataset from.
686
687        :param dict query:  Input from the user, through the front-end
688        :return str:  Desired dataset key
689        """
690        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
691        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
692        urls = query.get("url", "").split(",")
693        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
694        return keys[0]
class FourcatImportException(common.lib.exceptions.FourcatException):
18class FourcatImportException(FourcatException):
19    pass

Base 4CAT exception class

class SearchImportFromFourcat(backend.lib.processor.BasicProcessor):
 22class SearchImportFromFourcat(BasicProcessor):
 23    type = "import_4cat-search"  # job ID
 24    category = "Search"  # category
 25    title = "Import 4CAT dataset and analyses"  # title displayed in UI
 26    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
 27    is_local = False  # Whether this datasource is locally scraped
 28    is_static = False  # Whether this datasource is still updated
 29
 30    max_workers = 1  # this cannot be more than 1, else things get VERY messy
 31
 32    options = {
 33        "intro": {
 34            "type": UserInput.OPTION_INFO,
 35            "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
 36                    "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
 37                    "You can find the current version in the footer at the bottom of the interface."
 38        },
 39        "method": {
 40            "type": UserInput.OPTION_CHOICE,
 41            "help": "Import Type",
 42            "options": {
 43                "zip": "Zip File",
 44                "url": "4CAT URL",
 45            },
 46            "default": "url"
 47        },
 48        "url": {
 49            "type": UserInput.OPTION_TEXT,
 50            "help": "Dataset URL",
 51            "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
 52            "requires": "method^=url"
 53        },
 54        "intro2": {
 55            "type": UserInput.OPTION_INFO,
 56            "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
 57                    "an API key from **the server you are importing from**, not the one you are looking at right now. "
 58                    "Additionally, you need to have owner access to the dataset you want to import.",
 59            "requires": "method^=url"
 60        },
 61        "api-key": {
 62            "type": UserInput.OPTION_TEXT,
 63            "help": "4CAT API Key",
 64            "sensitive": True,
 65            "cache": True,
 66            "requires": "method^=url"
 67        },
 68        "data_upload": {
 69            "type": UserInput.OPTION_FILE,
 70            "help": "File",
 71            "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
 72            "requires": "method^=zip"
 73        },
 74
 75    }
 76
 77    created_datasets = None
 78    base = None
 79    remapped_keys = None
 80    dataset_owner = None
 81
 82    def process(self):
 83        """
 84        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 85        """
 86        self.created_datasets = set()  # keys of created datasets - may not be successful!
 87        self.remapped_keys = {}  # changed dataset keys
 88        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 89        try:
 90            if self.parameters.get("method") == "zip":
 91                self.process_zip()
 92            else:
 93                self.process_urls()
 94        except Exception as e:
 95            # Catch all exceptions and finish the job with an error
 96            # Resuming is impossible because this dataset was overwritten with the importing dataset
 97            # halt_and_catch_fire() will clean up and delete the datasets that were created
 98            self.interrupted = True
 99            try:
100                self.halt_and_catch_fire()
101            except ProcessorInterruptedException:
102                pass
103            # Reraise the original exception for logging
104            raise e
105
106    def after_create(query, dataset, request):
107        """
108        Hook to execute after the dataset for this source has been created
109
110        In this case, put the file in a temporary location so it can be
111        processed properly by the related Job later.
112
113        :param dict query:  Sanitised query parameters
114        :param DataSet dataset:  Dataset created for this query
115        :param request:  Flask request submitted for its creation
116        """
117        if query.get("method") == "zip":
118            file = request.files["option-data_upload"]
119            file.seek(0)
120            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
121                while True:
122                    chunk = file.read(1024)
123                    if len(chunk) == 0:
124                        break
125                    outfile.write(chunk)
126        else:
127            # nothing to do for URLs
128            pass
129
130
131    def process_zip(self):
132        """
133        Import 4CAT dataset from a ZIP file
134        """
135        self.dataset.update_status(f"Importing datasets and analyses from ZIP file.")
136        temp_file = self.dataset.get_results_path().with_suffix(".importing")
137
138        imported = []
139        processed_files = 1 # take into account the export.log file
140        failed_imports = []
141        primary_dataset_original_log = None
142        with zipfile.ZipFile(temp_file, "r") as zip_ref:
143            zip_contents = zip_ref.namelist()
144
145            # Get all metadata files and determine primary dataset
146            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
147            if not metadata_files:
148                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
149                return
150
151            # Get the primary dataset
152            primary_dataset_keys = set()
153            datasets = []
154            parent_child_mapping = {}
155            for file in metadata_files:
156                with zip_ref.open(file) as f:
157                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
158                    metadata = json.loads(content)
159                    if not metadata.get("key_parent"):
160                        primary_dataset_keys.add(metadata.get("key"))
161                        datasets.append(metadata)
162                    else:
163                        # Store the mapping of parent to child datasets
164                        parent_key = metadata.get("key_parent")
165                        if parent_key not in parent_child_mapping:
166                            parent_child_mapping[parent_key] = []
167                        parent_child_mapping[parent_key].append(metadata)
168
169            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
170            if len(primary_dataset_keys) != 1:
171                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
172                return
173
174            # Import datasets
175            while datasets:
176                self.halt_and_catch_fire()
177
178                # Create the datasets
179                metadata = datasets.pop(0)
180                dataset_key = metadata.get("key")
181                processed_metadata = self.process_metadata(metadata)
182                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
183                processed_files += 1
184
185                # Copy the log file
186                self.halt_and_catch_fire()
187                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
188                if log_filename in zip_contents:
189                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
190                    with zip_ref.open(log_filename) as f:
191                        content = f.read().decode('utf-8')
192                        if new_dataset.key == self.dataset.key:
193                            # Hold the original log for the primary dataset and add at the end
194                            primary_dataset_original_log = content
195                        else:
196                            new_dataset.log("Original dataset log included below:")
197                            with new_dataset.get_log_path().open("a") as outfile:
198                                outfile.write(content)
199                    processed_files += 1
200                else:
201                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
202
203                # Copy the results
204                self.halt_and_catch_fire()
205                results_filename = metadata["result_file"]
206                if results_filename in zip_contents:
207                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
208                    with zip_ref.open(results_filename) as f:
209                        with new_dataset.get_results_path().open("wb") as outfile:
210                            outfile.write(f.read())
211                    processed_files += 1
212
213                    if not imported:
214                        # first dataset - use num rows as 'overall'
215                        num_rows = metadata["num_rows"]
216                else:
217                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
218                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
219                    failed_imports.append(dataset_key)
220                    continue
221
222                # finally, the kids
223                self.halt_and_catch_fire()
224                if dataset_key in parent_child_mapping:
225                    datasets.extend(parent_child_mapping[dataset_key])
226                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
227
228                # done - remember that we've imported this one
229                imported.append(new_dataset)
230                new_dataset.update_status(metadata["status"])
231
232                if new_dataset.key != self.dataset.key:
233                    # only finish if this is not the 'main' dataset, or the user
234                    # will think the whole import is done
235                    new_dataset.finish(metadata["num_rows"])
236
237            # Check that all files were processed
238            missed_files = []
239            if len(zip_contents) != processed_files:
240                for file in zip_contents:
241                    if file not in processed_files:
242                        missed_files.append(file)
243
244            # todo: this part needs updating if/when we support importing multiple datasets!
245            if failed_imports:
246                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
247                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
248                                           f"dataset log file for details.", is_final=True)
249            elif missed_files:
250                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
251                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
252                                           f"{len(missed_files)} files were not successfully imported. Check the "
253                                           f"dataset log file for details.", is_final=True)
254            else:
255                self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
256                                           is_final=True)
257
258            if not self.dataset.is_finished():
259                # now all related datasets are imported, we can finish the 'main'
260                # dataset, and the user will be alerted that the full import is
261                # complete
262                self.dataset.finish(num_rows)
263
264            # Add the original log for the primary dataset
265            if primary_dataset_original_log:
266                self.dataset.log("Original dataset log included below:\n")
267                with self.dataset.get_log_path().open("a") as outfile:
268                    outfile.write(primary_dataset_original_log)
269
270
271    @staticmethod
272    def process_metadata(metadata):
273        """
274        Process metadata for import
275        """
276        # get rid of some keys that are server-specific and don't need to
277        # be stored (or don't correspond to database columns)
278        metadata.pop("current_4CAT_version")
279        metadata.pop("id")
280        metadata.pop("job")
281        metadata.pop("is_private")
282        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
283
284        # extra params are stored as JSON...
285        metadata["parameters"] = json.loads(metadata["parameters"])
286        if "copied_from" in metadata["parameters"]:
287            metadata["parameters"].pop("copied_from")
288        metadata["parameters"] = json.dumps(metadata["parameters"])
289
290        return metadata
291
292    def create_dataset(self, metadata, original_key, primary=False):
293        """
294        Create a new dataset
295        """
296        if primary:
297            self.dataset.update_status(f"Importing primary dataset {original_key}.")
298            # if this is the first dataset we're importing, make it the
299            # processor's "own" dataset. the key has already been set to
300            # the imported dataset's key via ensure_key() (or a new unqiue
301            # key if it already existed on this server)
302            # by making it the "own" dataset, the user initiating the
303            # import will see the imported dataset as the "result" of their
304            # import query in the interface, similar to the workflow for
305            # other data sources
306            new_dataset = self.dataset
307
308            # Update metadata and file
309            metadata.pop("key")  # key already OK (see above)
310            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
311
312        else:
313            self.dataset.update_status(f"Importing child dataset {original_key}.")
314            # supernumerary datasets - handle on their own
315            # these include any children of imported datasets
316            try:
317                key_exists = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
318
319                # if we *haven't* thrown a DatasetException now, then the
320                # key is already in use, so create a "dummy" dataset and
321                # overwrite it with the metadata we have (except for the
322                # key). this ensures that a new unique key will be
323                # generated.
324                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
325                metadata.pop("key")
326                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
327
328            except DataSetException:
329                # this is *good* since it means the key doesn't exist, so
330                # we can re-use the key of the imported dataset
331                self.db.insert("datasets", data=metadata)
332                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
333
334        if new_dataset.key != original_key:
335            # could not use original key because it was already in use
336            # so update any references to use the new key
337            self.remapped_keys[original_key] = new_dataset.key
338            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
339                                      f"{new_dataset.key} instead of key {original_key}!")
340
341        # refresh object, make sure it's in sync with the database
342        self.created_datasets.add(new_dataset.key)
343        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
344        current_log = None
345        if new_dataset.key == self.dataset.key:
346            # this ensures that the first imported dataset becomes the
347            # processor's "own" dataset, and that the import logs go to
348            # that dataset's log file. For later imports, this evaluates to
349            # False.
350
351            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
352            current_log = self.dataset.get_log_path().read_text()
353            # Update the dataset
354            self.dataset = new_dataset
355
356        # if the key of the parent dataset was changed, change the
357        # reference to it that the child dataset has
358        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
359            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]
360
361        # update some attributes that should come from the new server, not
362        # the old
363        new_dataset.creator = self.dataset_owner
364        new_dataset.original_timestamp = new_dataset.timestamp
365        new_dataset.imported = True
366        new_dataset.timestamp = int(time.time())
367        new_dataset.db.commit()
368
369        # make sure the dataset path uses the new key and local dataset
370        # path settings. this also makes sure the log file is created in
371        # the right place (since it is derived from the results file path)
372        extension = metadata["result_file"].split(".")[-1]
373        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
374        if not updated:
375            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")
376
377        if current_log:
378            # Add the current log to the new dataset
379            with new_dataset.get_log_path().open("a") as outfile:
380                outfile.write(current_log)
381
382        return new_dataset
383
384
385    def process_urls(self):
386        """
387        Import 4CAT dataset from another 4CAT server
388
389        Interfaces with another 4CAT server to transfer a dataset's metadata,
390        data files and child datasets.
391        """
392        urls = [url.strip() for url in self.parameters.get("url").split(",")]
393        self.base = urls[0].split("/results/")[0]
394        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
395        api_key = self.parameters.get("api-key")
396
397        imported = []  # successfully imported datasets
398        failed_imports = []  # keys that failed to import
399        num_rows = 0  # will be used later
400
401        # we can add support for multiple datasets later by removing
402        # this part!
403        keys = [keys[0]]
404
405        while keys:
406            dataset_key = keys.pop(0)
407
408            self.halt_and_catch_fire()
409            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
410
411            # first, metadata!
412            try:
413                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
414                metadata = metadata.json()
415            except FourcatImportException as e:
416                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
417                continue
418            except ValueError:
419                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
420                continue
421
422            # copying empty datasets doesn't really make sense
423            if metadata["num_rows"] == 0:
424                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
425                failed_imports.append(dataset_key)
426                continue
427
428            metadata = self.process_metadata(metadata)
429
430            # create the new dataset
431            new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)
432
433            # then, the log
434            self.halt_and_catch_fire()
435            try:
436                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
437                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
438                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
439                logpath = new_dataset.get_log_path()
440                new_dataset.log("Original dataset log included below:")
441                with logpath.open("a") as outfile:
442                    outfile.write(log.text)
443            except FourcatImportException as e:
444                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
445                failed_imports.append(dataset_key)
446                continue
447            except ValueError:
448                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
449                failed_imports.append(dataset_key)
450                continue
451
452            # then, the results
453            self.halt_and_catch_fire()
454            try:
455                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
456                datapath = new_dataset.get_results_path()
457                data = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
458
459                if not imported:
460                    # first dataset - use num rows as 'overall'
461                    num_rows = metadata["num_rows"]
462
463            except FourcatImportException as e:
464                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
465                if new_dataset.key != self.dataset.key:
466                    new_dataset.delete()
467                continue
468
469            except ValueError:
470                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
471                failed_imports.append(dataset_key)
472                continue
473
474            # finally, the kids
475            self.halt_and_catch_fire()
476            try:
477                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
478                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
479                children = children.json()
480            except FourcatImportException as e:
481                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
482                failed_imports.append(dataset_key)
483                continue
484            except ValueError:
485                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
486                failed_imports.append(dataset_key)
487                continue
488
489            for child in children:
490                keys.append(child)
491                self.dataset.log(f"Adding child dataset {child} to import queue")
492
493            # done - remember that we've imported this one
494            imported.append(new_dataset)
495            new_dataset.update_status(metadata["status"])
496
497            if new_dataset.key != self.dataset.key:
498                # only finish if this is not the 'main' dataset, or the user
499                # will think the whole import is done
500                new_dataset.finish(metadata["num_rows"])
501
502        # todo: this part needs updating if/when we support importing multiple datasets!
503        if failed_imports:
504            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
505                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
506                                       f"dataset log file for details.", is_final=True)
507        else:
508            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
509                                       is_final=True)
510
511        if not self.dataset.is_finished():
512            # now all related datasets are imported, we can finish the 'main'
513            # dataset, and the user will be alerted that the full import is
514            # complete
515            self.dataset.finish(num_rows)
516
517    def halt_and_catch_fire(self):
518        """
519        Clean up on interrupt
520
521        There are multiple places in the code where we can bail out on an
522        interrupt, so abstract that away in its own function.
523        :return:
524        """
525        if self.interrupted:
526            # resuming is impossible because the original dataset (which
527            # has the list of URLs to import) has probably been
528            # overwritten by this point
529            deletables = [k for k in self.created_datasets if k != self.dataset.key]
530            for deletable in deletables:
531                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
532
533            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
534                                           f"will need to initiate the import again.")
535
536            raise ProcessorInterruptedException()
537
538    @staticmethod
539    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
540        """
541        Get dataset component from 4CAT export API
542
543        :param str base:  Server URL base to import from
544        :param str dataset_key:  Key of dataset to import
545        :param str api_key:  API authentication token
546        :param str component:  Component to retrieve
547        :return:  HTTP response object
548        """
549        try:
550            if component == "data" and datapath:
551                # Stream data
552                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
553                                  headers={
554                                            "User-Agent": "4cat/import",
555                                            "Authentication": api_key
556                                        }) as r:
557                    r.raise_for_status()
558                    with datapath.open("wb") as outfile:
559                        for chunk in r.iter_content(chunk_size=8192):
560                            outfile.write(chunk)
561                return r
562            else:
563                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
564                    "User-Agent": "4cat/import",
565                    "Authentication": api_key
566                })
567        except requests.Timeout:
568            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
569                                         f"accessible to external connections and try again.")
570        except requests.RequestException as e:
571            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
572                                         f"accessible to external connections and try again.")
573
574        if response.status_code == 404:
575            raise FourcatImportException(
576                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
577                f"a valid dataset.")
578        elif response.status_code in (401, 403):
579            raise FourcatImportException(
580                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
581                f"dataset and are using the correct API key.")
582        elif response.status_code != 200:
583            raise FourcatImportException(
584                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
585
586        return response
587
588    @staticmethod
589    def validate_query(query, request, user):
590        """
591        Validate custom data input
592
593        Confirms that the input is either a valid ZIP file exported from a 4CAT server
594        or a valid dataset URL with API key and, if so, returns sanitised query parameters.
595
596        :param dict query:  Query parameters, from client-side.
597        :param request:  Flask request
598        :param User user:  User object of user who has submitted the query
599        :return dict:  Safe query parameters
600        """
601        if query.get("method") == "zip":
602            filename = ""
603            if "option-data_upload-entries" in request.form:
604                # First pass sends list of files in the zip
605                pass
606            elif "option-data_upload" in request.files:
607                # Second pass sends the actual file
608                file = request.files["option-data_upload"]
609                if not file:
610                    raise QueryParametersException("No file uploaded.")
611
612                if not file.filename.endswith(".zip"):
613                    raise QueryParametersException("Uploaded file must be a ZIP file.")
614
615                filename = file.filename
616            else:
617                raise QueryParametersException("No file was offered for upload.")
618
619            return {
620                "method": "zip",
621                "filename": filename
622            }
623        elif query.get("method") == "url":
624            urls = query.get("url")
625            if not urls:
626                raise QueryParametersException("Provide at least one dataset URL.")
627
628            urls = urls.split(",")
629            bases = set([url.split("/results/")[0].lower() for url in urls])
630            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
631
632            if len(keys) != 1:
633                # todo: change this to < 1 if we allow multiple datasets
634                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
635
636            if len(bases) != 1:
637                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
638                                                "one 4CAT server at a time.")
639
640            base = urls[0].split("/results/")[0]
641            try:
642                # test if API key is valid and server is reachable
643                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
644            except FourcatImportException as e:
645                raise QueryParametersException(str(e))
646
647            try:
648                # test if we get a response we can parse
649                metadata = test.json()
650            except ValueError:
651                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
652
653            version = get_software_version()
654
655            if metadata.get("current_4CAT_version") != version:
656                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
657                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
658                                               "sure both are running the same version of 4CAT and try again.")
659
660            # OK, we can import at least one dataset
661            return {
662                "url": ",".join(urls),
663                "api-key": query.get("api-key")
664            }
665        else:
666            raise QueryParametersException("Import method not yet implemented.")
667
668    @staticmethod
669    def get_keys_from_urls(urls):
670        """
671        Get dataset keys from 4CAT URLs
672
673        :param list urls:  List of URLs
674        :return list:  List of keys
675        """
676        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
677
678    @staticmethod
679    def ensure_key(query):
680        """
681        Determine key for dataset generated by this processor
682
683        When importing datasets, it's necessary to determine the key of the
684        dataset that is created before it is actually created, because we want
685        to keep the original key of the imported dataset if possible. Luckily,
686        we can deduce it from the URL we're importing the dataset from.
687
688        :param dict query:  Input from the user, through the front-end
689        :return str:  Desired dataset key
690        """
691        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
692        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
693        urls = query.get("url", "").split(",")
694        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
695        return keys[0]

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, producing another dataset as output. The input is thus a file, and (usually) so is the output. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an is_compatible_with(FourcatModule module=None, str user=None) -> bool class method which takes a dataset as argument and returns a bool indicating whether this processor is compatible with that dataset. For example:


@classmethod
def is_compatible_with(cls, module=None, user=None):
    return module.type == "linguistic-features"

type = 'import_4cat-search'
category = 'Search'
title = 'Import 4CAT dataset and analyses'
description = 'Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)'
is_local = False
is_static = False
max_workers = 1
options = {'intro': {'type': 'info', 'help': 'Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. \n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. You can find the current version in the footer at the bottom of the interface.'}, 'method': {'type': 'choice', 'help': 'Import Type', 'options': {'zip': 'Zip File', 'url': '4CAT URL'}, 'default': 'url'}, 'url': {'type': 'string', 'help': 'Dataset URL', 'tooltip': "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.", 'requires': 'method^=url'}, 'intro2': {'type': 'info', 'help': "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need an API key from **the server you are importing from**, not the one you are looking at right now. Additionally, you need to have owner access to the dataset you want to import.", 'requires': 'method^=url'}, 'api-key': {'type': 'string', 'help': '4CAT API Key', 'sensitive': True, 'cache': True, 'requires': 'method^=url'}, 'data_upload': {'type': 'file', 'help': 'File', 'tooltip': 'Upload a ZIP file containing a dataset exported from a 4CAT server.', 'requires': 'method^=zip'}}
created_datasets = None
base = None
remapped_keys = None
dataset_owner = None
def process(self):
 82    def process(self):
 83        """
 84        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
 85        """
 86        self.created_datasets = set()  # keys of created datasets - may not be successful!
 87        self.remapped_keys = {}  # changed dataset keys
 88        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
 89        try:
 90            if self.parameters.get("method") == "zip":
 91                self.process_zip()
 92            else:
 93                self.process_urls()
 94        except Exception as e:
 95            # Catch all exceptions and finish the job with an error
 96            # Resuming is impossible because this dataset was overwritten with the importing dataset
 97            # halt_and_catch_fire() will clean up and delete the datasets that were created
 98            self.interrupted = True
 99            try:
100                self.halt_and_catch_fire()
101            except ProcessorInterruptedException:
102                pass
103            # Reraise the original exception for logging
104            raise e

Import 4CAT dataset either from another 4CAT server or from the uploaded zip file

def after_create(query, dataset, request):
106    def after_create(query, dataset, request):
107        """
108        Hook to execute after the dataset for this source has been created
109
110        In this case, put the file in a temporary location so it can be
111        processed properly by the related Job later.
112
113        :param dict query:  Sanitised query parameters
114        :param DataSet dataset:  Dataset created for this query
115        :param request:  Flask request submitted for its creation
116        """
117        if query.get("method") == "zip":
118            file = request.files["option-data_upload"]
119            file.seek(0)
120            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
121                while True:
122                    chunk = file.read(1024)
123                    if len(chunk) == 0:
124                        break
125                    outfile.write(chunk)
126        else:
127            # nothing to do for URLs
128            pass

Hook to execute after the dataset for this source has been created

In this case, put the file in a temporary location so it can be processed properly by the related Job later.

Parameters
  • dict query: Sanitised query parameters
  • DataSet dataset: Dataset created for this query
  • request: Flask request submitted for its creation
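
As a rough illustration of where the upload ends up, here is a minimal sketch (the path is hypothetical; in practice it comes from the DataSet object passed to this hook). The staging file is simply the dataset's eventual results path with its suffix swapped for ".importing":

from pathlib import Path

# Hypothetical results path as a DataSet might report it.
results_path = Path("/data/4cat/results/28da332f.csv")

# after_create() writes the uploaded ZIP here, chunk by chunk.
staging_path = results_path.with_suffix(".importing")
print(staging_path)  # /data/4cat/results/28da332f.importing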
def process_zip(self):
131    def process_zip(self):
132        """
133        Import 4CAT dataset from a ZIP file
134        """
135        self.dataset.update_status(f"Importing datasets and analyses from ZIP file.")
136        temp_file = self.dataset.get_results_path().with_suffix(".importing")
137
138        imported = []
139        processed_files = 1 # take into account the export.log file
140        failed_imports = []
141        primary_dataset_original_log = None
142        with zipfile.ZipFile(temp_file, "r") as zip_ref:
143            zip_contents = zip_ref.namelist()
144
145            # Get all metadata files and determine primary dataset
146            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
147            if not metadata_files:
148                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
149                return
150
151            # Get the primary dataset
152            primary_dataset_keys = set()
153            datasets = []
154            parent_child_mapping = {}
155            for file in metadata_files:
156                with zip_ref.open(file) as f:
157                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
158                    metadata = json.loads(content)
159                    if not metadata.get("key_parent"):
160                        primary_dataset_keys.add(metadata.get("key"))
161                        datasets.append(metadata)
162                    else:
163                        # Store the mapping of parent to child datasets
164                        parent_key = metadata.get("key_parent")
165                        if parent_key not in parent_child_mapping:
166                            parent_child_mapping[parent_key] = []
167                        parent_child_mapping[parent_key].append(metadata)
168
169            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
170            if len(primary_dataset_keys) != 1:
171                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
172                return
173
174            # Import datasets
175            while datasets:
176                self.halt_and_catch_fire()
177
178                # Create the datasets
179                metadata = datasets.pop(0)
180                dataset_key = metadata.get("key")
181                processed_metadata = self.process_metadata(metadata)
182                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
183                processed_files += 1
184
185                # Copy the log file
186                self.halt_and_catch_fire()
187                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
188                if log_filename in zip_contents:
189                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
190                    with zip_ref.open(log_filename) as f:
191                        content = f.read().decode('utf-8')
192                        if new_dataset.key == self.dataset.key:
193                            # Hold the original log for the primary dataset and add at the end
194                            primary_dataset_original_log = content
195                        else:
196                            new_dataset.log("Original dataset log included below:")
197                            with new_dataset.get_log_path().open("a") as outfile:
198                                outfile.write(content)
199                    processed_files += 1
200                else:
201                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")
202
203                # Copy the results
204                self.halt_and_catch_fire()
205                results_filename = metadata["result_file"]
206                if results_filename in zip_contents:
207                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
208                    with zip_ref.open(results_filename) as f:
209                        with new_dataset.get_results_path().open("wb") as outfile:
210                            outfile.write(f.read())
211                    processed_files += 1
212
213                    if not imported:
214                        # first dataset - use num rows as 'overall'
215                        num_rows = metadata["num_rows"]
216                else:
217                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
218                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
219                    failed_imports.append(dataset_key)
220                    continue
221
222                # finally, the kids
223                self.halt_and_catch_fire()
224                if dataset_key in parent_child_mapping:
225                    datasets.extend(parent_child_mapping[dataset_key])
226                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")
227
228                # done - remember that we've imported this one
229                imported.append(new_dataset)
230                new_dataset.update_status(metadata["status"])
231
232                if new_dataset.key != self.dataset.key:
233                    # only finish if this is not the 'main' dataset, or the user
234                    # will think the whole import is done
235                    new_dataset.finish(metadata["num_rows"])
236
237            # Check that all files were processed
238            missed_files = []
239            if len(zip_contents) != processed_files:
240                for file in zip_contents:
241                    if file not in processed_files:
242                        missed_files.append(file)
243
244            # todo: this part needs updating if/when we support importing multiple datasets!
245            if failed_imports:
246                self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
247                                           f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
248                                           f"dataset log file for details.", is_final=True)
249            elif missed_files:
250                self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
251                self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
252                                           f"{len(missed_files)} files were not successfully imported. Check the "
253                                           f"dataset log file for details.", is_final=True)
254            else:
255                self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported.",
256                                           is_final=True)
257
258            if not self.dataset.is_finished():
259                # now all related datasets are imported, we can finish the 'main'
260                # dataset, and the user will be alerted that the full import is
261                # complete
262                self.dataset.finish(num_rows)
263
264            # Add the original log for the primary dataset
265            if primary_dataset_original_log:
266                self.dataset.log("Original dataset log included below:\n")
267                with self.dataset.get_log_path().open("a") as outfile:
268                    outfile.write(primary_dataset_original_log)

Import 4CAT dataset from a ZIP file
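
For orientation, the sketch below peeks inside such an export ZIP (the file name is a placeholder) and reports, per metadata file, whether the dataset is the primary one, i.e. the single dataset without a key_parent, or a child:

import json
import zipfile

export_path = "dataset-export.zip"  # placeholder name for a 4CAT export

with zipfile.ZipFile(export_path, "r") as zip_ref:
    for name in zip_ref.namelist():
        if not name.endswith("_metadata.json"):
            continue  # result files and log files are handled separately
        metadata = json.loads(zip_ref.read(name).decode("utf-8"))
        # Datasets without a parent key are primary; the importer accepts exactly one.
        role = "primary" if not metadata.get("key_parent") else f"child of {metadata['key_parent']}"
        print(f"{metadata.get('key')}: {role}, {metadata.get('num_rows')} rows")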

@staticmethod
def process_metadata(metadata):
271    @staticmethod
272    def process_metadata(metadata):
273        """
274        Process metadata for import
275        """
276        # get rid of some keys that are server-specific and don't need to
277        # be stored (or don't correspond to database columns)
278        metadata.pop("current_4CAT_version")
279        metadata.pop("id")
280        metadata.pop("job")
281        metadata.pop("is_private")
282        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!
283
284        # extra params are stored as JSON...
285        metadata["parameters"] = json.loads(metadata["parameters"])
286        if "copied_from" in metadata["parameters"]:
287            metadata["parameters"].pop("copied_from")
288        metadata["parameters"] = json.dumps(metadata["parameters"])
289
290        return metadata

Process metadata for import
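
As an illustration, the snippet below runs a made-up metadata record through process_metadata(); the field names match what the method expects, the values are invented, and the import assumes the module is available in your 4CAT environment:

from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

# Invented record mimicking what the export API returns for a dataset.
metadata = {
    "key": "28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49",
    "current_4CAT_version": "1.45",  # server-specific, removed
    "id": 42,                        # server-specific, removed
    "job": 1337,                     # server-specific, removed
    "is_private": False,             # removed
    "is_finished": True,             # removed; the importer finishes datasets itself
    "parameters": '{"query": "example", "copied_from": "abcd1234"}',
    "result_file": "28da332f.csv",
    "num_rows": 100,
    "status": "Dataset completed.",
}

cleaned = SearchImportFromFourcat.process_metadata(metadata)
# "copied_from" has been stripped from the re-serialised parameters JSON and
# only fields that map to database columns remain.
print(cleaned["parameters"])  # {"query": "example"}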

def create_dataset(self, metadata, original_key, primary=False):
292    def create_dataset(self, metadata, original_key, primary=False):
293        """
294        Create a new dataset
295        """
296        if primary:
297            self.dataset.update_status(f"Importing primary dataset {original_key}.")
298            # if this is the first dataset we're importing, make it the
299            # processor's "own" dataset. the key has already been set to
300            # the imported dataset's key via ensure_key() (or a new unique
301            # key if it already existed on this server)
302            # by making it the "own" dataset, the user initiating the
303            # import will see the imported dataset as the "result" of their
304            # import query in the interface, similar to the workflow for
305            # other data sources
306            new_dataset = self.dataset
307
308            # Update metadata and file
309            metadata.pop("key")  # key already OK (see above)
310            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
311
312        else:
313            self.dataset.update_status(f"Importing child dataset {original_key}.")
314            # supernumerary datasets - handle on their own
315            # these include any children of imported datasets
316            try:
317                key_exists = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
318
319                # if we *haven't* thrown a DatasetException now, then the
320                # key is already in use, so create a "dummy" dataset and
321                # overwrite it with the metadata we have (except for the
322                # key). this ensures that a new unique key will be
323                # generated.
324                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
325                metadata.pop("key")
326                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)
327
328            except DataSetException:
329                # this is *good* since it means the key doesn't exist, so
330                # we can re-use the key of the imported dataset
331                self.db.insert("datasets", data=metadata)
332                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
333
334        if new_dataset.key != original_key:
335            # could not use original key because it was already in use
336            # so update any references to use the new key
337            self.remapped_keys[original_key] = new_dataset.key
338            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
339                                      f"{new_dataset.key} instead of key {original_key}!")
340
341        # refresh object, make sure it's in sync with the database
342        self.created_datasets.add(new_dataset.key)
343        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
344        current_log = None
345        if new_dataset.key == self.dataset.key:
346            # this ensures that the first imported dataset becomes the
347            # processor's "own" dataset, and that the import logs go to
348            # that dataset's log file. For later imports, this evaluates to
349            # False.
350
351            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
352            current_log = self.dataset.get_log_path().read_text()
353            # Update the dataset
354            self.dataset = new_dataset
355
356        # if the key of the parent dataset was changed, change the
357        # reference to it that the child dataset has
358        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
359            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]
360
361        # update some attributes that should come from the new server, not
362        # the old
363        new_dataset.creator = self.dataset_owner
364        new_dataset.original_timestamp = new_dataset.timestamp
365        new_dataset.imported = True
366        new_dataset.timestamp = int(time.time())
367        new_dataset.db.commit()
368
369        # make sure the dataset path uses the new key and local dataset
370        # path settings. this also makes sure the log file is created in
371        # the right place (since it is derived from the results file path)
372        extension = metadata["result_file"].split(".")[-1]
373        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
374        if not updated:
375            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")
376
377        if current_log:
378            # Add the current log to the new dataset
379            with new_dataset.get_log_path().open("a") as outfile:
380                outfile.write(current_log)
381
382        return new_dataset

Create a new dataset

def process_urls(self):
385    def process_urls(self):
386        """
387        Import 4CAT dataset from another 4CAT server
388
389        Interfaces with another 4CAT server to transfer a dataset's metadata,
390        data files and child datasets.
391        """
392        urls = [url.strip() for url in self.parameters.get("url").split(",")]
393        self.base = urls[0].split("/results/")[0]
394        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
395        api_key = self.parameters.get("api-key")
396
397        imported = []  # successfully imported datasets
398        failed_imports = []  # keys that failed to import
399        num_rows = 0  # will be used later
400
401        # we can add support for multiple datasets later by removing
402        # this part!
403        keys = [keys[0]]
404
405        while keys:
406            dataset_key = keys.pop(0)
407
408            self.halt_and_catch_fire()
409            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")
410
411            # first, metadata!
412            try:
413                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
414                metadata = metadata.json()
415            except FourcatImportException as e:
416                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
417                continue
418            except ValueError:
419                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
420                continue
421
422            # copying empty datasets doesn't really make sense
423            if metadata["num_rows"] == 0:
424                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
425                failed_imports.append(dataset_key)
426                continue
427
428            metadata = self.process_metadata(metadata)
429
430            # create the new dataset
431            new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)
432
433            # then, the log
434            self.halt_and_catch_fire()
435            try:
436                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
437                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
438                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
439                logpath = new_dataset.get_log_path()
440                new_dataset.log("Original dataset log included below:")
441                with logpath.open("a") as outfile:
442                    outfile.write(log.text)
443            except FourcatImportException as e:
444                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
445                failed_imports.append(dataset_key)
446                continue
447            except ValueError:
448                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
449                failed_imports.append(dataset_key)
450                continue
451
452            # then, the results
453            self.halt_and_catch_fire()
454            try:
455                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
456                datapath = new_dataset.get_results_path()
457                data = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)
458
459                if not imported:
460                    # first dataset - use num rows as 'overall'
461                    num_rows = metadata["num_rows"]
462
463            except FourcatImportException as e:
464                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
465                if new_dataset.key != self.dataset.key:
466                    new_dataset.delete()
467                continue
468
469            except ValueError:
470                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
471                failed_imports.append(dataset_key)
472                continue
473
474            # finally, the kids
475            self.halt_and_catch_fire()
476            try:
477                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
478                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
479                children = children.json()
480            except FourcatImportException as e:
481                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
482                failed_imports.append(dataset_key)
483                continue
484            except ValueError:
485                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
486                failed_imports.append(dataset_key)
487                continue
488
489            for child in children:
490                keys.append(child)
491                self.dataset.log(f"Adding child dataset {child} to import queue")
492
493            # done - remember that we've imported this one
494            imported.append(new_dataset)
495            new_dataset.update_status(metadata["status"])
496
497            if new_dataset.key != self.dataset.key:
498                # only finish if this is not the 'main' dataset, or the user
499                # will think the whole import is done
500                new_dataset.finish(metadata["num_rows"])
501
502        # todo: this part needs updating if/when we support importing multiple datasets!
503        if failed_imports:
504            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
505                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
506                                       f"dataset log file for details.", is_final=True)
507        else:
508            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
509                                       is_final=True)
510
511        if not self.dataset.is_finished():
512            # now all related datasets are imported, we can finish the 'main'
513            # dataset, and the user will be alerted that the full import is
514            # complete
515            self.dataset.finish(num_rows)

Import 4CAT dataset from another 4CAT server

Interfaces with another 4CAT server to transfer a dataset's metadata, data files and child datasets.
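
For reference, a short sketch of the endpoints involved: for every dataset key, process_urls() requests the metadata, log, data and children components from the remote server's export API (server and key below are placeholders):

base = "https://4cat.example"  # placeholder remote server
key = "28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49"  # placeholder dataset key

# The same export endpoint serves every component; only the final path segment changes.
for component in ("metadata", "log", "data", "children"):
    print(f"{base}/api/export-packed-dataset/{key}/{component}/")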

def halt_and_catch_fire(self):
517    def halt_and_catch_fire(self):
518        """
519        Clean up on interrupt
520
521        There are multiple places in the code where we can bail out on an
522        interrupt, so abstract that away in its own function.
523        :return:
524        """
525        if self.interrupted:
526            # resuming is impossible because the original dataset (which
527            # has the list of URLs to import) has probably been
528            # overwritten by this point
529            deletables = [k for k in self.created_datasets if k != self.dataset.key]
530            for deletable in deletables:
531                DataSet(key=deletable, db=self.db, modules=self.modules).delete()
532
533            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
534                                           f"will need to initiate the import again.")
535
536            raise ProcessorInterruptedException()

Clean up on interrupt

There are multiple places in the code where we can bail out on an interrupt, so abstract that away in its own function.

@staticmethod
def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
538    @staticmethod
539    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
540        """
541        Get dataset component from 4CAT export API
542
543        :param str base:  Server URL base to import from
544        :param str dataset_key:  Key of dataset to import
545        :param str api_key:  API authentication token
546        :param str component:  Component to retrieve
547        :return:  HTTP response object
548        """
549        try:
550            if component == "data" and datapath:
551                # Stream data
552                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
553                                  headers={
554                                            "User-Agent": "4cat/import",
555                                            "Authentication": api_key
556                                        }) as r:
557                    r.raise_for_status()
558                    with datapath.open("wb") as outfile:
559                        for chunk in r.iter_content(chunk_size=8192):
560                            outfile.write(chunk)
561                return r
562            else:
563                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
564                    "User-Agent": "4cat/import",
565                    "Authentication": api_key
566                })
567        except requests.Timeout:
568            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
569                                         f"accessible to external connections and try again.")
570        except requests.RequestException as e:
571            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
572                                         f"accessible to external connections and try again.")
573
574        if response.status_code == 404:
575            raise FourcatImportException(
576                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
577                f"a valid dataset.")
578        elif response.status_code in (401, 403):
579            raise FourcatImportException(
580                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
581                f"dataset and are using the correct API key.")
582        elif response.status_code != 200:
583            raise FourcatImportException(
584                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")
585
586        return response

Get dataset component from 4CAT export API

Parameters
  • str base: Server URL base to import from
  • str dataset_key: Key of dataset to import
  • str api_key: API authentication token
  • str component: Component to retrieve
  • Path datapath: Optional path to stream the "data" component to; other components are returned in the response body
Returns

HTTP response object
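
A minimal usage sketch, assuming the remote server is reachable and the API key is valid (all values are placeholders):

from pathlib import Path

from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

base = "https://4cat.example"  # placeholder server
key = "28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49"  # placeholder key
api_key = "0123456789abcdef"  # placeholder API key

# Small components come back as a requests.Response object; metadata parses as JSON.
metadata = SearchImportFromFourcat.fetch_from_4cat(base, key, api_key, "metadata").json()

# The "data" component is streamed directly into datapath when one is supplied.
SearchImportFromFourcat.fetch_from_4cat(base, key, api_key, "data", datapath=Path("results.csv"))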

@staticmethod
def validate_query(query, request, user):
588    @staticmethod
589    def validate_query(query, request, user):
590        """
591        Validate custom data input
592
593        Confirms that the input is either a valid ZIP file exported from a 4CAT server
594        or a valid dataset URL with API key and, if so, returns sanitised query parameters.
595
596        :param dict query:  Query parameters, from client-side.
597        :param request:  Flask request
598        :param User user:  User object of user who has submitted the query
599        :return dict:  Safe query parameters
600        """
601        if query.get("method") == "zip":
602            filename = ""
603            if "option-data_upload-entries" in request.form:
604                # First pass sends list of files in the zip
605                pass
606            elif "option-data_upload" in request.files:
607                # Second pass sends the actual file
608                file = request.files["option-data_upload"]
609                if not file:
610                    raise QueryParametersException("No file uploaded.")
611
612                if not file.filename.endswith(".zip"):
613                    raise QueryParametersException("Uploaded file must be a ZIP file.")
614
615                filename = file.filename
616            else:
617                raise QueryParametersException("No file was offered for upload.")
618
619            return {
620                "method": "zip",
621                "filename": filename
622            }
623        elif query.get("method") == "url":
624            urls = query.get("url")
625            if not urls:
626                raise QueryParametersException("Provide at least one dataset URL.")
627
628            urls = urls.split(",")
629            bases = set([url.split("/results/")[0].lower() for url in urls])
630            keys = SearchImportFromFourcat.get_keys_from_urls(urls)
631
632            if len(keys) != 1:
633                # todo: change this to < 1 if we allow multiple datasets
634                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")
635
636            if len(bases) != 1:
637                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
638                                                "one 4CAT server at a time.")
639
640            base = urls[0].split("/results/")[0]
641            try:
642                # test if API key is valid and server is reachable
643                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
644            except FourcatImportException as e:
645                raise QueryParametersException(str(e))
646
647            try:
648                # test if we get a response we can parse
649                metadata = test.json()
650            except ValueError:
651                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")
652
653            version = get_software_version()
654
655            if metadata.get("current_4CAT_version") != version:
656                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
657                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
658                                               "sure both are running the same version of 4CAT and try again.")
659
660            # OK, we can import at least one dataset
661            return {
662                "url": ",".join(urls),
663                "api-key": query.get("api-key")
664            }
665        else:
666            raise QueryParametersException("Import method not yet implemented.")

Validate custom data input

Confirms that the input is either a valid ZIP file exported from a 4CAT server or a valid dataset URL with API key and, if so, returns sanitised query parameters.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters
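
For illustration, the two shapes the returned dictionary can take (all values are placeholders):

# URL method: the validated URL(s) and the API key are handed to the import job.
url_query = {
    "url": "https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/",
    "api-key": "0123456789abcdef",
}

# ZIP method: only the method and the original file name are kept; the file itself
# is staged separately by after_create().
zip_query = {"method": "zip", "filename": "dataset-export.zip"}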

@staticmethod
def get_keys_from_urls(urls):
668    @staticmethod
669    def get_keys_from_urls(urls):
670        """
671        Get dataset keys from 4CAT URLs
672
673        :param list urls:  List of URLs
674        :return list:  List of keys
675        """
676        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]

Get dataset keys from 4CAT URLs

Parameters
  • list urls: List of URLs
Returns

List of keys
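
A quick usage example; anything after the key, including fragments and query strings, is stripped (URLs below are placeholders):

from datasources.fourcat_import.import_4cat import SearchImportFromFourcat

urls = [
    "https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/",
    "https://4cat.example/results/abcd1234/#nav",
]
print(SearchImportFromFourcat.get_keys_from_urls(urls))
# ['28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49', 'abcd1234']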

@staticmethod
def ensure_key(query):
678    @staticmethod
679    def ensure_key(query):
680        """
681        Determine key for dataset generated by this processor
682
683        When importing datasets, it's necessary to determine the key of the
684        dataset that is created before it is actually created, because we want
685        to keep the original key of the imported dataset if possible. Luckily,
686        we can deduce it from the URL we're importing the dataset from.
687
688        :param dict query:  Input from the user, through the front-end
689        :return str:  Desired dataset key
690        """
691        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
692        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
693        urls = query.get("url", "").split(",")
694        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
695        return keys[0]

Determine key for dataset generated by this processor

When importing datasets, it's necessary to determine the key of the dataset that is created before it is actually created, because we want to keep the original key of the imported dataset if possible. Luckily, we can deduce it from the URL we're importing the dataset from.

Parameters
  • dict query: Input from the user, through the front-end
Returns

Desired dataset key