datasources.fourcat_import.import_4cat
Import datasets from other 4CATs
1""" 2Import datasets from other 4CATs 3""" 4import requests 5import json 6import time 7import zipfile 8from pathlib import Path 9from psycopg2.errors import InFailedSqlTransaction 10 11from backend.lib.processor import BasicProcessor 12from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException, 13 DataSetException) 14from common.lib.helpers import UserInput, get_software_version 15from common.lib.dataset import DataSet 16 17 18class FourcatImportException(FourcatException): 19 pass 20 21 22class SearchImportFromFourcat(BasicProcessor): 23 type = "import_4cat-search" # job ID 24 category = "Search" # category 25 title = "Import 4CAT dataset and analyses" # title displayed in UI 26 description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)" # description displayed in UI 27 is_local = False # Whether this datasource is locally scraped 28 is_static = False # Whether this datasource is still updated 29 30 max_workers = 1 # this cannot be more than 1, else things get VERY messy 31 32 options = { 33 "intro": { 34 "type": UserInput.OPTION_INFO, 35 "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. " 36 "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. " 37 "You can find the current version in the footer at the bottom of the interface." 38 }, 39 "method": { 40 "type": UserInput.OPTION_CHOICE, 41 "help": "Import Type", 42 "options": { 43 "zip": "Zip File", 44 "url": "4CAT URL", 45 }, 46 "default": "url" 47 }, 48 "url": { 49 "type": UserInput.OPTION_TEXT, 50 "help": "Dataset URL", 51 "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.", 52 "requires": "method^=url" 53 }, 54 "intro2": { 55 "type": UserInput.OPTION_INFO, 56 "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need " 57 "an API key from **the server you are importing from**, not the one you are looking at right now. " 58 "Additionally, you need to have owner access to the dataset you want to import.", 59 "requires": "method^=url" 60 }, 61 "api-key": { 62 "type": UserInput.OPTION_TEXT, 63 "help": "4CAT API Key", 64 "sensitive": True, 65 "cache": True, 66 "requires": "method^=url" 67 }, 68 "data_upload": { 69 "type": UserInput.OPTION_FILE, 70 "help": "File", 71 "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.", 72 "requires": "method^=zip" 73 }, 74 75 } 76 77 created_datasets = None 78 base = None 79 remapped_keys = None 80 dataset_owner = None 81 82 def process(self): 83 """ 84 Import 4CAT dataset either from another 4CAT server or from the uploaded zip file 85 """ 86 self.created_datasets = set() # keys of created datasets - may not be successful! 
87 self.remapped_keys = {} # changed dataset keys 88 self.dataset_owner = self.dataset.get_owners()[0] # at this point it has 1 owner 89 try: 90 if self.parameters.get("method") == "zip": 91 self.process_zip() 92 else: 93 self.process_urls() 94 except Exception as e: 95 # Catch all exceptions and finish the job with an error 96 # Resuming is impossible because this dataset was overwritten with the importing dataset 97 # halt_and_catch_fire() will clean up and delete the datasets that were created 98 self.interrupted = True 99 try: 100 self.halt_and_catch_fire() 101 except ProcessorInterruptedException: 102 pass 103 except InFailedSqlTransaction: 104 # Catch database issue and retry 105 self.db.rollback() 106 try: 107 self.halt_and_catch_fire() 108 except ProcessorInterruptedException: 109 pass 110 # Reraise the original exception for logging 111 raise e 112 113 def after_create(query, dataset, request): 114 """ 115 Hook to execute after the dataset for this source has been created 116 117 In this case, put the file in a temporary location so it can be 118 processed properly by the related Job later. 119 120 :param dict query: Sanitised query parameters 121 :param DataSet dataset: Dataset created for this query 122 :param request: Flask request submitted for its creation 123 """ 124 if query.get("method") == "zip": 125 file = request.files["option-data_upload"] 126 file.seek(0) 127 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 128 while True: 129 chunk = file.read(1024) 130 if len(chunk) == 0: 131 break 132 outfile.write(chunk) 133 else: 134 # nothing to do for URLs 135 pass 136 137 138 def process_zip(self): 139 """ 140 Import 4CAT dataset from a ZIP file 141 """ 142 self.dataset.update_status("Importing datasets and analyses from ZIP file.") 143 temp_file = self.dataset.get_results_path().with_suffix(".importing") 144 145 imported = [] 146 processed_files = 1 # take into account the export.log file 147 failed_imports = [] 148 primary_dataset_original_log = None 149 with zipfile.ZipFile(temp_file, "r") as zip_ref: 150 zip_contents = zip_ref.namelist() 151 152 # Get all metadata files and determine primary dataset 153 metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")] 154 if not metadata_files: 155 self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?") 156 return 157 158 # Get the primary dataset 159 primary_dataset_keys = set() 160 datasets = [] 161 parent_child_mapping = {} 162 for file in metadata_files: 163 with zip_ref.open(file) as f: 164 content = f.read().decode('utf-8') # Decode the binary content using the desired encoding 165 metadata = json.loads(content) 166 if not metadata.get("key_parent"): 167 primary_dataset_keys.add(metadata.get("key")) 168 datasets.append(metadata) 169 else: 170 # Store the mapping of parent to child datasets 171 parent_key = metadata.get("key_parent") 172 if parent_key not in parent_child_mapping: 173 parent_child_mapping[parent_key] = [] 174 parent_child_mapping[parent_key].append(metadata) 175 176 # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets 177 if len(primary_dataset_keys) != 1: 178 self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.") 179 return 180 181 # Import datasets 182 while datasets: 183 self.halt_and_catch_fire() 184 185 # Create the datasets 186 metadata = datasets.pop(0) 187 dataset_key = metadata.get("key") 188 processed_metadata 
= self.process_metadata(metadata) 189 new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys) 190 processed_files += 1 191 192 # Copy the log file 193 self.halt_and_catch_fire() 194 log_filename = Path(metadata["result_file"]).with_suffix(".log").name 195 if log_filename in zip_contents: 196 self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") 197 with zip_ref.open(log_filename) as f: 198 content = f.read().decode('utf-8') 199 if new_dataset.key == self.dataset.key: 200 # Hold the original log for the primary dataset and add at the end 201 primary_dataset_original_log = content 202 else: 203 new_dataset.log("Original dataset log included below:") 204 with new_dataset.get_log_path().open("a") as outfile: 205 outfile.write(content) 206 processed_files += 1 207 else: 208 self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).") 209 210 # Copy the results 211 self.halt_and_catch_fire() 212 results_filename = metadata["result_file"] 213 if results_filename in zip_contents: 214 self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") 215 with zip_ref.open(results_filename) as f: 216 with new_dataset.get_results_path().open("wb") as outfile: 217 outfile.write(f.read()) 218 processed_files += 1 219 220 if not imported: 221 # first dataset - use num rows as 'overall' 222 num_rows = metadata["num_rows"] 223 else: 224 self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") 225 new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") 226 failed_imports.append(dataset_key) 227 continue 228 229 # finally, the kids 230 self.halt_and_catch_fire() 231 if dataset_key in parent_child_mapping: 232 datasets.extend(parent_child_mapping[dataset_key]) 233 self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue") 234 235 # done - remember that we've imported this one 236 imported.append(new_dataset) 237 new_dataset.update_status(metadata["status"]) 238 239 if new_dataset.key != self.dataset.key: 240 # only finish if this is not the 'main' dataset, or the user 241 # will think the whole import is done 242 new_dataset.finish(metadata["num_rows"]) 243 244 # Check that all files were processed 245 missed_files = [] 246 if len(zip_contents) != processed_files: 247 for file in zip_contents: 248 if file not in processed_files: 249 missed_files.append(file) 250 251 # todo: this part needs updating if/when we support importing multiple datasets! 252 if failed_imports: 253 self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " 254 f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " 255 f"dataset log file for details.", is_final=True) 256 elif missed_files: 257 self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}") 258 self.dataset.update_status(f"Dataset import finished, but not all files were processed. " 259 f"{len(missed_files)} files were not successfully imported. 
Check the " 260 f"dataset log file for details.", is_final=True) 261 else: 262 self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.", 263 is_final=True) 264 265 if not self.dataset.is_finished(): 266 # now all related datasets are imported, we can finish the 'main' 267 # dataset, and the user will be alerted that the full import is 268 # complete 269 self.dataset.finish(num_rows) 270 271 # Add the original log for the primary dataset 272 if primary_dataset_original_log: 273 self.dataset.log("Original dataset log included below:\n") 274 with self.dataset.get_log_path().open("a") as outfile: 275 outfile.write(primary_dataset_original_log) 276 277 278 @staticmethod 279 def process_metadata(metadata): 280 """ 281 Process metadata for import 282 """ 283 # get rid of some keys that are server-specific and don't need to 284 # be stored (or don't correspond to database columns) 285 metadata.pop("current_4CAT_version") 286 metadata.pop("id") 287 metadata.pop("job") 288 metadata.pop("is_private") 289 metadata.pop("is_finished") # we'll finish it ourselves, thank you!!! 290 291 # extra params are stored as JSON... 292 metadata["parameters"] = json.loads(metadata["parameters"]) 293 if "copied_from" in metadata["parameters"]: 294 metadata["parameters"].pop("copied_from") 295 metadata["parameters"] = json.dumps(metadata["parameters"]) 296 297 return metadata 298 299 def create_dataset(self, metadata, original_key, primary=False): 300 """ 301 Create a new dataset 302 """ 303 if primary: 304 self.dataset.update_status(f"Importing primary dataset {original_key}.") 305 # if this is the first dataset we're importing, make it the 306 # processor's "own" dataset. the key has already been set to 307 # the imported dataset's key via ensure_key() (or a new unqiue 308 # key if it already existed on this server) 309 # by making it the "own" dataset, the user initiating the 310 # import will see the imported dataset as the "result" of their 311 # import query in the interface, similar to the workflow for 312 # other data sources 313 new_dataset = self.dataset 314 315 # Update metadata and file 316 metadata.pop("key") # key already OK (see above) 317 self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) 318 319 else: 320 self.dataset.update_status(f"Importing child dataset {original_key}.") 321 # supernumerary datasets - handle on their own 322 # these include any children of imported datasets 323 try: 324 DataSet(key=metadata["key"], db=self.db, modules=self.modules) 325 326 # if we *haven't* thrown a DatasetException now, then the 327 # key is already in use, so create a "dummy" dataset and 328 # overwrite it with the metadata we have (except for the 329 # key). this ensures that a new unique key will be 330 # generated. 
331 new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules) 332 metadata.pop("key") 333 self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) 334 335 except DataSetException: 336 # this is *good* since it means the key doesn't exist, so 337 # we can re-use the key of the imported dataset 338 self.db.insert("datasets", data=metadata) 339 new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules) 340 341 if new_dataset.key != original_key: 342 # could not use original key because it was already in use 343 # so update any references to use the new key 344 self.remapped_keys[original_key] = new_dataset.key 345 self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key " 346 f"{new_dataset.key} instead of key {original_key}!") 347 348 # refresh object, make sure it's in sync with the database 349 self.created_datasets.add(new_dataset.key) 350 new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules) 351 current_log = None 352 if new_dataset.key == self.dataset.key: 353 # this ensures that the first imported dataset becomes the 354 # processor's "own" dataset, and that the import logs go to 355 # that dataset's log file. For later imports, this evaluates to 356 # False. 357 358 # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path) 359 current_log = self.dataset.get_log_path().read_text() 360 # Update the dataset 361 self.dataset = new_dataset 362 363 # if the key of the parent dataset was changed, change the 364 # reference to it that the child dataset has 365 if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys: 366 new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent] 367 368 # update some attributes that should come from the new server, not 369 # the old 370 new_dataset.creator = self.dataset_owner 371 new_dataset.original_timestamp = new_dataset.timestamp 372 new_dataset.imported = True 373 new_dataset.timestamp = int(time.time()) 374 new_dataset.db.commit() 375 376 # make sure the dataset path uses the new key and local dataset 377 # path settings. this also makes sure the log file is created in 378 # the right place (since it is derived from the results file path) 379 extension = metadata["result_file"].split(".")[-1] 380 updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension) 381 if not updated: 382 self.dataset.log(f"Could not reserve result file for {new_dataset.key}!") 383 384 if current_log: 385 # Add the current log to the new dataset 386 with new_dataset.get_log_path().open("a") as outfile: 387 outfile.write(current_log) 388 389 return new_dataset 390 391 392 def process_urls(self): 393 """ 394 Import 4CAT dataset from another 4CAT server 395 396 Interfaces with another 4CAT server to transfer a dataset's metadata, 397 data files and child datasets. 398 """ 399 urls = [url.strip() for url in self.parameters.get("url").split(",")] 400 self.base = urls[0].split("/results/")[0] 401 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 402 api_key = self.parameters.get("api-key") 403 404 imported = [] # successfully imported datasets 405 failed_imports = [] # keys that failed to import 406 num_rows = 0 # will be used later 407 408 # we can add support for multiple datasets later by removing 409 # this part! 
410 keys = [keys[0]] 411 412 while keys: 413 dataset_key = keys.pop(0) 414 415 self.halt_and_catch_fire() 416 self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.") 417 418 # first, metadata! 419 try: 420 metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata") 421 metadata = metadata.json() 422 except FourcatImportException as e: 423 self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}") 424 continue 425 except ValueError: 426 self.dataset.log(f"Could not read metadata for dataset {dataset_key}") 427 continue 428 429 # copying empty datasets doesn't really make sense 430 if metadata["num_rows"] == 0: 431 self.dataset.update_status(f"Skipping empty dataset {dataset_key}") 432 failed_imports.append(dataset_key) 433 continue 434 435 metadata = self.process_metadata(metadata) 436 437 # create the new dataset 438 new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False) 439 440 # then, the log 441 self.halt_and_catch_fire() 442 try: 443 self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") 444 # TODO: for the primary, this ends up in the middle of the log as we are still adding to it... 445 log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log") 446 logpath = new_dataset.get_log_path() 447 new_dataset.log("Original dataset log included below:") 448 with logpath.open("a") as outfile: 449 outfile.write(log.text) 450 except FourcatImportException as e: 451 new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}") 452 failed_imports.append(dataset_key) 453 continue 454 except ValueError: 455 new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset") 456 failed_imports.append(dataset_key) 457 continue 458 459 # then, the results 460 self.halt_and_catch_fire() 461 try: 462 self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") 463 datapath = new_dataset.get_results_path() 464 SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath) 465 466 if not imported: 467 # first dataset - use num rows as 'overall' 468 num_rows = metadata["num_rows"] 469 470 except FourcatImportException as e: 471 self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import") 472 if new_dataset.key != self.dataset.key: 473 new_dataset.delete() 474 continue 475 476 except ValueError: 477 new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}") 478 failed_imports.append(dataset_key) 479 continue 480 481 # finally, the kids 482 self.halt_and_catch_fire() 483 try: 484 self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}") 485 children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children") 486 children = children.json() 487 except FourcatImportException as e: 488 self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}") 489 failed_imports.append(dataset_key) 490 continue 491 except ValueError: 492 self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}") 493 failed_imports.append(dataset_key) 494 continue 495 496 for child in children: 497 keys.append(child) 498 self.dataset.log(f"Adding child dataset {child} to import queue") 499 500 # done - remember that we've imported this one 501 imported.append(new_dataset) 502 
new_dataset.update_status(metadata["status"]) 503 504 if new_dataset.key != self.dataset.key: 505 # only finish if this is not the 'main' dataset, or the user 506 # will think the whole import is done 507 new_dataset.finish(metadata["num_rows"]) 508 509 # todo: this part needs updating if/when we support importing multiple datasets! 510 if failed_imports: 511 self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " 512 f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " 513 f"dataset log file for details.", is_final=True) 514 else: 515 self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.", 516 is_final=True) 517 518 if not self.dataset.is_finished(): 519 # now all related datasets are imported, we can finish the 'main' 520 # dataset, and the user will be alerted that the full import is 521 # complete 522 self.dataset.finish(num_rows) 523 524 def halt_and_catch_fire(self): 525 """ 526 Clean up on interrupt 527 528 There are multiple places in the code where we can bail out on an 529 interrupt, so abstract that away in its own function. 530 :return: 531 """ 532 if self.interrupted: 533 # resuming is impossible because the original dataset (which 534 # has the list of URLs to import) has probably been 535 # overwritten by this point 536 deletables = [k for k in self.created_datasets if k != self.dataset.key] 537 for deletable in deletables: 538 DataSet(key=deletable, db=self.db, modules=self.modules).delete() 539 540 self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you " 541 f"will need to initiate the import again.") 542 543 raise ProcessorInterruptedException() 544 545 @staticmethod 546 def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None): 547 """ 548 Get dataset component from 4CAT export API 549 550 :param str base: Server URL base to import from 551 :param str dataset_key: Key of dataset to import 552 :param str api_key: API authentication token 553 :param str component: Component to retrieve 554 :return: HTTP response object 555 """ 556 try: 557 if component == "data" and datapath: 558 # Stream data 559 with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True, 560 headers={ 561 "User-Agent": "4cat/import", 562 "Authentication": api_key 563 }) as r: 564 r.raise_for_status() 565 with datapath.open("wb") as outfile: 566 for chunk in r.iter_content(chunk_size=8192): 567 outfile.write(chunk) 568 return r 569 else: 570 response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={ 571 "User-Agent": "4cat/import", 572 "Authentication": api_key 573 }) 574 except requests.Timeout: 575 raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is " 576 f"accessible to external connections and try again.") 577 except requests.RequestException as e: 578 raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is " 579 f"accessible to external connections and try again.") 580 581 if response.status_code == 404: 582 raise FourcatImportException( 583 f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to " 584 f"a valid dataset.") 585 elif response.status_code in (401, 403): 586 raise FourcatImportException( 587 f"Dataset {dataset_key} not accessible at server {base}. 
Make sure you have access to this " 588 f"dataset and are using the correct API key.") 589 elif response.status_code != 200: 590 raise FourcatImportException( 591 f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}") 592 593 return response 594 595 @staticmethod 596 def validate_query(query, request, config): 597 """ 598 Validate custom data input 599 600 Confirms that the uploaded file is a valid CSV or tab file and, if so, returns 601 some metadata. 602 603 :param dict query: Query parameters, from client-side. 604 :param request: Flask request 605 :param ConfigManager|None config: Configuration reader (context-aware) 606 :return dict: Safe query parameters 607 """ 608 if query.get("method") == "zip": 609 filename = "" 610 if "option-data_upload-entries" in request.form: 611 # First pass sends list of files in the zip 612 pass 613 elif "option-data_upload" in request.files: 614 # Second pass sends the actual file 615 file = request.files["option-data_upload"] 616 if not file: 617 raise QueryParametersException("No file uploaded.") 618 619 if not file.filename.endswith(".zip"): 620 raise QueryParametersException("Uploaded file must be a ZIP file.") 621 622 filename = file.filename 623 else: 624 raise QueryParametersException("No file was offered for upload.") 625 626 return { 627 "method": "zip", 628 "filename": filename 629 } 630 elif query.get("method") == "url": 631 urls = query.get("url") 632 if not urls: 633 raise QueryParametersException("Provide at least one dataset URL.") 634 635 urls = urls.split(",") 636 bases = set([url.split("/results/")[0].lower() for url in urls]) 637 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 638 639 if len(keys) != 1: 640 # todo: change this to < 1 if we allow multiple datasets 641 raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.") 642 643 if len(bases) != 1: 644 raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from " 645 "one 4CAT server at a time.") 646 647 base = urls[0].split("/results/")[0] 648 try: 649 # test if API key is valid and server is reachable 650 test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata") 651 except FourcatImportException as e: 652 raise QueryParametersException(str(e)) 653 654 try: 655 # test if we get a response we can parse 656 metadata = test.json() 657 except ValueError: 658 raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.") 659 660 version = get_software_version() 661 662 if metadata.get("current_4CAT_version") != version: 663 raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than " 664 f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). 
Make " 665 "sure both are running the same version of 4CAT and try again.") 666 667 # OK, we can import at least one dataset 668 return { 669 "url": ",".join(urls), 670 "api-key": query.get("api-key") 671 } 672 else: 673 raise QueryParametersException("Import method not yet implemented.") 674 675 @staticmethod 676 def get_keys_from_urls(urls): 677 """ 678 Get dataset keys from 4CAT URLs 679 680 :param list urls: List of URLs 681 :return list: List of keys 682 """ 683 return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls] 684 685 @staticmethod 686 def ensure_key(query): 687 """ 688 Determine key for dataset generated by this processor 689 690 When importing datasets, it's necessary to determine the key of the 691 dataset that is created before it is actually created, because we want 692 to keep the original key of the imported dataset if possible. Luckily, 693 we can deduce it from the URL we're importing the dataset from. 694 695 :param dict query: Input from the user, through the front-end 696 :return str: Desired dataset key 697 """ 698 #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after 699 # this method is called via `after_create`. We could download here and also identify the primary dataset key... 700 urls = query.get("url", "").split(",") 701 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 702 return keys[0]
Base 4CAT exception class
Inherited Members
23class SearchImportFromFourcat(BasicProcessor): 24 type = "import_4cat-search" # job ID 25 category = "Search" # category 26 title = "Import 4CAT dataset and analyses" # title displayed in UI 27 description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)" # description displayed in UI 28 is_local = False # Whether this datasource is locally scraped 29 is_static = False # Whether this datasource is still updated 30 31 max_workers = 1 # this cannot be more than 1, else things get VERY messy 32 33 options = { 34 "intro": { 35 "type": UserInput.OPTION_INFO, 36 "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. " 37 "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. " 38 "You can find the current version in the footer at the bottom of the interface." 39 }, 40 "method": { 41 "type": UserInput.OPTION_CHOICE, 42 "help": "Import Type", 43 "options": { 44 "zip": "Zip File", 45 "url": "4CAT URL", 46 }, 47 "default": "url" 48 }, 49 "url": { 50 "type": UserInput.OPTION_TEXT, 51 "help": "Dataset URL", 52 "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.", 53 "requires": "method^=url" 54 }, 55 "intro2": { 56 "type": UserInput.OPTION_INFO, 57 "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need " 58 "an API key from **the server you are importing from**, not the one you are looking at right now. " 59 "Additionally, you need to have owner access to the dataset you want to import.", 60 "requires": "method^=url" 61 }, 62 "api-key": { 63 "type": UserInput.OPTION_TEXT, 64 "help": "4CAT API Key", 65 "sensitive": True, 66 "cache": True, 67 "requires": "method^=url" 68 }, 69 "data_upload": { 70 "type": UserInput.OPTION_FILE, 71 "help": "File", 72 "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.", 73 "requires": "method^=zip" 74 }, 75 76 } 77 78 created_datasets = None 79 base = None 80 remapped_keys = None 81 dataset_owner = None 82 83 def process(self): 84 """ 85 Import 4CAT dataset either from another 4CAT server or from the uploaded zip file 86 """ 87 self.created_datasets = set() # keys of created datasets - may not be successful! 88 self.remapped_keys = {} # changed dataset keys 89 self.dataset_owner = self.dataset.get_owners()[0] # at this point it has 1 owner 90 try: 91 if self.parameters.get("method") == "zip": 92 self.process_zip() 93 else: 94 self.process_urls() 95 except Exception as e: 96 # Catch all exceptions and finish the job with an error 97 # Resuming is impossible because this dataset was overwritten with the importing dataset 98 # halt_and_catch_fire() will clean up and delete the datasets that were created 99 self.interrupted = True 100 try: 101 self.halt_and_catch_fire() 102 except ProcessorInterruptedException: 103 pass 104 except InFailedSqlTransaction: 105 # Catch database issue and retry 106 self.db.rollback() 107 try: 108 self.halt_and_catch_fire() 109 except ProcessorInterruptedException: 110 pass 111 # Reraise the original exception for logging 112 raise e 113 114 def after_create(query, dataset, request): 115 """ 116 Hook to execute after the dataset for this source has been created 117 118 In this case, put the file in a temporary location so it can be 119 processed properly by the related Job later. 
120 121 :param dict query: Sanitised query parameters 122 :param DataSet dataset: Dataset created for this query 123 :param request: Flask request submitted for its creation 124 """ 125 if query.get("method") == "zip": 126 file = request.files["option-data_upload"] 127 file.seek(0) 128 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 129 while True: 130 chunk = file.read(1024) 131 if len(chunk) == 0: 132 break 133 outfile.write(chunk) 134 else: 135 # nothing to do for URLs 136 pass 137 138 139 def process_zip(self): 140 """ 141 Import 4CAT dataset from a ZIP file 142 """ 143 self.dataset.update_status("Importing datasets and analyses from ZIP file.") 144 temp_file = self.dataset.get_results_path().with_suffix(".importing") 145 146 imported = [] 147 processed_files = 1 # take into account the export.log file 148 failed_imports = [] 149 primary_dataset_original_log = None 150 with zipfile.ZipFile(temp_file, "r") as zip_ref: 151 zip_contents = zip_ref.namelist() 152 153 # Get all metadata files and determine primary dataset 154 metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")] 155 if not metadata_files: 156 self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?") 157 return 158 159 # Get the primary dataset 160 primary_dataset_keys = set() 161 datasets = [] 162 parent_child_mapping = {} 163 for file in metadata_files: 164 with zip_ref.open(file) as f: 165 content = f.read().decode('utf-8') # Decode the binary content using the desired encoding 166 metadata = json.loads(content) 167 if not metadata.get("key_parent"): 168 primary_dataset_keys.add(metadata.get("key")) 169 datasets.append(metadata) 170 else: 171 # Store the mapping of parent to child datasets 172 parent_key = metadata.get("key_parent") 173 if parent_key not in parent_child_mapping: 174 parent_child_mapping[parent_key] = [] 175 parent_child_mapping[parent_key].append(metadata) 176 177 # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets 178 if len(primary_dataset_keys) != 1: 179 self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.") 180 return 181 182 # Import datasets 183 while datasets: 184 self.halt_and_catch_fire() 185 186 # Create the datasets 187 metadata = datasets.pop(0) 188 dataset_key = metadata.get("key") 189 processed_metadata = self.process_metadata(metadata) 190 new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys) 191 processed_files += 1 192 193 # Copy the log file 194 self.halt_and_catch_fire() 195 log_filename = Path(metadata["result_file"]).with_suffix(".log").name 196 if log_filename in zip_contents: 197 self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") 198 with zip_ref.open(log_filename) as f: 199 content = f.read().decode('utf-8') 200 if new_dataset.key == self.dataset.key: 201 # Hold the original log for the primary dataset and add at the end 202 primary_dataset_original_log = content 203 else: 204 new_dataset.log("Original dataset log included below:") 205 with new_dataset.get_log_path().open("a") as outfile: 206 outfile.write(content) 207 processed_files += 1 208 else: 209 self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).") 210 211 # Copy the results 212 self.halt_and_catch_fire() 213 results_filename = metadata["result_file"] 214 if results_filename in zip_contents: 215 
self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") 216 with zip_ref.open(results_filename) as f: 217 with new_dataset.get_results_path().open("wb") as outfile: 218 outfile.write(f.read()) 219 processed_files += 1 220 221 if not imported: 222 # first dataset - use num rows as 'overall' 223 num_rows = metadata["num_rows"] 224 else: 225 self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") 226 new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") 227 failed_imports.append(dataset_key) 228 continue 229 230 # finally, the kids 231 self.halt_and_catch_fire() 232 if dataset_key in parent_child_mapping: 233 datasets.extend(parent_child_mapping[dataset_key]) 234 self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue") 235 236 # done - remember that we've imported this one 237 imported.append(new_dataset) 238 new_dataset.update_status(metadata["status"]) 239 240 if new_dataset.key != self.dataset.key: 241 # only finish if this is not the 'main' dataset, or the user 242 # will think the whole import is done 243 new_dataset.finish(metadata["num_rows"]) 244 245 # Check that all files were processed 246 missed_files = [] 247 if len(zip_contents) != processed_files: 248 for file in zip_contents: 249 if file not in processed_files: 250 missed_files.append(file) 251 252 # todo: this part needs updating if/when we support importing multiple datasets! 253 if failed_imports: 254 self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " 255 f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " 256 f"dataset log file for details.", is_final=True) 257 elif missed_files: 258 self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}") 259 self.dataset.update_status(f"Dataset import finished, but not all files were processed. " 260 f"{len(missed_files)} files were not successfully imported. Check the " 261 f"dataset log file for details.", is_final=True) 262 else: 263 self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.", 264 is_final=True) 265 266 if not self.dataset.is_finished(): 267 # now all related datasets are imported, we can finish the 'main' 268 # dataset, and the user will be alerted that the full import is 269 # complete 270 self.dataset.finish(num_rows) 271 272 # Add the original log for the primary dataset 273 if primary_dataset_original_log: 274 self.dataset.log("Original dataset log included below:\n") 275 with self.dataset.get_log_path().open("a") as outfile: 276 outfile.write(primary_dataset_original_log) 277 278 279 @staticmethod 280 def process_metadata(metadata): 281 """ 282 Process metadata for import 283 """ 284 # get rid of some keys that are server-specific and don't need to 285 # be stored (or don't correspond to database columns) 286 metadata.pop("current_4CAT_version") 287 metadata.pop("id") 288 metadata.pop("job") 289 metadata.pop("is_private") 290 metadata.pop("is_finished") # we'll finish it ourselves, thank you!!! 291 292 # extra params are stored as JSON... 
293 metadata["parameters"] = json.loads(metadata["parameters"]) 294 if "copied_from" in metadata["parameters"]: 295 metadata["parameters"].pop("copied_from") 296 metadata["parameters"] = json.dumps(metadata["parameters"]) 297 298 return metadata 299 300 def create_dataset(self, metadata, original_key, primary=False): 301 """ 302 Create a new dataset 303 """ 304 if primary: 305 self.dataset.update_status(f"Importing primary dataset {original_key}.") 306 # if this is the first dataset we're importing, make it the 307 # processor's "own" dataset. the key has already been set to 308 # the imported dataset's key via ensure_key() (or a new unqiue 309 # key if it already existed on this server) 310 # by making it the "own" dataset, the user initiating the 311 # import will see the imported dataset as the "result" of their 312 # import query in the interface, similar to the workflow for 313 # other data sources 314 new_dataset = self.dataset 315 316 # Update metadata and file 317 metadata.pop("key") # key already OK (see above) 318 self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) 319 320 else: 321 self.dataset.update_status(f"Importing child dataset {original_key}.") 322 # supernumerary datasets - handle on their own 323 # these include any children of imported datasets 324 try: 325 DataSet(key=metadata["key"], db=self.db, modules=self.modules) 326 327 # if we *haven't* thrown a DatasetException now, then the 328 # key is already in use, so create a "dummy" dataset and 329 # overwrite it with the metadata we have (except for the 330 # key). this ensures that a new unique key will be 331 # generated. 332 new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules) 333 metadata.pop("key") 334 self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) 335 336 except DataSetException: 337 # this is *good* since it means the key doesn't exist, so 338 # we can re-use the key of the imported dataset 339 self.db.insert("datasets", data=metadata) 340 new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules) 341 342 if new_dataset.key != original_key: 343 # could not use original key because it was already in use 344 # so update any references to use the new key 345 self.remapped_keys[original_key] = new_dataset.key 346 self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key " 347 f"{new_dataset.key} instead of key {original_key}!") 348 349 # refresh object, make sure it's in sync with the database 350 self.created_datasets.add(new_dataset.key) 351 new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules) 352 current_log = None 353 if new_dataset.key == self.dataset.key: 354 # this ensures that the first imported dataset becomes the 355 # processor's "own" dataset, and that the import logs go to 356 # that dataset's log file. For later imports, this evaluates to 357 # False. 
358 359 # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path) 360 current_log = self.dataset.get_log_path().read_text() 361 # Update the dataset 362 self.dataset = new_dataset 363 364 # if the key of the parent dataset was changed, change the 365 # reference to it that the child dataset has 366 if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys: 367 new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent] 368 369 # update some attributes that should come from the new server, not 370 # the old 371 new_dataset.creator = self.dataset_owner 372 new_dataset.original_timestamp = new_dataset.timestamp 373 new_dataset.imported = True 374 new_dataset.timestamp = int(time.time()) 375 new_dataset.db.commit() 376 377 # make sure the dataset path uses the new key and local dataset 378 # path settings. this also makes sure the log file is created in 379 # the right place (since it is derived from the results file path) 380 extension = metadata["result_file"].split(".")[-1] 381 updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension) 382 if not updated: 383 self.dataset.log(f"Could not reserve result file for {new_dataset.key}!") 384 385 if current_log: 386 # Add the current log to the new dataset 387 with new_dataset.get_log_path().open("a") as outfile: 388 outfile.write(current_log) 389 390 return new_dataset 391 392 393 def process_urls(self): 394 """ 395 Import 4CAT dataset from another 4CAT server 396 397 Interfaces with another 4CAT server to transfer a dataset's metadata, 398 data files and child datasets. 399 """ 400 urls = [url.strip() for url in self.parameters.get("url").split(",")] 401 self.base = urls[0].split("/results/")[0] 402 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 403 api_key = self.parameters.get("api-key") 404 405 imported = [] # successfully imported datasets 406 failed_imports = [] # keys that failed to import 407 num_rows = 0 # will be used later 408 409 # we can add support for multiple datasets later by removing 410 # this part! 411 keys = [keys[0]] 412 413 while keys: 414 dataset_key = keys.pop(0) 415 416 self.halt_and_catch_fire() 417 self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.") 418 419 # first, metadata! 420 try: 421 metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata") 422 metadata = metadata.json() 423 except FourcatImportException as e: 424 self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}") 425 continue 426 except ValueError: 427 self.dataset.log(f"Could not read metadata for dataset {dataset_key}") 428 continue 429 430 # copying empty datasets doesn't really make sense 431 if metadata["num_rows"] == 0: 432 self.dataset.update_status(f"Skipping empty dataset {dataset_key}") 433 failed_imports.append(dataset_key) 434 continue 435 436 metadata = self.process_metadata(metadata) 437 438 # create the new dataset 439 new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False) 440 441 # then, the log 442 self.halt_and_catch_fire() 443 try: 444 self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") 445 # TODO: for the primary, this ends up in the middle of the log as we are still adding to it... 
446 log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log") 447 logpath = new_dataset.get_log_path() 448 new_dataset.log("Original dataset log included below:") 449 with logpath.open("a") as outfile: 450 outfile.write(log.text) 451 except FourcatImportException as e: 452 new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}") 453 failed_imports.append(dataset_key) 454 continue 455 except ValueError: 456 new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset") 457 failed_imports.append(dataset_key) 458 continue 459 460 # then, the results 461 self.halt_and_catch_fire() 462 try: 463 self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") 464 datapath = new_dataset.get_results_path() 465 SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath) 466 467 if not imported: 468 # first dataset - use num rows as 'overall' 469 num_rows = metadata["num_rows"] 470 471 except FourcatImportException as e: 472 self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import") 473 if new_dataset.key != self.dataset.key: 474 new_dataset.delete() 475 continue 476 477 except ValueError: 478 new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}") 479 failed_imports.append(dataset_key) 480 continue 481 482 # finally, the kids 483 self.halt_and_catch_fire() 484 try: 485 self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}") 486 children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children") 487 children = children.json() 488 except FourcatImportException as e: 489 self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}") 490 failed_imports.append(dataset_key) 491 continue 492 except ValueError: 493 self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}") 494 failed_imports.append(dataset_key) 495 continue 496 497 for child in children: 498 keys.append(child) 499 self.dataset.log(f"Adding child dataset {child} to import queue") 500 501 # done - remember that we've imported this one 502 imported.append(new_dataset) 503 new_dataset.update_status(metadata["status"]) 504 505 if new_dataset.key != self.dataset.key: 506 # only finish if this is not the 'main' dataset, or the user 507 # will think the whole import is done 508 new_dataset.finish(metadata["num_rows"]) 509 510 # todo: this part needs updating if/when we support importing multiple datasets! 511 if failed_imports: 512 self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " 513 f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " 514 f"dataset log file for details.", is_final=True) 515 else: 516 self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.", 517 is_final=True) 518 519 if not self.dataset.is_finished(): 520 # now all related datasets are imported, we can finish the 'main' 521 # dataset, and the user will be alerted that the full import is 522 # complete 523 self.dataset.finish(num_rows) 524 525 def halt_and_catch_fire(self): 526 """ 527 Clean up on interrupt 528 529 There are multiple places in the code where we can bail out on an 530 interrupt, so abstract that away in its own function. 
531 :return: 532 """ 533 if self.interrupted: 534 # resuming is impossible because the original dataset (which 535 # has the list of URLs to import) has probably been 536 # overwritten by this point 537 deletables = [k for k in self.created_datasets if k != self.dataset.key] 538 for deletable in deletables: 539 DataSet(key=deletable, db=self.db, modules=self.modules).delete() 540 541 self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you " 542 f"will need to initiate the import again.") 543 544 raise ProcessorInterruptedException() 545 546 @staticmethod 547 def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None): 548 """ 549 Get dataset component from 4CAT export API 550 551 :param str base: Server URL base to import from 552 :param str dataset_key: Key of dataset to import 553 :param str api_key: API authentication token 554 :param str component: Component to retrieve 555 :return: HTTP response object 556 """ 557 try: 558 if component == "data" and datapath: 559 # Stream data 560 with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True, 561 headers={ 562 "User-Agent": "4cat/import", 563 "Authentication": api_key 564 }) as r: 565 r.raise_for_status() 566 with datapath.open("wb") as outfile: 567 for chunk in r.iter_content(chunk_size=8192): 568 outfile.write(chunk) 569 return r 570 else: 571 response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={ 572 "User-Agent": "4cat/import", 573 "Authentication": api_key 574 }) 575 except requests.Timeout: 576 raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is " 577 f"accessible to external connections and try again.") 578 except requests.RequestException as e: 579 raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is " 580 f"accessible to external connections and try again.") 581 582 if response.status_code == 404: 583 raise FourcatImportException( 584 f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to " 585 f"a valid dataset.") 586 elif response.status_code in (401, 403): 587 raise FourcatImportException( 588 f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this " 589 f"dataset and are using the correct API key.") 590 elif response.status_code != 200: 591 raise FourcatImportException( 592 f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}") 593 594 return response 595 596 @staticmethod 597 def validate_query(query, request, config): 598 """ 599 Validate custom data input 600 601 Confirms that the uploaded file is a valid CSV or tab file and, if so, returns 602 some metadata. 603 604 :param dict query: Query parameters, from client-side. 
605 :param request: Flask request 606 :param ConfigManager|None config: Configuration reader (context-aware) 607 :return dict: Safe query parameters 608 """ 609 if query.get("method") == "zip": 610 filename = "" 611 if "option-data_upload-entries" in request.form: 612 # First pass sends list of files in the zip 613 pass 614 elif "option-data_upload" in request.files: 615 # Second pass sends the actual file 616 file = request.files["option-data_upload"] 617 if not file: 618 raise QueryParametersException("No file uploaded.") 619 620 if not file.filename.endswith(".zip"): 621 raise QueryParametersException("Uploaded file must be a ZIP file.") 622 623 filename = file.filename 624 else: 625 raise QueryParametersException("No file was offered for upload.") 626 627 return { 628 "method": "zip", 629 "filename": filename 630 } 631 elif query.get("method") == "url": 632 urls = query.get("url") 633 if not urls: 634 raise QueryParametersException("Provide at least one dataset URL.") 635 636 urls = urls.split(",") 637 bases = set([url.split("/results/")[0].lower() for url in urls]) 638 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 639 640 if len(keys) != 1: 641 # todo: change this to < 1 if we allow multiple datasets 642 raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.") 643 644 if len(bases) != 1: 645 raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from " 646 "one 4CAT server at a time.") 647 648 base = urls[0].split("/results/")[0] 649 try: 650 # test if API key is valid and server is reachable 651 test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata") 652 except FourcatImportException as e: 653 raise QueryParametersException(str(e)) 654 655 try: 656 # test if we get a response we can parse 657 metadata = test.json() 658 except ValueError: 659 raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.") 660 661 version = get_software_version() 662 663 if metadata.get("current_4CAT_version") != version: 664 raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than " 665 f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make " 666 "sure both are running the same version of 4CAT and try again.") 667 668 # OK, we can import at least one dataset 669 return { 670 "url": ",".join(urls), 671 "api-key": query.get("api-key") 672 } 673 else: 674 raise QueryParametersException("Import method not yet implemented.") 675 676 @staticmethod 677 def get_keys_from_urls(urls): 678 """ 679 Get dataset keys from 4CAT URLs 680 681 :param list urls: List of URLs 682 :return list: List of keys 683 """ 684 return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls] 685 686 @staticmethod 687 def ensure_key(query): 688 """ 689 Determine key for dataset generated by this processor 690 691 When importing datasets, it's necessary to determine the key of the 692 dataset that is created before it is actually created, because we want 693 to keep the original key of the imported dataset if possible. Luckily, 694 we can deduce it from the URL we're importing the dataset from. 695 696 :param dict query: Input from the user, through the front-end 697 :return str: Desired dataset key 698 """ 699 #TODO: Can this be done for the zip method as well? 
The original keys are in the zip file; we save them after 700 # this method is called via `after_create`. We could download here and also identify the primary dataset key... 701 urls = query.get("url", "").split(",") 702 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 703 return keys[0]
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can
define a is_compatible_with(FourcatModule module=None, config=None):) -> bool
class
method which takes a dataset as argument and returns a bool that determines
if this processor is considered compatible with that dataset. For example:
@classmethod
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"
83 def process(self): 84 """ 85 Import 4CAT dataset either from another 4CAT server or from the uploaded zip file 86 """ 87 self.created_datasets = set() # keys of created datasets - may not be successful! 88 self.remapped_keys = {} # changed dataset keys 89 self.dataset_owner = self.dataset.get_owners()[0] # at this point it has 1 owner 90 try: 91 if self.parameters.get("method") == "zip": 92 self.process_zip() 93 else: 94 self.process_urls() 95 except Exception as e: 96 # Catch all exceptions and finish the job with an error 97 # Resuming is impossible because this dataset was overwritten with the importing dataset 98 # halt_and_catch_fire() will clean up and delete the datasets that were created 99 self.interrupted = True 100 try: 101 self.halt_and_catch_fire() 102 except ProcessorInterruptedException: 103 pass 104 except InFailedSqlTransaction: 105 # Catch database issue and retry 106 self.db.rollback() 107 try: 108 self.halt_and_catch_fire() 109 except ProcessorInterruptedException: 110 pass 111 # Reraise the original exception for logging 112 raise e
Import 4CAT dataset either from another 4CAT server or from the uploaded zip file
114 def after_create(query, dataset, request): 115 """ 116 Hook to execute after the dataset for this source has been created 117 118 In this case, put the file in a temporary location so it can be 119 processed properly by the related Job later. 120 121 :param dict query: Sanitised query parameters 122 :param DataSet dataset: Dataset created for this query 123 :param request: Flask request submitted for its creation 124 """ 125 if query.get("method") == "zip": 126 file = request.files["option-data_upload"] 127 file.seek(0) 128 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 129 while True: 130 chunk = file.read(1024) 131 if len(chunk) == 0: 132 break 133 outfile.write(chunk) 134 else: 135 # nothing to do for URLs 136 pass
Hook to execute after the dataset for this source has been created
In this case, put the file in a temporary location so it can be processed properly by the related Job later.
Parameters
- dict query: Sanitised query parameters
- DataSet dataset: Dataset created for this query
- request: Flask request submitted for its creation
def process_zip(self):
    """
    Import 4CAT dataset from a ZIP file
    """
    self.dataset.update_status("Importing datasets and analyses from ZIP file.")
    temp_file = self.dataset.get_results_path().with_suffix(".importing")

    imported = []
    processed_files = 1  # take into account the export.log file
    failed_imports = []
    primary_dataset_original_log = None
    with zipfile.ZipFile(temp_file, "r") as zip_ref:
        zip_contents = zip_ref.namelist()

        # Get all metadata files and determine primary dataset
        metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")]
        if not metadata_files:
            self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?")
            return

        # Get the primary dataset
        primary_dataset_keys = set()
        datasets = []
        parent_child_mapping = {}
        for file in metadata_files:
            with zip_ref.open(file) as f:
                content = f.read().decode('utf-8')  # Decode the binary content using the desired encoding
                metadata = json.loads(content)
                if not metadata.get("key_parent"):
                    primary_dataset_keys.add(metadata.get("key"))
                    datasets.append(metadata)
                else:
                    # Store the mapping of parent to child datasets
                    parent_key = metadata.get("key_parent")
                    if parent_key not in parent_child_mapping:
                        parent_child_mapping[parent_key] = []
                    parent_child_mapping[parent_key].append(metadata)

        # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets
        if len(primary_dataset_keys) != 1:
            self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.")
            return

        # Import datasets
        while datasets:
            self.halt_and_catch_fire()

            # Create the datasets
            metadata = datasets.pop(0)
            dataset_key = metadata.get("key")
            processed_metadata = self.process_metadata(metadata)
            new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
            processed_files += 1

            # Copy the log file
            self.halt_and_catch_fire()
            log_filename = Path(metadata["result_file"]).with_suffix(".log").name
            if log_filename in zip_contents:
                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                with zip_ref.open(log_filename) as f:
                    content = f.read().decode('utf-8')
                    if new_dataset.key == self.dataset.key:
                        # Hold the original log for the primary dataset and add at the end
                        primary_dataset_original_log = content
                    else:
                        new_dataset.log("Original dataset log included below:")
                        with new_dataset.get_log_path().open("a") as outfile:
                            outfile.write(content)
                processed_files += 1
            else:
                self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")

            # Copy the results
            self.halt_and_catch_fire()
            results_filename = metadata["result_file"]
            if results_filename in zip_contents:
                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                with zip_ref.open(results_filename) as f:
                    with new_dataset.get_results_path().open("wb") as outfile:
                        outfile.write(f.read())
                processed_files += 1

                if not imported:
                    # first dataset - use num rows as 'overall'
                    num_rows = metadata["num_rows"]
            else:
                self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
                failed_imports.append(dataset_key)
                continue

            # finally, the kids
            self.halt_and_catch_fire()
            if dataset_key in parent_child_mapping:
                datasets.extend(parent_child_mapping[dataset_key])
                self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue")

            # done - remember that we've imported this one
            imported.append(new_dataset)
            new_dataset.update_status(metadata["status"])

            if new_dataset.key != self.dataset.key:
                # only finish if this is not the 'main' dataset, or the user
                # will think the whole import is done
                new_dataset.finish(metadata["num_rows"])

    # Check that all files were processed
    missed_files = []
    if len(zip_contents) != processed_files:
        for file in zip_contents:
            if file not in processed_files:
                missed_files.append(file)

    # todo: this part needs updating if/when we support importing multiple datasets!
    if failed_imports:
        self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                   f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                   f"dataset log file for details.", is_final=True)
    elif missed_files:
        self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}")
        self.dataset.update_status(f"Dataset import finished, but not all files were processed. "
                                   f"{len(missed_files)} files were not successfully imported. Check the "
                                   f"dataset log file for details.", is_final=True)
    else:
        self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported.",
                                   is_final=True)

    if not self.dataset.is_finished():
        # now all related datasets are imported, we can finish the 'main'
        # dataset, and the user will be alerted that the full import is
        # complete
        self.dataset.finish(num_rows)

    # Add the original log for the primary dataset
    if primary_dataset_original_log:
        self.dataset.log("Original dataset log included below:\n")
        with self.dataset.get_log_path().open("a") as outfile:
            outfile.write(primary_dataset_original_log)
Import 4CAT dataset from a ZIP file
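The importer drives the whole ZIP import off the *_metadata.json files in the archive. The sketch below is a minimal, illustrative example of such a record, limited to the fields the code above actually reads (key, key_parent, result_file, num_rows, status); the field names come from the code, the values are hypothetical, and a real export contains additional fields.

# Minimal, hypothetical example of one "<key>_metadata.json" record in an export ZIP.
# Only the fields used by process_zip() are shown; the values are made up.
example_metadata = {
    "key": "abcdef123456",               # original dataset key on the exporting server
    "key_parent": "",                    # empty or missing for the primary dataset
    "result_file": "abcdef123456.csv",   # data file in the ZIP; its ".log" sibling holds the log
    "num_rows": 1000,
    "status": "Dataset completed.",
}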
@staticmethod
def process_metadata(metadata):
    """
    Process metadata for import
    """
    # get rid of some keys that are server-specific and don't need to
    # be stored (or don't correspond to database columns)
    metadata.pop("current_4CAT_version")
    metadata.pop("id")
    metadata.pop("job")
    metadata.pop("is_private")
    metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!

    # extra params are stored as JSON...
    metadata["parameters"] = json.loads(metadata["parameters"])
    if "copied_from" in metadata["parameters"]:
        metadata["parameters"].pop("copied_from")
    metadata["parameters"] = json.dumps(metadata["parameters"])

    return metadata
Process metadata for import
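To make the transformation concrete, here is a small standalone sketch that mimics what process_metadata() does to an exported record: the server-specific fields are dropped and the copied_from reference is stripped from the JSON-encoded parameters. The record and its values are hypothetical.

import json

# Hypothetical exported record, shaped like process_metadata()'s input
exported = {
    "key": "abcdef123456",
    "current_4CAT_version": "1.47",  # server-specific, dropped
    "id": 42,                        # dropped
    "job": 1337,                     # dropped
    "is_private": False,             # dropped
    "is_finished": True,             # dropped; the importer finishes the dataset itself
    "parameters": json.dumps({"datasource": "upload", "copied_from": "some-old-key"}),
}

for field in ("current_4CAT_version", "id", "job", "is_private", "is_finished"):
    exported.pop(field)

parameters = json.loads(exported["parameters"])
parameters.pop("copied_from", None)  # reference to a dataset on the old server
exported["parameters"] = json.dumps(parameters)

print(exported["parameters"])  # {"datasource": "upload"}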
def create_dataset(self, metadata, original_key, primary=False):
    """
    Create a new dataset
    """
    if primary:
        self.dataset.update_status(f"Importing primary dataset {original_key}.")
        # if this is the first dataset we're importing, make it the
        # processor's "own" dataset. the key has already been set to
        # the imported dataset's key via ensure_key() (or a new unique
        # key if it already existed on this server)
        # by making it the "own" dataset, the user initiating the
        # import will see the imported dataset as the "result" of their
        # import query in the interface, similar to the workflow for
        # other data sources
        new_dataset = self.dataset

        # Update metadata and file
        metadata.pop("key")  # key already OK (see above)
        self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

    else:
        self.dataset.update_status(f"Importing child dataset {original_key}.")
        # supernumerary datasets - handle on their own
        # these include any children of imported datasets
        try:
            DataSet(key=metadata["key"], db=self.db, modules=self.modules)

            # if we *haven't* thrown a DataSetException now, then the
            # key is already in use, so create a "dummy" dataset and
            # overwrite it with the metadata we have (except for the
            # key). this ensures that a new unique key will be
            # generated.
            new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
            metadata.pop("key")
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        except DataSetException:
            # this is *good* since it means the key doesn't exist, so
            # we can re-use the key of the imported dataset
            self.db.insert("datasets", data=metadata)
            new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

    if new_dataset.key != original_key:
        # could not use original key because it was already in use
        # so update any references to use the new key
        self.remapped_keys[original_key] = new_dataset.key
        self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                   f"{new_dataset.key} instead of key {original_key}!")

    # refresh object, make sure it's in sync with the database
    self.created_datasets.add(new_dataset.key)
    new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
    current_log = None
    if new_dataset.key == self.dataset.key:
        # this ensures that the first imported dataset becomes the
        # processor's "own" dataset, and that the import logs go to
        # that dataset's log file. For later imports, this evaluates to
        # False.

        # Read the current log and store it; it needs to be added back after the
        # result_file is updated (as it is used to determine the log file path)
        current_log = self.dataset.get_log_path().read_text()
        # Update the dataset
        self.dataset = new_dataset

    # if the key of the parent dataset was changed, change the
    # reference to it that the child dataset has
    if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
        new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

    # update some attributes that should come from the new server, not
    # the old
    new_dataset.creator = self.dataset_owner
    new_dataset.original_timestamp = new_dataset.timestamp
    new_dataset.imported = True
    new_dataset.timestamp = int(time.time())
    new_dataset.db.commit()

    # make sure the dataset path uses the new key and local dataset
    # path settings. this also makes sure the log file is created in
    # the right place (since it is derived from the results file path)
    extension = metadata["result_file"].split(".")[-1]
    updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
    if not updated:
        self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

    if current_log:
        # Add the current log to the new dataset
        with new_dataset.get_log_path().open("a") as outfile:
            outfile.write(current_log)

    return new_dataset
Create a new dataset
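The subtle part of this method is key handling: if the imported key already exists locally, a new key is generated and recorded in self.remapped_keys so that child datasets can later be re-pointed at it. A minimal standalone sketch of that bookkeeping, using hypothetical keys:

# Illustrative sketch only; keys are hypothetical.
existing_keys = {"abcdef123456"}  # keys already present in the local database
remapped_keys = {}                # original key -> key actually used on this server

original_key = "abcdef123456"
if original_key in existing_keys:
    new_key = "0123456789abcdef"  # stand-in for a freshly generated unique key
    remapped_keys[original_key] = new_key
else:
    new_key = original_key        # original key can be reused as-is

# A child exported with key_parent == original_key is later re-pointed:
child_key_parent = remapped_keys.get(original_key, original_key)
assert child_key_parent == new_key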
def process_urls(self):
    """
    Import 4CAT dataset from another 4CAT server

    Interfaces with another 4CAT server to transfer a dataset's metadata,
    data files and child datasets.
    """
    urls = [url.strip() for url in self.parameters.get("url").split(",")]
    self.base = urls[0].split("/results/")[0]
    keys = SearchImportFromFourcat.get_keys_from_urls(urls)
    api_key = self.parameters.get("api-key")

    imported = []  # successfully imported datasets
    failed_imports = []  # keys that failed to import
    num_rows = 0  # will be used later

    # we can add support for multiple datasets later by removing
    # this part!
    keys = [keys[0]]

    while keys:
        dataset_key = keys.pop(0)

        self.halt_and_catch_fire()
        self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")

        # first, metadata!
        try:
            metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
            metadata = metadata.json()
        except FourcatImportException as e:
            self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
            continue
        except ValueError:
            self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
            continue

        # copying empty datasets doesn't really make sense
        if metadata["num_rows"] == 0:
            self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
            failed_imports.append(dataset_key)
            continue

        metadata = self.process_metadata(metadata)

        # create the new dataset
        new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False)

        # then, the log
        self.halt_and_catch_fire()
        try:
            self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
            # TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
            log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
            logpath = new_dataset.get_log_path()
            new_dataset.log("Original dataset log included below:")
            with logpath.open("a") as outfile:
                outfile.write(log.text)
        except FourcatImportException as e:
            new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
            failed_imports.append(dataset_key)
            continue
        except ValueError:
            new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
            failed_imports.append(dataset_key)
            continue

        # then, the results
        self.halt_and_catch_fire()
        try:
            self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
            datapath = new_dataset.get_results_path()
            SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)

            if not imported:
                # first dataset - use num rows as 'overall'
                num_rows = metadata["num_rows"]

        except FourcatImportException as e:
            self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
            if new_dataset.key != self.dataset.key:
                new_dataset.delete()
            continue

        except ValueError:
            new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
            failed_imports.append(dataset_key)
            continue

        # finally, the kids
        self.halt_and_catch_fire()
        try:
            self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
            children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
            children = children.json()
        except FourcatImportException as e:
            self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
            failed_imports.append(dataset_key)
            continue
        except ValueError:
            self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
            failed_imports.append(dataset_key)
            continue

        for child in children:
            keys.append(child)
            self.dataset.log(f"Adding child dataset {child} to import queue")

        # done - remember that we've imported this one
        imported.append(new_dataset)
        new_dataset.update_status(metadata["status"])

        if new_dataset.key != self.dataset.key:
            # only finish if this is not the 'main' dataset, or the user
            # will think the whole import is done
            new_dataset.finish(metadata["num_rows"])

    # todo: this part needs updating if/when we support importing multiple datasets!
    if failed_imports:
        self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                   f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                   f"dataset log file for details.", is_final=True)
    else:
        self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
                                   is_final=True)

    if not self.dataset.is_finished():
        # now all related datasets are imported, we can finish the 'main'
        # dataset, and the user will be alerted that the full import is
        # complete
        self.dataset.finish(num_rows)
Import 4CAT dataset from another 4CAT server
Interfaces with another 4CAT server to transfer a dataset's metadata, data files and child datasets.
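The method effectively walks the dataset tree on the remote server with a simple queue: once a dataset's metadata, log and data have been transferred, its children are appended to the queue and processed in turn. A standalone sketch of that traversal, using a hypothetical tree of keys:

# Illustrative sketch only (not 4CAT code); the tree below is hypothetical.
children_of = {
    "root": ["child-a", "child-b"],
    "child-a": ["grandchild"],
    "child-b": [],
    "grandchild": [],
}

keys = ["root"]
while keys:
    dataset_key = keys.pop(0)
    print(f"transferring metadata, log and data for {dataset_key}")
    keys.extend(children_of[dataset_key])  # children are queued after their parent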
def halt_and_catch_fire(self):
    """
    Clean up on interrupt

    There are multiple places in the code where we can bail out on an
    interrupt, so abstract that away in its own function.

    :return: Nothing; raises a ProcessorInterruptedException after cleaning up
    """
    if self.interrupted:
        # resuming is impossible because the original dataset (which
        # has the list of URLs to import) has probably been
        # overwritten by this point
        deletables = [k for k in self.created_datasets if k != self.dataset.key]
        for deletable in deletables:
            DataSet(key=deletable, db=self.db, modules=self.modules).delete()

        self.dataset.finish_with_error(f"Interrupted while importing datasets{' from ' + self.base if self.base else ''}. Cannot resume - you "
                                       f"will need to initiate the import again.")

        raise ProcessorInterruptedException()
Clean up on interrupt
There are multiple places in the code where we can bail out on an interrupt, so abstract that away in its own function.
Returns
Nothing; raises a ProcessorInterruptedException after cleaning up if the worker has been interrupted.
@staticmethod
def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
    """
    Get dataset component from 4CAT export API

    :param str base: Server URL base to import from
    :param str dataset_key: Key of dataset to import
    :param str api_key: API authentication token
    :param str component: Component to retrieve
    :param Path datapath: File path to stream the data component to, if any
    :return: HTTP response object
    """
    try:
        if component == "data" and datapath:
            # Stream data to the provided file path
            with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
                              headers={
                                  "User-Agent": "4cat/import",
                                  "Authentication": api_key
                              }) as r:
                r.raise_for_status()
                with datapath.open("wb") as outfile:
                    for chunk in r.iter_content(chunk_size=8192):
                        outfile.write(chunk)
                return r
        else:
            response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
                "User-Agent": "4cat/import",
                "Authentication": api_key
            })
    except requests.Timeout:
        raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
                                     f"accessible to external connections and try again.")
    except requests.RequestException as e:
        raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
                                     f"accessible to external connections and try again.")

    if response.status_code == 404:
        raise FourcatImportException(
            f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
            f"a valid dataset.")
    elif response.status_code in (401, 403):
        raise FourcatImportException(
            f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
            f"dataset and are using the correct API key.")
    elif response.status_code != 200:
        raise FourcatImportException(
            f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")

    return response
Get dataset component from 4CAT export API
Parameters
- str base: Server URL base to import from
- str dataset_key: Key of dataset to import
- str api_key: API authentication token
- str component: Component to retrieve
- Path datapath: File path to stream the data component to, if any
Returns
HTTP response object
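For reference, the request boils down to a single GET against the exporting server's export API, authenticated with the API key in an Authentication header. A minimal standalone sketch with hypothetical server, dataset key and token:

import requests

# Hypothetical values; mirrors the request fetch_from_4cat() makes for one component.
base = "https://4cat.example"
dataset_key = "abcdef123456"
api_key = "my-api-key"

response = requests.get(
    f"{base}/api/export-packed-dataset/{dataset_key}/metadata/",
    timeout=5,
    headers={"User-Agent": "4cat/import", "Authentication": api_key},
)
response.raise_for_status()
print(response.json().get("num_rows"))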
@staticmethod
def validate_query(query, request, config):
    """
    Validate custom data input

    Confirms that the uploaded file is a valid 4CAT ZIP export or, for
    URL imports, that the URL and API key point to a compatible 4CAT
    server, and returns sanitised query parameters.

    :param dict query: Query parameters, from client-side.
    :param request: Flask request
    :param ConfigManager|None config: Configuration reader (context-aware)
    :return dict: Safe query parameters
    """
    if query.get("method") == "zip":
        filename = ""
        if "option-data_upload-entries" in request.form:
            # First pass sends a list of files in the zip
            pass
        elif "option-data_upload" in request.files:
            # Second pass sends the actual file
            file = request.files["option-data_upload"]
            if not file:
                raise QueryParametersException("No file uploaded.")

            if not file.filename.endswith(".zip"):
                raise QueryParametersException("Uploaded file must be a ZIP file.")

            filename = file.filename
        else:
            raise QueryParametersException("No file was offered for upload.")

        return {
            "method": "zip",
            "filename": filename
        }
    elif query.get("method") == "url":
        urls = query.get("url")
        if not urls:
            raise QueryParametersException("Provide at least one dataset URL.")

        urls = urls.split(",")
        bases = set([url.split("/results/")[0].lower() for url in urls])
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)

        if len(keys) != 1:
            # todo: change this to < 1 if we allow multiple datasets
            raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")

        if len(bases) != 1:
            raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from "
                                           "one 4CAT server at a time.")

        base = urls[0].split("/results/")[0]
        try:
            # test if API key is valid and server is reachable
            test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
        except FourcatImportException as e:
            raise QueryParametersException(str(e))

        try:
            # test if we get a response we can parse
            metadata = test.json()
        except ValueError:
            raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")

        version = get_software_version()

        if metadata.get("current_4CAT_version") != version:
            raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than "
                                           f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make "
                                           "sure both are running the same version of 4CAT and try again.")

        # OK, we can import at least one dataset
        return {
            "url": ",".join(urls),
            "api-key": query.get("api-key")
        }
    else:
        raise QueryParametersException("Import method not yet implemented.")
Validate custom data input
Confirms that the uploaded file is a valid 4CAT ZIP export or, for URL imports, that the URL and API key point to a compatible 4CAT server, and returns sanitised query parameters.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
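Depending on the chosen import method, the sanitised parameters take one of two shapes, as returned by the code above; the values here are hypothetical.

# Returned for a ZIP upload (second validation pass):
zip_parameters = {
    "method": "zip",
    "filename": "4cat-export.zip",
}

# Returned for a URL import, after the remote server and API key have been checked:
url_parameters = {
    "url": "https://4cat.example/results/abcdef123456/",
    "api-key": "my-api-key",
}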
@staticmethod
def get_keys_from_urls(urls):
    """
    Get dataset keys from 4CAT URLs

    :param list urls: List of URLs
    :return list: List of keys
    """
    return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
Get dataset keys from 4CAT URLs
Parameters
- list urls: List of URLs
Returns
List of keys
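For example (hypothetical URL), the key is the path segment directly after /results/, with any trailing path, query string or fragment stripped:

# Same expression as used in get_keys_from_urls(), applied to a hypothetical URL.
url = "https://4cat.example/results/abcdef123456/?page=2#top"
key = url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0]
print(key)  # abcdef123456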
@staticmethod
def ensure_key(query):
    """
    Determine key for dataset generated by this processor

    When importing datasets, it's necessary to determine the key of the
    dataset that is created before it is actually created, because we want
    to keep the original key of the imported dataset if possible. Luckily,
    we can deduce it from the URL we're importing the dataset from.

    :param dict query: Input from the user, through the front-end
    :return str: Desired dataset key
    """
    # TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
    # this method is called via `after_create`. We could download here and also identify the primary dataset key...
    urls = query.get("url", "").split(",")
    keys = SearchImportFromFourcat.get_keys_from_urls(urls)
    return keys[0]
Determine key for dataset generated by this processor
When importing datasets, it's necessary to determine the key of the dataset that is created before it is actually created, because we want to keep the original key of the imported dataset if possible. Luckily, we can deduce it from the URL we're importing the dataset from.
Parameters
- dict query: Input from the user, through the front-end
Returns
Desired dataset key
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- extension
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor