datasources.fourcat_import.import_4cat
Import datasets from other 4CATs
"""
Import datasets from other 4CATs
"""
import requests
import json
import shutil
import time
import zipfile
from pathlib import Path
from psycopg2.errors import InFailedSqlTransaction

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException,
                                   DataSetException)
from common.lib.helpers import UserInput, get_software_version
from common.lib.dataset import DataSet


class FourcatImportException(FourcatException):
    """
    Raised when a dataset component cannot be retrieved from another 4CAT
    server (see `SearchImportFromFourcat.fetch_from_4cat()`).
    """
    pass


class SearchImportFromFourcat(BasicProcessor):
    """
    Import a dataset (and its child datasets) from another 4CAT server, or
    from a ZIP file that was exported from a 4CAT server.

    The first imported ("primary") dataset overwrites the dataset that was
    created for the import query itself; child datasets are created as
    separate datasets.
    """
    type = "import_4cat-search"  # job ID
    category = "Search"  # category
    title = "Import 4CAT dataset and analyses"  # title displayed in UI
    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1  # this cannot be more than 1, else things get VERY messy

    created_datasets = None  # set of keys of datasets created during this import
    base = None  # base URL of the server that is imported from (URL imports only)
    remapped_keys = None  # original key -> new key, for keys that were already in use
    dataset_owner = None  # owner of the importing dataset; becomes creator of imports

    @classmethod
    def get_options(cls, parent_dataset=None, config=None) -> dict:
        """
        Get processor options

        :param DataSet parent_dataset:  An object representing the dataset that
          the processor would or was run on
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Option definitions, keyed by option name
        """
        return {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
                        "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
                        "You can find the current version in the footer at the bottom of the interface."
            },
            "method": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Import Type",
                "options": {
                    "zip": "Zip File",
                    "url": "4CAT URL",
                },
                "default": "url"
            },
            "url": {
                "type": UserInput.OPTION_TEXT,
                "help": "Dataset URL",
                "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
                "requires": "method^=url"
            },
            "intro2": {
                "type": UserInput.OPTION_INFO,
                "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
                        "an API key from **the server you are importing from**, not the one you are looking at right now. "
                        "Additionally, you need to have owner access to the dataset you want to import.",
                "requires": "method^=url"
            },
            "api-key": {
                "type": UserInput.OPTION_TEXT,
                "help": "4CAT API Key",
                "sensitive": True,
                "cache": True,
                "requires": "method^=url"
            },
            "data_upload": {
                "type": UserInput.OPTION_FILE,
                "help": "File",
                "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
                "requires": "method^=zip"
            },

        }

    def process(self):
        """
        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file

        Any exception raised while importing triggers a clean-up of the
        datasets created so far, after which the original exception is
        re-raised so it can be logged by the caller.
        """
        self.created_datasets = set()  # keys of created datasets - may not be successful!
        self.remapped_keys = {}  # changed dataset keys
        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
        try:
            if self.parameters.get("method") == "zip":
                self.process_zip()
            else:
                self.process_urls()
        except Exception as e:
            # Catch all exceptions and finish the job with an error
            # Resuming is impossible because this dataset was overwritten with the importing dataset
            # halt_and_catch_fire() will clean up and delete the datasets that were created
            self.interrupted = True
            try:
                self.halt_and_catch_fire()
            except ProcessorInterruptedException:
                pass
            except InFailedSqlTransaction:
                # Catch database issue and retry
                self.db.rollback()
                try:
                    self.halt_and_catch_fire()
                except ProcessorInterruptedException:
                    pass
            # Reraise the original exception for logging
            raise e

    @staticmethod
    def after_create(query, dataset, request):
        """
        Hook to execute after the dataset for this source has been created

        In this case, put the file in a temporary location so it can be
        processed properly by the related Job later.

        :param dict query:  Sanitised query parameters
        :param DataSet dataset:  Dataset created for this query
        :param request:  Flask request submitted for its creation
        """
        if query.get("method") == "zip":
            file = request.files["option-data_upload"]
            file.seek(0)
            # the upload is stored next to the (future) result file, with an
            # '.importing' suffix; process_zip() reads it from there
            with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
                while True:
                    chunk = file.read(1024)
                    if len(chunk) == 0:
                        break
                    outfile.write(chunk)
        else:
            # nothing to do for URLs
            pass

    def process_zip(self):
        """
        Import 4CAT dataset from a ZIP file

        Reads the `*_metadata.json` files in the uploaded ZIP file, imports
        the single dataset without a parent as the primary dataset and then
        imports its descendants, copying log and result files along the way.
        """
        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
        temp_file = self.dataset.get_results_path().with_suffix(".importing")

        imported = []
        processed_files = 1  # take into account the export.log file
        processed_names = set()  # names of the archive members that were handled
        failed_imports = []
        primary_dataset_original_log = None
        num_rows = 0  # overall number of rows; set once the primary dataset's results are copied
        with zipfile.ZipFile(temp_file, "r") as zip_ref:
            zip_contents = zip_ref.namelist()

            # Get all metadata files and determine primary dataset
            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
            if not metadata_files:
                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
                return

            # Get the primary dataset
            primary_dataset_keys = set()
            datasets = []
            parent_child_mapping = {}
            metadata_sources = {}  # dataset key -> name of its metadata file in the archive
            for file in metadata_files:
                with zip_ref.open(file) as f:
                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
                metadata = json.loads(content)
                metadata_sources[metadata.get("key")] = file
                if not metadata.get("key_parent"):
                    primary_dataset_keys.add(metadata.get("key"))
                    datasets.append(metadata)
                else:
                    # Store the mapping of parent to child datasets
                    parent_key = metadata.get("key_parent")
                    parent_child_mapping.setdefault(parent_key, []).append(metadata)

            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
            if not primary_dataset_keys:
                self.dataset.finish_with_error("ZIP file contains no primary dataset; exactly one is required.")
                return
            if len(primary_dataset_keys) != 1:
                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
                return

            # Import datasets
            while datasets:
                self.halt_and_catch_fire()

                # Create the datasets
                metadata = datasets.pop(0)
                dataset_key = metadata.get("key")
                processed_metadata = self.process_metadata(metadata)
                if processed_metadata is None:
                    # process_metadata() signals an unusable metadata file
                    # with None; skip instead of crashing in create_dataset()
                    self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
                    failed_imports.append(dataset_key)
                    continue

                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
                processed_files += 1
                processed_names.add(metadata_sources.get(dataset_key))

                # Copy the log file
                self.halt_and_catch_fire()
                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
                if log_filename in zip_contents:
                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                    with zip_ref.open(log_filename) as f:
                        content = f.read().decode('utf-8')
                    if new_dataset.key == self.dataset.key:
                        # Hold the original log for the primary dataset and add at the end
                        primary_dataset_original_log = content
                    else:
                        new_dataset.log("Original dataset log included below:")
                        with new_dataset.get_log_path().open("a") as outfile:
                            outfile.write(content)
                    processed_files += 1
                    processed_names.add(log_filename)
                else:
                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")

                # Copy the results
                self.halt_and_catch_fire()
                results_filename = metadata["result_file"]
                if results_filename in zip_contents:
                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                    with zip_ref.open(results_filename) as f:
                        with new_dataset.get_results_path().open("wb") as outfile:
                            # copy in chunks so large result files are not
                            # loaded into memory in one go
                            shutil.copyfileobj(f, outfile)
                    processed_files += 1
                    processed_names.add(results_filename)

                    if not imported:
                        # first dataset - use num rows as 'overall'
                        num_rows = metadata["num_rows"]
                else:
                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                    failed_imports.append(dataset_key)
                    continue

                # finally, the kids
                self.halt_and_catch_fire()
                if dataset_key in parent_child_mapping:
                    datasets.extend(parent_child_mapping[dataset_key])
                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")

                # done - remember that we've imported this one
                imported.append(new_dataset)
                new_dataset.update_status(metadata["status"])

                if new_dataset.key != self.dataset.key:
                    # only finish if this is not the 'main' dataset, or the user
                    # will think the whole import is done
                    new_dataset.finish(metadata["num_rows"])

        # Check that all files were processed
        missed_files = []
        if len(zip_contents) != processed_files:
            # NOTE(review): the export log is assumed to be named 'export.log'
            # (per the comment on the initial count above) - confirm against
            # the exporting processor
            missed_files = [file for file in zip_contents
                            if file not in processed_names and file != "export.log"]

        # todo: this part needs updating if/when we support importing multiple datasets!
        if failed_imports:
            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        elif missed_files:
            self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
            self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
                                       f"{len(missed_files)} files were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        else:
            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
                                       is_final=True)

        if not self.dataset.is_finished():
            # now all related datasets are imported, we can finish the 'main'
            # dataset, and the user will be alerted that the full import is
            # complete
            self.dataset.finish(num_rows)

        # Add the original log for the primary dataset
        if primary_dataset_original_log:
            self.dataset.log("Original dataset log included below:\n")
            with self.dataset.get_log_path().open("a") as outfile:
                outfile.write(primary_dataset_original_log)

    @staticmethod
    def process_metadata(metadata):
        """
        Process metadata for import

        Note that the passed dictionary is modified in place; the returned
        dictionary is the same object.

        :param dict metadata:  Metadata of import
        :return:  Relevant metadata, or `None` if parsing error
        """
        # get rid of some keys that are server-specific and don't need to
        # be stored (or don't correspond to database columns)
        try:
            metadata.pop("current_4CAT_version")
            metadata.pop("id")
            metadata.pop("job")
            metadata.pop("is_private")
            metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!

            # extra params are stored as JSON...
            metadata["parameters"] = json.loads(metadata["parameters"])
            if "copied_from" in metadata["parameters"]:
                metadata["parameters"].pop("copied_from")
            metadata["parameters"] = json.dumps(metadata["parameters"])

        except (ValueError, KeyError):
            # we don't need all this metadata but it still should be present,
            # otherwise something is wrong with the data format
            return None

        return metadata

    def create_dataset(self, metadata, original_key, primary=False):
        """
        Create a new dataset

        Creates (or, for the primary dataset, overwrites) the database record
        for an imported dataset and reserves a result file for it.

        :param dict metadata:  Processed metadata of the dataset to import, as
          returned by `process_metadata()`. The `key` item may be removed from
          this dictionary.
        :param str original_key:  Key the dataset had on the server it was
          exported from
        :param bool primary:  Whether this dataset should replace the
          processor's own dataset
        :return DataSet:  The newly created dataset
        """
        if primary:
            self.dataset.update_status(f"Importing primary dataset {original_key}.")
            # if this is the first dataset we're importing, make it the
            # processor's "own" dataset. the key has already been set to
            # the imported dataset's key via ensure_key() (or a new unqiue
            # key if it already existed on this server)
            # by making it the "own" dataset, the user initiating the
            # import will see the imported dataset as the "result" of their
            # import query in the interface, similar to the workflow for
            # other data sources
            new_dataset = self.dataset

            # Update metadata and file
            metadata.pop("key")  # key already OK (see above)
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        else:
            self.dataset.update_status(f"Importing child dataset {original_key}.")
            # supernumerary datasets - handle on their own
            # these include any children of imported datasets
            try:
                DataSet(key=metadata["key"], db=self.db, modules=self.modules)
            except DataSetException:
                # this is *good* since it means the key doesn't exist, so
                # we can re-use the key of the imported dataset
                self.db.insert("datasets", data=metadata)
                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)
            else:
                # if we *haven't* thrown a DatasetException now, then the
                # key is already in use, so create a "dummy" dataset and
                # overwrite it with the metadata we have (except for the
                # key). this ensures that a new unique key will be
                # generated.
                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
                metadata.pop("key")
                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        if new_dataset.key != original_key:
            # could not use original key because it was already in use
            # so update any references to use the new key
            self.remapped_keys[original_key] = new_dataset.key
            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                       f"{new_dataset.key} instead of key {original_key}!")

        # refresh object, make sure it's in sync with the database
        self.created_datasets.add(new_dataset.key)
        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
        current_log = None
        if new_dataset.key == self.dataset.key:
            # this ensures that the first imported dataset becomes the
            # processor's "own" dataset, and that the import logs go to
            # that dataset's log file. For later imports, this evaluates to
            # False.

            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
            current_log = self.dataset.get_log_path().read_text()
            # Update the dataset
            self.dataset = new_dataset

        # if the key of the parent dataset was changed, change the
        # reference to it that the child dataset has
        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

        # update some attributes that should come from the new server, not
        # the old
        new_dataset.creator = self.dataset_owner
        new_dataset.original_timestamp = new_dataset.timestamp
        new_dataset.imported = True
        new_dataset.timestamp = int(time.time())
        new_dataset.db.commit()

        # make sure the dataset path uses the new key and local dataset
        # path settings. this also makes sure the log file is created in
        # the right place (since it is derived from the results file path)
        extension = metadata["result_file"].split(".")[-1]
        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
        if not updated:
            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

        if current_log:
            # Add the current log to the new dataset
            with new_dataset.get_log_path().open("a") as outfile:
                outfile.write(current_log)

        return new_dataset

    def process_urls(self):
        """
        Import 4CAT dataset from another 4CAT server

        Interfaces with another 4CAT server to transfer a dataset's metadata,
        data files and child datasets.
        """
        urls = [url.strip() for url in self.parameters.get("url").split(",")]
        self.base = urls[0].split("/results/")[0]
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
        api_key = self.parameters.get("api-key")

        imported = []  # successfully imported datasets
        failed_imports = []  # keys that failed to import
        num_rows = 0  # will be used later

        # we can add support for multiple datasets later by removing
        # this part!
        keys = [keys[0]]

        while keys:
            dataset_key = keys.pop(0)

            self.halt_and_catch_fire()
            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")

            # first, metadata!
            try:
                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
                metadata = metadata.json()
            except FourcatImportException as e:
                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
                # count as failed, so the final status does not claim success
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
                failed_imports.append(dataset_key)
                continue

            # copying empty datasets doesn't really make sense
            if metadata["num_rows"] == 0:
                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
                failed_imports.append(dataset_key)
                continue

            metadata = self.process_metadata(metadata)
            if metadata is None:
                self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
                failed_imports.append(dataset_key)
                continue

            # create the new dataset; the first one to get here is the primary
            new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)

            # then, the log
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
                logpath = new_dataset.get_log_path()
                new_dataset.log("Original dataset log included below:")
                with logpath.open("a") as outfile:
                    outfile.write(log.text)
            except FourcatImportException as e:
                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
                failed_imports.append(dataset_key)
                continue

            # then, the results
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                datapath = new_dataset.get_results_path()
                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)

                if not imported:
                    # first dataset - use num rows as 'overall'
                    num_rows = metadata["num_rows"]

            except FourcatImportException as e:
                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
                if new_dataset.key != self.dataset.key:
                    new_dataset.delete()
                    # it no longer exists, so it should not be cleaned up later
                    self.created_datasets.discard(new_dataset.key)
                failed_imports.append(dataset_key)
                continue

            except ValueError:
                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
                failed_imports.append(dataset_key)
                continue

            # finally, the kids
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
                children = children.json()
            except FourcatImportException as e:
                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
                failed_imports.append(dataset_key)
                continue

            for child in children:
                keys.append(child)
                self.dataset.log(f"Adding child dataset {child} to import queue")

            # done - remember that we've imported this one
            imported.append(new_dataset)
            new_dataset.update_status(metadata["status"])

            if new_dataset.key != self.dataset.key:
                # only finish if this is not the 'main' dataset, or the user
                # will think the whole import is done
                new_dataset.finish(metadata["num_rows"])

        # todo: this part needs updating if/when we support importing multiple datasets!
        if failed_imports:
            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        else:
            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.",
                                       is_final=True)

        if not self.dataset.is_finished():
            # now all related datasets are imported, we can finish the 'main'
            # dataset, and the user will be alerted that the full import is
            # complete
            self.dataset.finish(num_rows)

    def halt_and_catch_fire(self):
        """
        Clean up on interrupt

        There are multiple places in the code where we can bail out on an
        interrupt, so abstract that away in its own function. Does nothing
        unless `self.interrupted` is set.

        :raises ProcessorInterruptedException:  If the processor was
          interrupted, after the datasets created so far have been deleted and
          the main dataset has been finished with an error.
        """
        if self.interrupted:
            # resuming is impossible because the original dataset (which
            # has the list of URLs to import) has probably been
            # overwritten by this point
            created = self.created_datasets if self.created_datasets is not None else set()
            deletables = [k for k in created if k != self.dataset.key]
            for deletable in deletables:
                try:
                    DataSet(key=deletable, db=self.db, modules=self.modules).delete()
                except DataSetException:
                    # the dataset no longer exists, e.g. because this method
                    # already ran once for this import; nothing to delete
                    pass
                created.discard(deletable)

            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
                                           f"will need to initiate the import again.")

            raise ProcessorInterruptedException()

    @staticmethod
    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
        """
        Get dataset component from 4CAT export API

        :param str base:  Server URL base to import from
        :param str dataset_key:  Key of dataset to import
        :param str api_key:  API authentication token
        :param str component:  Component to retrieve
        :param Path|None datapath:  If given and `component` is `data`, the
          response body is streamed to this path instead of being kept in
          memory
        :return:  HTTP response object. When the body was streamed to
          `datapath`, the returned response has already been closed.
        :raises FourcatImportException:  If the server cannot be reached,
          times out, or responds with a status code other than 200
        """
        url = f"{base}/api/export-packed-dataset/{dataset_key}/{component}/"
        headers = {
            "User-Agent": "4cat/import",
            "Authentication": api_key
        }
        stream_to_file = component == "data" and bool(datapath)
        timeout_message = (f"The 4CAT server at {base} took too long to respond. Make sure it is "
                           f"accessible to external connections and try again.")

        try:
            response = requests.get(url, timeout=5, stream=stream_to_file, headers=headers)
        except requests.Timeout as e:
            raise FourcatImportException(timeout_message) from e
        except requests.RequestException as e:
            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
                                         f"accessible to external connections and try again.") from e

        try:
            # check the status *before* streaming, so that e.g. a 404 is
            # reported as such for data files too
            if response.status_code == 404:
                raise FourcatImportException(
                    f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
                    f"a valid dataset.")
            elif response.status_code in (401, 403):
                raise FourcatImportException(
                    f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
                    f"dataset and are using the correct API key.")
            elif response.status_code != 200:
                raise FourcatImportException(
                    f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")

            if stream_to_file:
                # Stream data
                try:
                    with datapath.open("wb") as outfile:
                        for chunk in response.iter_content(chunk_size=8192):
                            outfile.write(chunk)
                except requests.Timeout as e:
                    raise FourcatImportException(timeout_message) from e
                except requests.RequestException as e:
                    raise FourcatImportException(
                        f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
                        f"accessible to external connections and try again.") from e
        finally:
            if stream_to_file:
                # streamed responses keep their connection open until closed
                response.close()

        return response

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate import parameters

        For ZIP imports, checks that a file with a `.zip` extension is (going
        to be) uploaded. For URL imports, checks that exactly one dataset URL
        is given, that the remote server can be reached with the given API
        key, and that it runs the same version of 4CAT as this server.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If the parameters are not valid
        """
        if query.get("method") == "zip":
            filename = ""
            if "option-data_upload-entries" in request.form:
                # First pass sends list of files in the zip
                pass
            elif "option-data_upload" in request.files:
                # Second pass sends the actual file
                file = request.files["option-data_upload"]
                if not file:
                    raise QueryParametersException("No file uploaded.")

                # compare case-insensitively, so 'EXPORT.ZIP' is accepted too
                if not file.filename.lower().endswith(".zip"):
                    raise QueryParametersException("Uploaded file must be a ZIP file.")

                filename = file.filename
            else:
                raise QueryParametersException("No file was offered for upload.")

            return {
                "method": "zip",
                "filename": filename
            }
        elif query.get("method") == "url":
            urls = query.get("url")
            if not urls:
                raise QueryParametersException("Provide at least one dataset URL.")

            # strip whitespace (as process_urls() does) and ignore empty
            # items, e.g. those caused by a trailing comma
            urls = [url.strip() for url in urls.split(",") if url.strip()]
            if not urls:
                raise QueryParametersException("Provide at least one dataset URL.")

            bases = {url.split("/results/")[0].lower() for url in urls}
            keys = SearchImportFromFourcat.get_keys_from_urls(urls)

            if len(keys) != 1:
                # todo: change this to < 1 if we allow multiple datasets
                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")

            if len(bases) != 1:
                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
                                               "one 4CAT server at a time.")

            base = urls[0].split("/results/")[0]
            try:
                # test if API key is valid and server is reachable
                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
            except FourcatImportException as e:
                raise QueryParametersException(str(e)) from e

            try:
                # test if we get a response we can parse
                metadata = test.json()
            except ValueError:
                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")

            version = get_software_version()

            if metadata.get("current_4CAT_version") != version:
                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
                                               "sure both are running the same version of 4CAT and try again.")

            # OK, we can import at least one dataset
            return {
                "url": ",".join(urls),
                "api-key": query.get("api-key")
            }
        else:
            raise QueryParametersException("Import method not yet implemented.")

    @staticmethod
    def get_keys_from_urls(urls):
        """
        Get dataset keys from 4CAT URLs

        The key is the path segment following `/results/`, without any query
        string or fragment.

        :param list urls:  List of URLs
        :return list:  List of keys
        """
        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]

    @staticmethod
    def ensure_key(query):
        """
        Determine key for dataset generated by this processor

        When importing datasets, it's necessary to determine the key of the
        dataset that is created before it is actually created, because we want
        to keep the original key of the imported dataset if possible. Luckily,
        we can deduce it from the URL we're importing the dataset from.

        If the query contains no URL (e.g. for ZIP imports), an empty string
        is returned.

        :param dict query:  Input from the user, through the front-end
        :return str:  Desired dataset key
        """
        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
        urls = (query.get("url") or "").split(",")
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
        return keys[0]
Base 4CAT exception class
Inherited Members
class SearchImportFromFourcat(BasicProcessor):
    """
    Import a dataset and its analyses from another 4CAT instance

    Two import methods are supported: fetching the dataset (and its child
    datasets) through the export API of another 4CAT server, or reading them
    from a ZIP file that was exported by a 4CAT server. In both cases the
    first ("primary") imported dataset replaces the processor's own dataset,
    so the user sees it as the result of their import query.
    """
    type = "import_4cat-search"  # job ID
    category = "Search"  # category
    title = "Import 4CAT dataset and analyses"  # title displayed in UI
    description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)"  # description displayed in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    max_workers = 1  # this cannot be more than 1, else things get VERY messy

    created_datasets = None  # set of keys of datasets created during the import
    base = None  # URL base of the server that is imported from (URL method only)
    remapped_keys = None  # original key -> new key, for keys already in use locally
    dataset_owner = None  # owner of the importing dataset; becomes creator of imports

    @classmethod
    def get_options(cls, parent_dataset=None, config=None) -> dict:
        """
        Get processor options

        :param DataSet parent_dataset:  An object representing the dataset that
          the processor would or was run on
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Option definitions, keyed by option name
        """
        return {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
                        "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
                        "You can find the current version in the footer at the bottom of the interface."
            },
            "method": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Import Type",
                "options": {
                    "zip": "Zip File",
                    "url": "4CAT URL",
                },
                "default": "url"
            },
            "url": {
                "type": UserInput.OPTION_TEXT,
                "help": "Dataset URL",
                "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
                "requires": "method^=url"
            },
            "intro2": {
                "type": UserInput.OPTION_INFO,
                "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
                        "an API key from **the server you are importing from**, not the one you are looking at right now. "
                        "Additionally, you need to have owner access to the dataset you want to import.",
                "requires": "method^=url"
            },
            "api-key": {
                "type": UserInput.OPTION_TEXT,
                "help": "4CAT API Key",
                "sensitive": True,
                "cache": True,
                "requires": "method^=url"
            },
            "data_upload": {
                "type": UserInput.OPTION_FILE,
                "help": "File",
                "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
                "requires": "method^=zip"
            },

        }

    def process(self):
        """
        Import 4CAT dataset either from another 4CAT server or from the uploaded zip file

        Any error aborts the import: datasets created so far are deleted and
        the original exception is re-raised so it can be logged.
        """
        self.created_datasets = set()  # keys of created datasets - may not be successful!
        self.remapped_keys = {}  # changed dataset keys
        self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
        try:
            if self.parameters.get("method") == "zip":
                self.process_zip()
            else:
                self.process_urls()
        except ProcessorInterruptedException:
            # raised by halt_and_catch_fire() *after* it has cleaned up; doing
            # the clean-up a second time would try to load datasets that were
            # just deleted and mask the interruption with a DataSetException
            raise
        except Exception:
            # Catch all exceptions and finish the job with an error
            # Resuming is impossible because this dataset was overwritten with the importing dataset
            # halt_and_catch_fire() will clean up and delete the datasets that were created
            self.interrupted = True
            try:
                self.halt_and_catch_fire()
            except ProcessorInterruptedException:
                pass
            except InFailedSqlTransaction:
                # Catch database issue and retry
                self.db.rollback()
                try:
                    self.halt_and_catch_fire()
                except ProcessorInterruptedException:
                    pass
            # Reraise the original exception for logging
            raise

    @staticmethod
    def after_create(query, dataset, request):
        """
        Hook to execute after the dataset for this source has been created

        In this case, put the file in a temporary location so it can be
        processed properly by the related Job later.

        :param dict query:  Sanitised query parameters
        :param DataSet dataset:  Dataset created for this query
        :param request:  Flask request submitted for its creation
        """
        if query.get("method") != "zip":
            # nothing to do for URLs
            return

        import shutil  # local import; only needed for uploads

        file = request.files["option-data_upload"]
        file.seek(0)
        # process_zip() looks for the upload at this same '.importing' path
        with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile:
            shutil.copyfileobj(file, outfile)

    def process_zip(self):
        """
        Import 4CAT dataset from a ZIP file

        Reads all `*_metadata.json` files in the uploaded archive, imports
        the single dataset without a parent as the primary dataset, and then
        imports the children of every successfully imported dataset.
        """
        import shutil  # local import; used to stream result files to disk

        self.dataset.update_status("Importing datasets and analyses from ZIP file.")
        temp_file = self.dataset.get_results_path().with_suffix(".importing")

        imported = []
        failed_imports = []
        num_rows = 0  # overwritten with the row count of the first imported dataset
        primary_dataset_original_log = None
        # names of archive members that were dealt with; the export.log file
        # is not imported, so it counts as processed from the start
        processed_files = {"export.log"}

        with zipfile.ZipFile(temp_file, "r") as zip_ref:
            zip_contents = zip_ref.namelist()

            # Get all metadata files and determine primary dataset
            metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
            if not metadata_files:
                self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
                return

            # Get the primary dataset
            primary_dataset_keys = set()
            datasets = []  # import queue of (metadata file name, metadata) pairs
            parent_child_mapping = {}
            for file in metadata_files:
                with zip_ref.open(file) as f:
                    content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
                metadata = json.loads(content)
                if not metadata.get("key_parent"):
                    primary_dataset_keys.add(metadata.get("key"))
                    datasets.append((file, metadata))
                else:
                    # Store the mapping of parent to child datasets
                    parent_child_mapping.setdefault(metadata.get("key_parent"), []).append((file, metadata))

            # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
            if not primary_dataset_keys:
                self.dataset.finish_with_error("ZIP file contains no primary dataset; exactly one is required.")
                return

            if len(primary_dataset_keys) > 1:
                self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
                return

            # Import datasets
            while datasets:
                self.halt_and_catch_fire()

                # Create the datasets
                metadata_file, metadata = datasets.pop(0)
                dataset_key = metadata.get("key")
                processed_metadata = self.process_metadata(metadata)
                if processed_metadata is None:
                    # process_metadata() signals malformed metadata with None
                    self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
                    failed_imports.append(dataset_key)
                    continue

                new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
                processed_files.add(metadata_file)

                # Copy the log file
                self.halt_and_catch_fire()
                log_filename = Path(metadata["result_file"]).with_suffix(".log").name
                if log_filename in zip_contents:
                    self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                    with zip_ref.open(log_filename) as f:
                        content = f.read().decode('utf-8')
                    if new_dataset.key == self.dataset.key:
                        # Hold the original log for the primary dataset and add at the end
                        primary_dataset_original_log = content
                    else:
                        new_dataset.log("Original dataset log included below:")
                        with new_dataset.get_log_path().open("a") as outfile:
                            outfile.write(content)
                    processed_files.add(log_filename)
                else:
                    self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")

                # Copy the results
                self.halt_and_catch_fire()
                results_filename = metadata["result_file"]
                if results_filename in zip_contents:
                    self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                    with zip_ref.open(results_filename) as f:
                        with new_dataset.get_results_path().open("wb") as outfile:
                            # stream, so large result files are not held in memory
                            shutil.copyfileobj(f, outfile)
                    processed_files.add(results_filename)

                    if not imported:
                        # first dataset - use num rows as 'overall'
                        num_rows = metadata["num_rows"]
                else:
                    self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                    new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                    failed_imports.append(dataset_key)
                    continue

                # finally, the kids
                self.halt_and_catch_fire()
                if dataset_key in parent_child_mapping:
                    datasets.extend(parent_child_mapping[dataset_key])
                    self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")

                # done - remember that we've imported this one
                imported.append(new_dataset)
                new_dataset.update_status(metadata["status"])

                if new_dataset.key != self.dataset.key:
                    # only finish if this is not the 'main' dataset, or the user
                    # will think the whole import is done
                    new_dataset.finish(metadata["num_rows"])

        # Check that all files were processed
        missed_files = [file for file in zip_contents if file not in processed_files]

        # todo: this part needs updating if/when we support importing multiple datasets!
        if failed_imports:
            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        elif missed_files:
            self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
            self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
                                       f"{len(missed_files)} files were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        else:
            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
                                       is_final=True)

        if not self.dataset.is_finished():
            # now all related datasets are imported, we can finish the 'main'
            # dataset, and the user will be alerted that the full import is
            # complete
            self.dataset.finish(num_rows)

        # Add the original log for the primary dataset
        if primary_dataset_original_log:
            self.dataset.log("Original dataset log included below:\n")
            with self.dataset.get_log_path().open("a") as outfile:
                outfile.write(primary_dataset_original_log)

    @staticmethod
    def process_metadata(metadata):
        """
        Process metadata for import

        The dictionary is modified in place: server-specific fields are
        removed and the `copied_from` parameter is dropped.

        :param dict metadata:  Metadata of import
        :return:  Relevant metadata, or `None` if parsing error
        """
        # get rid of some keys that are server-specific and don't need to
        # be stored (or don't correspond to database columns)
        try:
            metadata.pop("current_4CAT_version")
            metadata.pop("id")
            metadata.pop("job")
            metadata.pop("is_private")
            metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!

            # extra params are stored as JSON...
            parameters = json.loads(metadata["parameters"])
            if isinstance(parameters, dict):
                parameters.pop("copied_from", None)
            metadata["parameters"] = json.dumps(parameters)

        except (ValueError, KeyError, TypeError):
            # we don't need all this metadata but it still should be present,
            # otherwise something is wrong with the data format
            # (TypeError: 'parameters' was not a JSON string at all)
            return None

        return metadata

    def create_dataset(self, metadata, original_key, primary=False):
        """
        Create a new dataset

        :param dict metadata:  Processed metadata of the dataset to create;
          its `key` entry is removed when the original key cannot be (or does
          not need to be) inserted
        :param str original_key:  Key the dataset had on the exporting server
        :param bool primary:  Whether this dataset should replace the
          processor's own dataset
        :return DataSet:  The newly created (or overwritten) dataset
        """
        if primary:
            self.dataset.update_status(f"Importing primary dataset {original_key}.")
            # if this is the first dataset we're importing, make it the
            # processor's "own" dataset. the key has already been set to
            # the imported dataset's key via ensure_key() (or a new unqiue
            # key if it already existed on this server)
            # by making it the "own" dataset, the user initiating the
            # import will see the imported dataset as the "result" of their
            # import query in the interface, similar to the workflow for
            # other data sources
            new_dataset = self.dataset

            # Update metadata and file
            metadata.pop("key")  # key already OK (see above)
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        else:
            self.dataset.update_status(f"Importing child dataset {original_key}.")
            # supernumerary datasets - handle on their own
            # these include any children of imported datasets
            try:
                DataSet(key=metadata["key"], db=self.db, modules=self.modules)

                # if we *haven't* thrown a DatasetException now, then the
                # key is already in use, so create a "dummy" dataset and
                # overwrite it with the metadata we have (except for the
                # key). this ensures that a new unique key will be
                # generated.
                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
                metadata.pop("key")
                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

            except DataSetException:
                # this is *good* since it means the key doesn't exist, so
                # we can re-use the key of the imported dataset
                self.db.insert("datasets", data=metadata)
                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

        if new_dataset.key != original_key:
            # could not use original key because it was already in use
            # so update any references to use the new key
            self.remapped_keys[original_key] = new_dataset.key
            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                       f"{new_dataset.key} instead of key {original_key}!")

        # refresh object, make sure it's in sync with the database
        self.created_datasets.add(new_dataset.key)
        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
        current_log = None
        if new_dataset.key == self.dataset.key:
            # this ensures that the first imported dataset becomes the
            # processor's "own" dataset, and that the import logs go to
            # that dataset's log file. For later imports, this evaluates to
            # False.

            # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path)
            current_log = self.dataset.get_log_path().read_text()
            # Update the dataset
            self.dataset = new_dataset

        # if the key of the parent dataset was changed, change the
        # reference to it that the child dataset has
        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

        # update some attributes that should come from the new server, not
        # the old
        new_dataset.creator = self.dataset_owner
        new_dataset.original_timestamp = new_dataset.timestamp
        new_dataset.imported = True
        new_dataset.timestamp = int(time.time())
        new_dataset.db.commit()

        # make sure the dataset path uses the new key and local dataset
        # path settings. this also makes sure the log file is created in
        # the right place (since it is derived from the results file path)
        extension = metadata["result_file"].split(".")[-1]
        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
        if not updated:
            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

        if current_log:
            # Add the current log to the new dataset
            with new_dataset.get_log_path().open("a") as outfile:
                outfile.write(current_log)

        return new_dataset

    def process_urls(self):
        """
        Import 4CAT dataset from another 4CAT server

        Interfaces with another 4CAT server to transfer a dataset's metadata,
        data files and child datasets.
        """
        urls = [url.strip() for url in self.parameters.get("url").split(",")]
        self.base = urls[0].split("/results/")[0]
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
        api_key = self.parameters.get("api-key")

        imported = []  # successfully imported datasets
        failed_imports = []  # keys that failed to import
        num_rows = 0  # will be used later

        # we can add support for multiple datasets later by removing
        # this part!
        keys = [keys[0]]

        while keys:
            dataset_key = keys.pop(0)

            self.halt_and_catch_fire()
            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")

            # first, metadata!
            try:
                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
                metadata = metadata.json()
            except FourcatImportException as e:
                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
                # count it, or the final status would claim a clean import
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
                failed_imports.append(dataset_key)
                continue

            # copying empty datasets doesn't really make sense
            if metadata["num_rows"] == 0:
                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
                failed_imports.append(dataset_key)
                continue

            metadata = self.process_metadata(metadata)
            if metadata is None:
                self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
                failed_imports.append(dataset_key)
                continue

            # create the new dataset; the first one imported is the primary
            new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)

            # then, the log
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
                logpath = new_dataset.get_log_path()
                new_dataset.log("Original dataset log included below:")
                with logpath.open("a") as outfile:
                    outfile.write(log.text)
            except FourcatImportException as e:
                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
                failed_imports.append(dataset_key)
                continue

            # then, the results
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                datapath = new_dataset.get_results_path()
                SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)

                if not imported:
                    # first dataset - use num rows as 'overall'
                    num_rows = metadata["num_rows"]

            except FourcatImportException as e:
                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
                if new_dataset.key != self.dataset.key:
                    new_dataset.delete()
                failed_imports.append(dataset_key)
                continue

            except ValueError:
                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
                failed_imports.append(dataset_key)
                continue

            # finally, the kids
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
                children = children.json()
            except FourcatImportException as e:
                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
                failed_imports.append(dataset_key)
                continue

            for child in children:
                keys.append(child)
                self.dataset.log(f"Adding child dataset {child} to import queue")

            # done - remember that we've imported this one
            imported.append(new_dataset)
            new_dataset.update_status(metadata["status"])

            if new_dataset.key != self.dataset.key:
                # only finish if this is not the 'main' dataset, or the user
                # will think the whole import is done
                new_dataset.finish(metadata["num_rows"])

        # todo: this part needs updating if/when we support importing multiple datasets!
        if failed_imports:
            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        else:
            self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.",
                                       is_final=True)

        if not self.dataset.is_finished():
            # now all related datasets are imported, we can finish the 'main'
            # dataset, and the user will be alerted that the full import is
            # complete
            self.dataset.finish(num_rows)

    def halt_and_catch_fire(self):
        """
        Clean up on interrupt

        There are multiple places in the code where we can bail out on an
        interrupt, so abstract that away in its own function. Does nothing
        unless `self.interrupted` is set.

        :raises ProcessorInterruptedException:  After cleaning up, if the
          processor was interrupted
        """
        if self.interrupted:
            # resuming is impossible because the original dataset (which
            # has the list of URLs to import) has probably been
            # overwritten by this point
            deletables = [k for k in self.created_datasets if k != self.dataset.key]
            for deletable in deletables:
                DataSet(key=deletable, db=self.db, modules=self.modules).delete()

            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
                                           f"will need to initiate the import again.")

            raise ProcessorInterruptedException()

    @staticmethod
    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
        """
        Get dataset component from 4CAT export API

        :param str base:  Server URL base to import from
        :param str dataset_key:  Key of dataset to import
        :param str api_key:  API authentication token
        :param str component:  Component to retrieve
        :param Path datapath:  If given and `component` is `data`, the
          response is streamed to this file instead of held in memory
        :return:  HTTP response object
        :raises FourcatImportException:  If the server cannot be reached or
          responds with an error
        """
        try:
            if component == "data" and datapath:
                # Stream data
                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
                                  headers={
                                      "User-Agent": "4cat/import",
                                      "Authentication": api_key
                                  }) as r:
                    # HTTP errors raise here and are reported through the
                    # RequestException handler below
                    r.raise_for_status()
                    with datapath.open("wb") as outfile:
                        for chunk in r.iter_content(chunk_size=8192):
                            outfile.write(chunk)
                return r
            else:
                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
                    "User-Agent": "4cat/import",
                    "Authentication": api_key
                })
        except requests.Timeout:
            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
                                         f"accessible to external connections and try again.")
        except requests.RequestException as e:
            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
                                         f"accessible to external connections and try again.")

        if response.status_code == 404:
            raise FourcatImportException(
                f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to "
                f"a valid dataset.")
        elif response.status_code in (401, 403):
            raise FourcatImportException(
                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
                f"dataset and are using the correct API key.")
        elif response.status_code != 200:
            raise FourcatImportException(
                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")

        return response

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate import parameters

        For the ZIP method, checks that a `.zip` file was (or will be)
        uploaded. For the URL method, checks that exactly one dataset URL was
        given, that its metadata can be fetched with the supplied API key and
        that the remote server runs the same 4CAT version as this one.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager|None config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If the input is not valid
        """
        if query.get("method") == "zip":
            filename = ""
            if "option-data_upload-entries" in request.form:
                # First pass sends list of files in the zip
                pass
            elif "option-data_upload" in request.files:
                # Second pass sends the actual file
                file = request.files["option-data_upload"]
                if not file:
                    raise QueryParametersException("No file uploaded.")

                if not file.filename.endswith(".zip"):
                    raise QueryParametersException("Uploaded file must be a ZIP file.")

                filename = file.filename
            else:
                raise QueryParametersException("No file was offered for upload.")

            return {
                "method": "zip",
                "filename": filename
            }
        elif query.get("method") == "url":
            urls = query.get("url")
            if not urls:
                raise QueryParametersException("Provide at least one dataset URL.")

            # strip whitespace and ignore empty entries, so stray spaces or a
            # trailing comma do not count as a separate URL or server
            urls = [url.strip() for url in urls.split(",") if url.strip()]
            if not urls:
                raise QueryParametersException("Provide at least one dataset URL.")

            bases = set([url.split("/results/")[0].lower() for url in urls])
            keys = SearchImportFromFourcat.get_keys_from_urls(urls)

            if len(keys) != 1:
                # todo: change this to < 1 if we allow multiple datasets
                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")

            if len(bases) != 1:
                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
                                               "one 4CAT server at a time.")

            base = urls[0].split("/results/")[0]
            try:
                # test if API key is valid and server is reachable
                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
            except FourcatImportException as e:
                raise QueryParametersException(str(e))

            try:
                # test if we get a response we can parse
                metadata = test.json()
            except ValueError:
                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")

            version = get_software_version()

            if metadata.get("current_4CAT_version") != version:
                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
                                               f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
                                               "sure both are running the same version of 4CAT and try again.")

            # OK, we can import at least one dataset
            return {
                "url": ",".join(urls),
                "api-key": query.get("api-key")
            }
        else:
            raise QueryParametersException("Import method not yet implemented.")

    @staticmethod
    def get_keys_from_urls(urls):
        """
        Get dataset keys from 4CAT URLs

        The key is the path segment following `/results/`, without any
        fragment or query string.

        :param list urls:  List of URLs
        :return list:  List of keys
        """
        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]

    @staticmethod
    def ensure_key(query):
        """
        Determine key for dataset generated by this processor

        When importing datasets, it's necessary to determine the key of the
        dataset that is created before it is actually created, because we want
        to keep the original key of the imported dataset if possible. Luckily,
        we can deduce it from the URL we're importing the dataset from.

        :param dict query:  Input from the user, through the front-end
        :return str:  Desired dataset key
        """
        #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
        # this method is called via `after_create`. We could download here and also identify the primary dataset key...
        urls = query.get("url", "").split(",")
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
        return keys[0]
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset as output. The input is thus a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can
define an `is_compatible_with(FourcatModule module=None, config=None) -> bool` class
method which takes a dataset as argument and returns a bool that determines
if this processor is considered compatible with that dataset. For example:
@classmethod
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"
@classmethod
def get_options(cls, parent_dataset=None, config=None) -> dict:
    """
    Describe the options shown in the import form

    The options are returned in display order: an introduction, the choice
    of import method, and then the fields that belong to either the URL or
    the ZIP method.

    :param DataSet parent_dataset:  An object representing the dataset that
      the processor would or was run on
    :param ConfigManager|None config:  Configuration reader (context-aware)
    :return dict:  Option definitions, keyed by option name
    """
    # conditions under which method-specific fields are shown
    url_only = "method^=url"
    zip_only = "method^=zip"

    options = {}

    options["intro"] = {
        "type": UserInput.OPTION_INFO,
        "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. "
                "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. "
                "You can find the current version in the footer at the bottom of the interface."
    }

    options["method"] = {
        "type": UserInput.OPTION_CHOICE,
        "help": "Import Type",
        "options": {
            "zip": "Zip File",
            "url": "4CAT URL",
        },
        "default": "url"
    }

    # fields for importing from another server
    options["url"] = {
        "type": UserInput.OPTION_TEXT,
        "help": "Dataset URL",
        "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.",
        "requires": url_only
    }

    options["intro2"] = {
        "type": UserInput.OPTION_INFO,
        "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need "
                "an API key from **the server you are importing from**, not the one you are looking at right now. "
                "Additionally, you need to have owner access to the dataset you want to import.",
        "requires": url_only
    }

    options["api-key"] = {
        "type": UserInput.OPTION_TEXT,
        "help": "4CAT API Key",
        "sensitive": True,
        "cache": True,
        "requires": url_only
    }

    # field for importing from an exported archive
    options["data_upload"] = {
        "type": UserInput.OPTION_FILE,
        "help": "File",
        "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.",
        "requires": zip_only
    }

    return options
Get processor options
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would or was run on
- ConfigManager|None config: Configuration reader (context-aware)
def process(self):
    """
    Import 4CAT dataset either from another 4CAT server or from the uploaded zip file

    Resets the bookkeeping attributes, then runs the import method chosen
    in the `method` parameter (`zip`, or anything else for URLs). If the
    import fails, the datasets created so far are cleaned up via
    `halt_and_catch_fire()` and the original exception is re-raised.
    """
    self.created_datasets = set()  # keys of created datasets - may not be successful!
    self.remapped_keys = {}  # changed dataset keys
    self.dataset_owner = self.dataset.get_owners()[0]  # at this point it has 1 owner
    try:
        if self.parameters.get("method") == "zip":
            self.process_zip()
        else:
            self.process_urls()
    except ProcessorInterruptedException:
        # raised by halt_and_catch_fire() *after* it has cleaned up; doing
        # the clean-up a second time would try to load datasets that were
        # just deleted and mask the interruption with a DataSetException
        raise
    except Exception:
        # Catch all exceptions and finish the job with an error
        # Resuming is impossible because this dataset was overwritten with the importing dataset
        # halt_and_catch_fire() will clean up and delete the datasets that were created
        self.interrupted = True
        try:
            self.halt_and_catch_fire()
        except ProcessorInterruptedException:
            pass
        except InFailedSqlTransaction:
            # Catch database issue and retry
            self.db.rollback()
            try:
                self.halt_and_catch_fire()
            except ProcessorInterruptedException:
                pass
        # Reraise the original exception for logging
        raise
Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
123 def after_create(query, dataset, request): 124 """ 125 Hook to execute after the dataset for this source has been created 126 127 In this case, put the file in a temporary location so it can be 128 processed properly by the related Job later. 129 130 :param dict query: Sanitised query parameters 131 :param DataSet dataset: Dataset created for this query 132 :param request: Flask request submitted for its creation 133 """ 134 if query.get("method") == "zip": 135 file = request.files["option-data_upload"] 136 file.seek(0) 137 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 138 while True: 139 chunk = file.read(1024) 140 if len(chunk) == 0: 141 break 142 outfile.write(chunk) 143 else: 144 # nothing to do for URLs 145 pass
Hook to execute after the dataset for this source has been created
In this case, put the file in a temporary location so it can be processed properly by the related Job later.
Parameters
- dict query: Sanitised query parameters
- DataSet dataset: Dataset created for this query
- request: Flask request submitted for its creation
def process_zip(self):
    """
    Import 4CAT dataset from a ZIP file

    Reads all `*_metadata.json` files in the uploaded archive (stored next
    to the dataset's result file with the `.importing` suffix), imports the
    single dataset without a parent as the primary dataset, and then imports
    the children of every successfully imported dataset. For each dataset
    the log file and result file are copied from the archive.

    Finishes the processor's dataset with an error if the archive contains
    no metadata files or not exactly one primary dataset.
    """
    import shutil  # local import; used to stream result files to disk

    self.dataset.update_status("Importing datasets and analyses from ZIP file.")
    temp_file = self.dataset.get_results_path().with_suffix(".importing")

    imported = []
    failed_imports = []
    num_rows = 0  # overwritten with the row count of the first imported dataset
    primary_dataset_original_log = None
    # names of archive members that were dealt with; the export.log file
    # is not imported, so it counts as processed from the start
    processed_files = {"export.log"}

    with zipfile.ZipFile(temp_file, "r") as zip_ref:
        zip_contents = zip_ref.namelist()

        # Get all metadata files and determine primary dataset
        metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
        if not metadata_files:
            self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
            return

        # Get the primary dataset
        primary_dataset_keys = set()
        datasets = []  # import queue of (metadata file name, metadata) pairs
        parent_child_mapping = {}
        for file in metadata_files:
            with zip_ref.open(file) as f:
                content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
            metadata = json.loads(content)
            if not metadata.get("key_parent"):
                primary_dataset_keys.add(metadata.get("key"))
                datasets.append((file, metadata))
            else:
                # Store the mapping of parent to child datasets
                parent_child_mapping.setdefault(metadata.get("key_parent"), []).append((file, metadata))

        # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
        if not primary_dataset_keys:
            self.dataset.finish_with_error("ZIP file contains no primary dataset; exactly one is required.")
            return

        if len(primary_dataset_keys) > 1:
            self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
            return

        # Import datasets
        while datasets:
            self.halt_and_catch_fire()

            # Create the datasets
            metadata_file, metadata = datasets.pop(0)
            dataset_key = metadata.get("key")
            processed_metadata = self.process_metadata(metadata)
            if processed_metadata is None:
                # process_metadata() signals malformed metadata with None
                self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
                failed_imports.append(dataset_key)
                continue

            new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
            processed_files.add(metadata_file)

            # Copy the log file
            self.halt_and_catch_fire()
            log_filename = Path(metadata["result_file"]).with_suffix(".log").name
            if log_filename in zip_contents:
                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                with zip_ref.open(log_filename) as f:
                    content = f.read().decode('utf-8')
                if new_dataset.key == self.dataset.key:
                    # Hold the original log for the primary dataset and add at the end
                    primary_dataset_original_log = content
                else:
                    new_dataset.log("Original dataset log included below:")
                    with new_dataset.get_log_path().open("a") as outfile:
                        outfile.write(content)
                processed_files.add(log_filename)
            else:
                self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")

            # Copy the results
            self.halt_and_catch_fire()
            results_filename = metadata["result_file"]
            if results_filename in zip_contents:
                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                with zip_ref.open(results_filename) as f:
                    with new_dataset.get_results_path().open("wb") as outfile:
                        # stream, so large result files are not held in memory
                        shutil.copyfileobj(f, outfile)
                processed_files.add(results_filename)

                if not imported:
                    # first dataset - use num rows as 'overall'
                    num_rows = metadata["num_rows"]
            else:
                self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                failed_imports.append(dataset_key)
                continue

            # finally, the kids
            self.halt_and_catch_fire()
            if dataset_key in parent_child_mapping:
                datasets.extend(parent_child_mapping[dataset_key])
                self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")

            # done - remember that we've imported this one
            imported.append(new_dataset)
            new_dataset.update_status(metadata["status"])

            if new_dataset.key != self.dataset.key:
                # only finish if this is not the 'main' dataset, or the user
                # will think the whole import is done
                new_dataset.finish(metadata["num_rows"])

    # Check that all files were processed; comparing names (rather than
    # counts) tells us exactly which archive members were left untouched
    missed_files = [file for file in zip_contents if file not in processed_files]

    # todo: this part needs updating if/when we support importing multiple datasets!
    if failed_imports:
        self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                   f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                   f"dataset log file for details.", is_final=True)
    elif missed_files:
        self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
        self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
                                   f"{len(missed_files)} files were not successfully imported. Check the "
                                   f"dataset log file for details.", is_final=True)
    else:
        self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.",
                                   is_final=True)

    if not self.dataset.is_finished():
        # now all related datasets are imported, we can finish the 'main'
        # dataset, and the user will be alerted that the full import is
        # complete
        self.dataset.finish(num_rows)

    # Add the original log for the primary dataset
    if primary_dataset_original_log:
        self.dataset.log("Original dataset log included below:\n")
        with self.dataset.get_log_path().open("a") as outfile:
            outfile.write(primary_dataset_original_log)
Import 4CAT dataset from a ZIP file
@staticmethod
def process_metadata(metadata):
    """
    Process metadata for import

    Removes the server-specific fields from an exported dataset record and
    drops the `copied_from` entry from its JSON-encoded parameters. The
    dictionary is modified in place and the same object is returned.

    :param dict metadata:  Metadata of import
    :return:  Relevant metadata, or `None` if parsing error (a required
    field is missing, or `parameters` is not valid JSON text)
    """
    # get rid of some keys that are server-specific and don't need to
    # be stored (or don't correspond to database columns)
    server_specific = ("current_4CAT_version", "id", "job", "is_private", "is_finished")
    try:
        for field in server_specific:
            # no default: a missing field means the format is wrong
            metadata.pop(field)

        # extra params are stored as JSON...
        parameters = json.loads(metadata["parameters"])
        if "copied_from" in parameters:
            parameters.pop("copied_from")
        metadata["parameters"] = json.dumps(parameters)

    except (ValueError, KeyError, TypeError, AttributeError):
        # we don't need all this metadata but it still should be present,
        # otherwise something is wrong with the data format. TypeError and
        # AttributeError cover `parameters` values that are not JSON text
        # or do not decode to a container supporting `in`/`pop`
        return None

    return metadata
Process metadata for import
Parameters
- dict metadata: Metadata of import
Returns
Relevant metadata, or `None` if parsing error
def create_dataset(self, metadata, original_key, primary=False):
    """
    Create a new dataset

    Stores the given metadata as a dataset record. The primary dataset
    re-uses the processor's own dataset; any other dataset gets its own
    record, keeping the imported key when it is still free on this server.
    Afterwards ownership, timestamps and the result file path are set for
    this server.

    :param dict metadata:  Processed metadata of the dataset to create
    :param str original_key:  Key the dataset had where it was exported
    :param bool primary:  Whether this is the primary (first) dataset
    :return DataSet:  The dataset that now holds the imported metadata
    """
    if primary:
        self.dataset.update_status(f"Importing primary dataset {original_key}.")
        # the first imported dataset becomes the processor's "own" dataset,
        # so the user sees it as the result of their import query. its key
        # was already chosen via ensure_key() (the imported key, or a new
        # unique key if that one was taken on this server)
        imported_dataset = self.dataset

        # key already OK (see above), so only overwrite the other columns
        metadata.pop("key")
        self.db.update("datasets", where={"key": imported_dataset.key}, data=metadata)

    else:
        self.dataset.update_status(f"Importing child dataset {original_key}.")
        # every other dataset (i.e. children of imported datasets) is
        # handled on its own
        try:
            # raises DataSetException when the key is unknown here
            DataSet(key=metadata["key"], db=self.db, modules=self.modules)

            # no exception: the key is taken. create a placeholder dataset,
            # which gets a fresh unique key, and overwrite everything but
            # the key with the imported metadata
            imported_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
            metadata.pop("key")
            self.db.update("datasets", where={"key": imported_dataset.key}, data=metadata)

        except DataSetException:
            # the key is free on this server, so the imported key can be
            # kept as is
            self.db.insert("datasets", data=metadata)
            imported_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

    if imported_dataset.key != original_key:
        # the original key was unavailable; remember the replacement so
        # references to the old key can be rewritten
        self.remapped_keys[original_key] = imported_dataset.key
        self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                   f"{imported_dataset.key} instead of key {original_key}!")

    # reload the object so it reflects what is in the database
    self.created_datasets.add(imported_dataset.key)
    imported_dataset = DataSet(key=imported_dataset.key, db=self.db, modules=self.modules)
    own_log = None
    if imported_dataset.key == self.dataset.key:
        # true for the first imported dataset only: it becomes the
        # processor's "own" dataset and receives the import log

        # keep what was logged so far; it is re-added once the result file
        # (from which the log path is derived) has been reserved
        own_log = self.dataset.get_log_path().read_text()
        self.dataset = imported_dataset

    # point a child at its parent's new key if the parent was re-keyed
    if imported_dataset.key_parent and imported_dataset.key_parent in self.remapped_keys:
        imported_dataset.key_parent = self.remapped_keys[imported_dataset.key_parent]

    # these attributes belong to this server rather than the exporting one
    imported_dataset.creator = self.dataset_owner
    imported_dataset.original_timestamp = imported_dataset.timestamp
    imported_dataset.imported = True
    imported_dataset.timestamp = int(time.time())
    imported_dataset.db.commit()

    # reserve a result file based on the new key and the local path
    # settings; the log file location is derived from the results path
    suffix = metadata["result_file"].rsplit(".", 1)[-1]
    reserved = imported_dataset.reserve_result_file(parameters=imported_dataset.parameters, extension=suffix)
    if not reserved:
        self.dataset.log(f"Could not reserve result file for {imported_dataset.key}!")

    if own_log:
        # carry the earlier log lines over to the dataset's log file
        with imported_dataset.get_log_path().open("a") as outfile:
            outfile.write(own_log)

    return imported_dataset
Create a new dataset
def process_urls(self):
    """
    Import 4CAT dataset from another 4CAT server

    Interfaces with another 4CAT server to transfer a dataset's metadata,
    data files and child datasets.

    For every dataset in the queue the metadata, log, data file and list of
    children are fetched in that order; children are appended to the queue.
    Datasets for which a step fails are recorded in `failed_imports` so the
    final status reflects them (see the note on the data-transfer step for
    the one exception). Only the first URL is imported at the moment.
    """
    urls = [url.strip() for url in self.parameters.get("url").split(",")]
    self.base = urls[0].split("/results/")[0]
    keys = SearchImportFromFourcat.get_keys_from_urls(urls)
    api_key = self.parameters.get("api-key")

    imported = []  # successfully imported datasets
    failed_imports = []  # keys that failed to import
    num_rows = 0  # will be used later

    # we can add support for multiple datasets later by removing
    # this part!
    keys = [keys[0]]

    while keys:
        dataset_key = keys.pop(0)

        self.halt_and_catch_fire()
        self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")

        # first, metadata!
        try:
            metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
            metadata = metadata.json()
        except FourcatImportException as e:
            self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
            # count this as a failure, like every later step does, so the
            # final status does not claim a clean import
            failed_imports.append(dataset_key)
            continue
        except ValueError:
            self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
            failed_imports.append(dataset_key)
            continue

        # copying empty datasets doesn't really make sense
        if metadata["num_rows"] == 0:
            self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
            failed_imports.append(dataset_key)
            continue

        metadata = self.process_metadata(metadata)
        if metadata is None:
            self.dataset.update_status(f"Metadata for dataset {dataset_key} incomplete, skipping")
            failed_imports.append(dataset_key)
            continue

        # create the new dataset; the first one imported is the primary
        new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)

        # then, the log
        self.halt_and_catch_fire()
        try:
            self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
            # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
            log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
            logpath = new_dataset.get_log_path()
            new_dataset.log("Original dataset log included below:")
            with logpath.open("a") as outfile:
                outfile.write(log.text)
        except FourcatImportException as e:
            new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
            failed_imports.append(dataset_key)
            continue
        except ValueError:
            new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
            failed_imports.append(dataset_key)
            continue

        # then, the results
        self.halt_and_catch_fire()
        try:
            self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
            datapath = new_dataset.get_results_path()
            SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)

            if not imported:
                # first dataset - use num rows as 'overall'
                num_rows = metadata["num_rows"]

        except FourcatImportException as e:
            # NOTE(review): this path removes the dataset and skips it
            # without recording the key in failed_imports - confirm
            # whether that is intended
            self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
            if new_dataset.key != self.dataset.key:
                new_dataset.delete()
            continue

        except ValueError:
            new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
            failed_imports.append(dataset_key)
            continue

        # finally, the kids
        self.halt_and_catch_fire()
        try:
            self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
            children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
            children = children.json()
        except FourcatImportException as e:
            self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
            failed_imports.append(dataset_key)
            continue
        except ValueError:
            self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
            failed_imports.append(dataset_key)
            continue

        for child in children:
            keys.append(child)
            self.dataset.log(f"Adding child dataset {child} to import queue")

        # done - remember that we've imported this one
        imported.append(new_dataset)
        new_dataset.update_status(metadata["status"])

        if new_dataset.key != self.dataset.key:
            # only finish if this is not the 'main' dataset, or the user
            # will think the whole import is done
            new_dataset.finish(metadata["num_rows"])

    # todo: this part needs updating if/when we support importing multiple datasets!
    if failed_imports:
        self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                   f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                   f"dataset log file for details.", is_final=True)
    else:
        self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.",
                                   is_final=True)

    if not self.dataset.is_finished():
        # now all related datasets are imported, we can finish the 'main'
        # dataset, and the user will be alerted that the full import is
        # complete
        self.dataset.finish(num_rows)
Import 4CAT dataset from another 4CAT server
Interfaces with another 4CAT server to transfer a dataset's metadata, data files and child datasets.
def halt_and_catch_fire(self):
    """
    Clean up on interrupt

    Called at several points during an import. Does nothing unless the
    processor has been interrupted; in that case every dataset created so
    far, except the processor's own, is deleted, the own dataset is
    finished with an error and the interruption is raised.

    :raises ProcessorInterruptedException:  If the processor was interrupted
    """
    if not self.interrupted:
        return

    # resuming is impossible because the original dataset (which has the
    # list of URLs to import) has probably been overwritten by this point
    stale_keys = [key for key in self.created_datasets if key != self.dataset.key]
    for stale_key in stale_keys:
        stale_dataset = DataSet(key=stale_key, db=self.db, modules=self.modules)
        stale_dataset.delete()

    self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you "
                                   f"will need to initiate the import again.")

    raise ProcessorInterruptedException()
Clean up on interrupt
There are multiple places in the code where we can bail out on an interrupt, so abstract that away in its own function.
Returns
@staticmethod
def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
    """
    Get dataset component from 4CAT export API

    When the `data` component is requested together with a `datapath`, the
    response body is streamed to that file in chunks instead of being held
    in memory. The HTTP status is checked before anything is written, so
    the same error messages apply to streamed and regular requests.

    :param str base:  Server URL base to import from
    :param str dataset_key:  Key of dataset to import
    :param str api_key:  API authentication token
    :param str component:  Component to retrieve
    :param Path datapath:  File to stream the `data` component to; if not
    given, the response body is kept in the returned response object
    :return:  HTTP response object
    :raises FourcatImportException:  If the server cannot be reached, takes
    too long to respond, or answers with a status other than 200
    """
    url = f"{base}/api/export-packed-dataset/{dataset_key}/{component}/"
    headers = {
        "User-Agent": "4cat/import",
        "Authentication": api_key
    }
    stream_to_file = bool(component == "data" and datapath)

    response = None
    try:
        try:
            response = requests.get(url, timeout=5, stream=stream_to_file, headers=headers)
            if stream_to_file and response.status_code == 200:
                # only write the file for a successful response; error
                # statuses are reported by the checks below
                with datapath.open("wb") as outfile:
                    for chunk in response.iter_content(chunk_size=8192):
                        outfile.write(chunk)
        except requests.Timeout as e:
            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
                                         f"accessible to external connections and try again.") from e
        except requests.RequestException as e:
            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
                                         f"accessible to external connections and try again.") from e

        if response.status_code == 404:
            raise FourcatImportException(
                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
                f"a valid dataset.")
        elif response.status_code in (401, 403):
            raise FourcatImportException(
                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
                f"dataset and are using the correct API key.")
        elif response.status_code != 200:
            raise FourcatImportException(
                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")

        return response
    finally:
        # a streamed response keeps its connection open until closed
        if stream_to_file and response is not None:
            response.close()
Get dataset component from 4CAT export API
Parameters
- str base: Server URL base to import from
- str dataset_key: Key of dataset to import
- str api_key: API authentication token
- str component: Component to retrieve
Returns
HTTP response object
@staticmethod
def validate_query(query, request, config):
    """
    Validate input for a 4CAT dataset import

    For the `zip` method, confirms that a file with a `.zip` extension was
    uploaded (the first pass, which only lists the files, is accepted as
    is). For the `url` method, confirms that exactly one dataset URL was
    provided, that its metadata can be fetched from the remote server with
    the given API key, and that the remote server reports the same 4CAT
    version as this one.

    :param dict query:  Query parameters, from client-side.
    :param request:  Flask request
    :param ConfigManager|None config:  Configuration reader (context-aware)
    :return dict:  Safe query parameters
    :raises QueryParametersException:  If the input is not valid
    """
    method = query.get("method")

    if method == "zip":
        filename = ""
        if "option-data_upload-entries" in request.form:
            # First pass sends list of files in the zip
            pass
        elif "option-data_upload" in request.files:
            # Second pass sends the actual file
            file = request.files["option-data_upload"]
            if not file:
                raise QueryParametersException("No file uploaded.")

            if not file.filename.endswith(".zip"):
                raise QueryParametersException("Uploaded file must be a ZIP file.")

            filename = file.filename
        else:
            raise QueryParametersException("No file was offered for upload.")

        return {
            "method": "zip",
            "filename": filename
        }

    elif method == "url":
        raw_urls = query.get("url")
        if not raw_urls or not raw_urls.strip():
            raise QueryParametersException("Provide at least one dataset URL.")

        # strip surrounding whitespace, as process_urls() does
        urls = [url.strip() for url in raw_urls.split(",")]
        bases = {url.split("/results/")[0].lower() for url in urls}
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)

        if len(keys) != 1:
            # todo: change this to < 1 if we allow multiple datasets
            raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")

        if len(bases) != 1:
            raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
                                           "one 4CAT server at a time.")

        base = urls[0].split("/results/")[0]
        try:
            # test if API key is valid and server is reachable
            test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
        except FourcatImportException as e:
            raise QueryParametersException(str(e)) from e

        try:
            # test if we get a response we can parse
            metadata = test.json()
        except ValueError as e:
            raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.") from e

        if not isinstance(metadata, dict):
            # valid JSON, but not the object the version check needs
            raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")

        version = get_software_version()

        if metadata.get("current_4CAT_version") != version:
            raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
                                           f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
                                           "sure both are running the same version of 4CAT and try again.")

        # OK, we can import at least one dataset
        return {
            "url": ",".join(urls),
            "api-key": query.get("api-key")
        }

    else:
        raise QueryParametersException("Import method not yet implemented.")
Validate custom data input
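For ZIP imports, confirms that a file with a `.zip` extension was uploaded. For URL imports, confirms that a single dataset URL was provided, that its metadata can be fetched from the remote server with the given API key, and that the remote server runs the same 4CAT version as this one. If so, returns the safe query parameters.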
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
@staticmethod
def get_keys_from_urls(urls):
    """
    Get dataset keys from 4CAT URLs

    The key is whatever follows the last `/results/` in the URL, up to the
    first `/`, `#` or `?`. A URL without `/results/` is treated as if it
    consisted of the key part only.

    :param list urls:  List of URLs
    :return list:  List of keys
    """
    keys = []
    for url in urls:
        # everything after the last "/results/" (or the whole URL)
        key = url.rpartition("/results/")[2]
        # cut off the rest of the path, then any fragment or query string
        for delimiter in ("/", "#", "?"):
            key = key.partition(delimiter)[0]
        keys.append(key)

    return keys
Get dataset keys from 4CAT URLs
Parameters
- list urls: List of URLs
Returns
List of keys
@staticmethod
def ensure_key(query):
    """
    Determine key for dataset generated by this processor

    The key of the dataset has to be known before the dataset is created,
    because the imported dataset should keep its original key if possible.
    It is taken from the first URL in the query's comma-separated `url`
    value.

    :param dict query:  Input from the user, through the front-end
    :return str:  Desired dataset key
    """
    # TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
    # this method is called via `after_create`. We could download here and also identify the primary dataset key...
    url_list = query.get("url", "").split(",")
    return SearchImportFromFourcat.get_keys_from_urls(url_list)[0]
Determine key for dataset generated by this processor
When importing datasets, it's necessary to determine the key of the dataset that is created before it is actually created, because we want to keep the original key of the imported dataset if possible. Luckily, we can deduce it from the URL we're importing the dataset from.
Parameters
- dict query: Input from the user, through the front-end
Returns
Desired dataset key
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- run_interruptable_process
- get_queue_id
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- extension
- config
- is_running_in_preset
- filepath
- for_cleanup
- work
- after_process
- clean_up_on_error
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor