datasources.fourcat_import.import_4cat
Import datasets from other 4CATs
1""" 2Import datasets from other 4CATs 3""" 4import requests 5import json 6import time 7import zipfile 8from pathlib import Path 9 10from backend.lib.processor import BasicProcessor 11from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException, 12 DataSetException) 13from common.lib.helpers import UserInput, get_software_version 14from common.lib.dataset import DataSet 15 16 17class FourcatImportException(FourcatException): 18 pass 19 20 21class SearchImportFromFourcat(BasicProcessor): 22 type = "import_4cat-search" # job ID 23 category = "Search" # category 24 title = "Import 4CAT dataset and analyses" # title displayed in UI 25 description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)" # description displayed in UI 26 is_local = False # Whether this datasource is locally scraped 27 is_static = False # Whether this datasource is still updated 28 29 max_workers = 1 # this cannot be more than 1, else things get VERY messy 30 31 options = { 32 "intro": { 33 "type": UserInput.OPTION_INFO, 34 "help": "Provide the URL of a dataset in another 4CAT server that you would like to copy to this one here. " 35 "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. " 36 "You can find the current version in the footer at the bottom of the interface." 37 }, 38 "method": { 39 "type": UserInput.OPTION_CHOICE, 40 "help": "Import Type", 41 "options": { 42 "zip": "Zip File", 43 "url": "4CAT URL", 44 }, 45 "default": "url" 46 }, 47 "url": { 48 "type": UserInput.OPTION_TEXT, 49 "help": "Dataset URL", 50 "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.", 51 "requires": "method^=url" 52 }, 53 "intro2": { 54 "type": UserInput.OPTION_INFO, 55 "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need " 56 "an API key from **the server you are importing from**, not the one you are looking at right now. " 57 "Additionally, you need to have owner access to the dataset you want to import.", 58 "requires": "method^=url" 59 }, 60 "api-key": { 61 "type": UserInput.OPTION_TEXT, 62 "help": "4CAT API Key", 63 "sensitive": True, 64 "cache": True, 65 "requires": "method^=url" 66 }, 67 "data_upload": { 68 "type": UserInput.OPTION_FILE, 69 "help": "File", 70 "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.", 71 "requires": "method^=zip" 72 }, 73 74 } 75 76 created_datasets = None 77 base = None 78 remapped_keys = None 79 dataset_owner = None 80 81 def process(self): 82 """ 83 Import 4CAT dataset either from another 4CAT server or from the uploaded zip file 84 """ 85 self.created_datasets = set() # keys of created datasets - may not be successful! 
86 self.remapped_keys = {} # changed dataset keys 87 self.dataset_owner = self.dataset.get_owners()[0] # at this point it has 1 owner 88 try: 89 if self.parameters.get("method") == "zip": 90 self.process_zip() 91 else: 92 self.process_urls() 93 except Exception as e: 94 # Catch all exceptions and finish the job with an error 95 # Resuming is impossible because this dataset was overwritten with the importing dataset 96 # halt_and_catch_fire() will clean up and delete the datasets that were created 97 self.interrupted = True 98 try: 99 self.halt_and_catch_fire() 100 except ProcessorInterruptedException: 101 pass 102 # Reraise the original exception for logging 103 raise e 104 105 def after_create(query, dataset, request): 106 """ 107 Hook to execute after the dataset for this source has been created 108 109 In this case, put the file in a temporary location so it can be 110 processed properly by the related Job later. 111 112 :param dict query: Sanitised query parameters 113 :param DataSet dataset: Dataset created for this query 114 :param request: Flask request submitted for its creation 115 """ 116 if query.get("method") == "zip": 117 file = request.files["option-data_upload"] 118 file.seek(0) 119 with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: 120 while True: 121 chunk = file.read(1024) 122 if len(chunk) == 0: 123 break 124 outfile.write(chunk) 125 else: 126 # nothing to do for URLs 127 pass 128 129 130 def process_zip(self): 131 """ 132 Import 4CAT dataset from a ZIP file 133 """ 134 self.dataset.update_status(f"Importing datasets and analyses from ZIP file.") 135 temp_file = self.dataset.get_results_path().with_suffix(".importing") 136 137 imported = [] 138 processed_files = 1 # take into account the export.log file 139 failed_imports = [] 140 primary_dataset_original_log = None 141 with zipfile.ZipFile(temp_file, "r") as zip_ref: 142 zip_contents = zip_ref.namelist() 143 144 # Get all metadata files and determine primary dataset 145 metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")] 146 if not metadata_files: 147 self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?") 148 return 149 150 # Get the primary dataset 151 primary_dataset_keys = set() 152 datasets = [] 153 parent_child_mapping = {} 154 for file in metadata_files: 155 with zip_ref.open(file) as f: 156 content = f.read().decode('utf-8') # Decode the binary content using the desired encoding 157 metadata = json.loads(content) 158 if not metadata.get("key_parent"): 159 primary_dataset_keys.add(metadata.get("key")) 160 datasets.append(metadata) 161 else: 162 # Store the mapping of parent to child datasets 163 parent_key = metadata.get("key_parent") 164 if parent_key not in parent_child_mapping: 165 parent_child_mapping[parent_key] = [] 166 parent_child_mapping[parent_key].append(metadata) 167 168 # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets 169 if len(primary_dataset_keys) != 1: 170 self.dataset.finish_with_error("ZIP file contains multiple primary datasets; only one is allowed.") 171 return 172 173 # Import datasets 174 while datasets: 175 self.halt_and_catch_fire() 176 177 # Create the datasets 178 metadata = datasets.pop(0) 179 dataset_key = metadata.get("key") 180 processed_metadata = self.process_metadata(metadata) 181 new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys) 182 processed_files += 1 183 184 # Copy the 
log file 185 self.halt_and_catch_fire() 186 log_filename = Path(metadata["result_file"]).with_suffix(".log").name 187 if log_filename in zip_contents: 188 self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") 189 with zip_ref.open(log_filename) as f: 190 content = f.read().decode('utf-8') 191 if new_dataset.key == self.dataset.key: 192 # Hold the original log for the primary dataset and add at the end 193 primary_dataset_original_log = content 194 else: 195 new_dataset.log("Original dataset log included below:") 196 with new_dataset.get_log_path().open("a") as outfile: 197 outfile.write(content) 198 processed_files += 1 199 else: 200 self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).") 201 202 # Copy the results 203 self.halt_and_catch_fire() 204 results_filename = metadata["result_file"] 205 if results_filename in zip_contents: 206 self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") 207 with zip_ref.open(results_filename) as f: 208 with new_dataset.get_results_path().open("wb") as outfile: 209 outfile.write(f.read()) 210 processed_files += 1 211 212 if not imported: 213 # first dataset - use num rows as 'overall' 214 num_rows = metadata["num_rows"] 215 else: 216 self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") 217 new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") 218 failed_imports.append(dataset_key) 219 continue 220 221 # finally, the kids 222 self.halt_and_catch_fire() 223 if dataset_key in parent_child_mapping: 224 datasets.extend(parent_child_mapping[dataset_key]) 225 self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue") 226 227 # done - remember that we've imported this one 228 imported.append(new_dataset) 229 new_dataset.update_status(metadata["status"]) 230 231 if new_dataset.key != self.dataset.key: 232 # only finish if this is not the 'main' dataset, or the user 233 # will think the whole import is done 234 new_dataset.finish(metadata["num_rows"]) 235 236 # Check that all files were processed 237 missed_files = [] 238 if len(zip_contents) != processed_files: 239 for file in zip_contents: 240 if file not in processed_files: 241 missed_files.append(file) 242 243 # todo: this part needs updating if/when we support importing multiple datasets! 244 if failed_imports: 245 self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " 246 f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " 247 f"dataset log file for details.", is_final=True) 248 elif missed_files: 249 self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}") 250 self.dataset.update_status(f"Dataset import finished, but not all files were processed. " 251 f"{len(missed_files)} files were not successfully imported. 
Check the " 252 f"dataset log file for details.", is_final=True) 253 else: 254 self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported.", 255 is_final=True) 256 257 if not self.dataset.is_finished(): 258 # now all related datasets are imported, we can finish the 'main' 259 # dataset, and the user will be alerted that the full import is 260 # complete 261 self.dataset.finish(num_rows) 262 263 # Add the original log for the primary dataset 264 if primary_dataset_original_log: 265 self.dataset.log("Original dataset log included below:\n") 266 with self.dataset.get_log_path().open("a") as outfile: 267 outfile.write(primary_dataset_original_log) 268 269 270 @staticmethod 271 def process_metadata(metadata): 272 """ 273 Process metadata for import 274 """ 275 # get rid of some keys that are server-specific and don't need to 276 # be stored (or don't correspond to database columns) 277 metadata.pop("current_4CAT_version") 278 metadata.pop("id") 279 metadata.pop("job") 280 metadata.pop("is_private") 281 metadata.pop("is_finished") # we'll finish it ourselves, thank you!!! 282 283 # extra params are stored as JSON... 284 metadata["parameters"] = json.loads(metadata["parameters"]) 285 if "copied_from" in metadata["parameters"]: 286 metadata["parameters"].pop("copied_from") 287 metadata["parameters"] = json.dumps(metadata["parameters"]) 288 289 return metadata 290 291 def create_dataset(self, metadata, original_key, primary=False): 292 """ 293 Create a new dataset 294 """ 295 if primary: 296 self.dataset.update_status(f"Importing primary dataset {original_key}.") 297 # if this is the first dataset we're importing, make it the 298 # processor's "own" dataset. the key has already been set to 299 # the imported dataset's key via ensure_key() (or a new unqiue 300 # key if it already existed on this server) 301 # by making it the "own" dataset, the user initiating the 302 # import will see the imported dataset as the "result" of their 303 # import query in the interface, similar to the workflow for 304 # other data sources 305 new_dataset = self.dataset 306 307 # Update metadata and file 308 metadata.pop("key") # key already OK (see above) 309 self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) 310 311 else: 312 self.dataset.update_status(f"Importing child dataset {original_key}.") 313 # supernumerary datasets - handle on their own 314 # these include any children of imported datasets 315 try: 316 key_exists = DataSet(key=metadata["key"], db=self.db, modules=self.modules) 317 318 # if we *haven't* thrown a DatasetException now, then the 319 # key is already in use, so create a "dummy" dataset and 320 # overwrite it with the metadata we have (except for the 321 # key). this ensures that a new unique key will be 322 # generated. 
323 new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules) 324 metadata.pop("key") 325 self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) 326 327 except DataSetException: 328 # this is *good* since it means the key doesn't exist, so 329 # we can re-use the key of the imported dataset 330 self.db.insert("datasets", data=metadata) 331 new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules) 332 333 if new_dataset.key != original_key: 334 # could not use original key because it was already in use 335 # so update any references to use the new key 336 self.remapped_keys[original_key] = new_dataset.key 337 self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key " 338 f"{new_dataset.key} instead of key {original_key}!") 339 340 # refresh object, make sure it's in sync with the database 341 self.created_datasets.add(new_dataset.key) 342 new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules) 343 current_log = None 344 if new_dataset.key == self.dataset.key: 345 # this ensures that the first imported dataset becomes the 346 # processor's "own" dataset, and that the import logs go to 347 # that dataset's log file. For later imports, this evaluates to 348 # False. 349 350 # Read the current log and store it; it needs to be after the result_file is updated (as it is used to determine the log file path) 351 current_log = self.dataset.get_log_path().read_text() 352 # Update the dataset 353 self.dataset = new_dataset 354 355 # if the key of the parent dataset was changed, change the 356 # reference to it that the child dataset has 357 if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys: 358 new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent] 359 360 # update some attributes that should come from the new server, not 361 # the old 362 new_dataset.creator = self.dataset_owner 363 new_dataset.original_timestamp = new_dataset.timestamp 364 new_dataset.imported = True 365 new_dataset.timestamp = int(time.time()) 366 new_dataset.db.commit() 367 368 # make sure the dataset path uses the new key and local dataset 369 # path settings. this also makes sure the log file is created in 370 # the right place (since it is derived from the results file path) 371 extension = metadata["result_file"].split(".")[-1] 372 updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension) 373 if not updated: 374 self.dataset.log(f"Could not reserve result file for {new_dataset.key}!") 375 376 if current_log: 377 # Add the current log to the new dataset 378 with new_dataset.get_log_path().open("a") as outfile: 379 outfile.write(current_log) 380 381 return new_dataset 382 383 384 def process_urls(self): 385 """ 386 Import 4CAT dataset from another 4CAT server 387 388 Interfaces with another 4CAT server to transfer a dataset's metadata, 389 data files and child datasets. 390 """ 391 urls = [url.strip() for url in self.parameters.get("url").split(",")] 392 self.base = urls[0].split("/results/")[0] 393 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 394 api_key = self.parameters.get("api-key") 395 396 imported = [] # successfully imported datasets 397 failed_imports = [] # keys that failed to import 398 num_rows = 0 # will be used later 399 400 # we can add support for multiple datasets later by removing 401 # this part! 
402 keys = [keys[0]] 403 404 while keys: 405 dataset_key = keys.pop(0) 406 407 self.halt_and_catch_fire() 408 self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.") 409 410 # first, metadata! 411 try: 412 metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata") 413 metadata = metadata.json() 414 except FourcatImportException as e: 415 self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}") 416 continue 417 except ValueError: 418 self.dataset.log(f"Could not read metadata for dataset {dataset_key}") 419 continue 420 421 # copying empty datasets doesn't really make sense 422 if metadata["num_rows"] == 0: 423 self.dataset.update_status(f"Skipping empty dataset {dataset_key}") 424 failed_imports.append(dataset_key) 425 continue 426 427 metadata = self.process_metadata(metadata) 428 429 # create the new dataset 430 new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False) 431 432 # then, the log 433 self.halt_and_catch_fire() 434 try: 435 self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") 436 # TODO: for the primary, this ends up in the middle of the log as we are still adding to it... 437 log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log") 438 logpath = new_dataset.get_log_path() 439 new_dataset.log("Original dataset log included below:") 440 with logpath.open("a") as outfile: 441 outfile.write(log.text) 442 except FourcatImportException as e: 443 new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}") 444 failed_imports.append(dataset_key) 445 continue 446 except ValueError: 447 new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset") 448 failed_imports.append(dataset_key) 449 continue 450 451 # then, the results 452 self.halt_and_catch_fire() 453 try: 454 self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") 455 datapath = new_dataset.get_results_path() 456 data = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath) 457 458 if not imported: 459 # first dataset - use num rows as 'overall' 460 num_rows = metadata["num_rows"] 461 462 except FourcatImportException as e: 463 self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import") 464 if new_dataset.key != self.dataset.key: 465 new_dataset.delete() 466 continue 467 468 except ValueError: 469 new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}") 470 failed_imports.append(dataset_key) 471 continue 472 473 # finally, the kids 474 self.halt_and_catch_fire() 475 try: 476 self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}") 477 children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children") 478 children = children.json() 479 except FourcatImportException as e: 480 self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}") 481 failed_imports.append(dataset_key) 482 continue 483 except ValueError: 484 self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}") 485 failed_imports.append(dataset_key) 486 continue 487 488 for child in children: 489 keys.append(child) 490 self.dataset.log(f"Adding child dataset {child} to import queue") 491 492 # done - remember that we've imported this one 493 imported.append(new_dataset) 494 
new_dataset.update_status(metadata["status"]) 495 496 if new_dataset.key != self.dataset.key: 497 # only finish if this is not the 'main' dataset, or the user 498 # will think the whole import is done 499 new_dataset.finish(metadata["num_rows"]) 500 501 # todo: this part needs updating if/when we support importing multiple datasets! 502 if failed_imports: 503 self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " 504 f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " 505 f"dataset log file for details.", is_final=True) 506 else: 507 self.dataset.update_status(f"{len(imported)} dataset(s) succesfully imported from {self.base}.", 508 is_final=True) 509 510 if not self.dataset.is_finished(): 511 # now all related datasets are imported, we can finish the 'main' 512 # dataset, and the user will be alerted that the full import is 513 # complete 514 self.dataset.finish(num_rows) 515 516 def halt_and_catch_fire(self): 517 """ 518 Clean up on interrupt 519 520 There are multiple places in the code where we can bail out on an 521 interrupt, so abstract that away in its own function. 522 :return: 523 """ 524 if self.interrupted: 525 # resuming is impossible because the original dataset (which 526 # has the list of URLs to import) has probably been 527 # overwritten by this point 528 deletables = [k for k in self.created_datasets if k != self.dataset.key] 529 for deletable in deletables: 530 DataSet(key=deletable, db=self.db, modules=self.modules).delete() 531 532 self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. Cannot resume - you " 533 f"will need to initiate the import again.") 534 535 raise ProcessorInterruptedException() 536 537 @staticmethod 538 def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None): 539 """ 540 Get dataset component from 4CAT export API 541 542 :param str base: Server URL base to import from 543 :param str dataset_key: Key of dataset to import 544 :param str api_key: API authentication token 545 :param str component: Component to retrieve 546 :return: HTTP response object 547 """ 548 try: 549 if component == "data" and datapath: 550 # Stream data 551 with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True, 552 headers={ 553 "User-Agent": "4cat/import", 554 "Authentication": api_key 555 }) as r: 556 r.raise_for_status() 557 with datapath.open("wb") as outfile: 558 for chunk in r.iter_content(chunk_size=8192): 559 outfile.write(chunk) 560 return r 561 else: 562 response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={ 563 "User-Agent": "4cat/import", 564 "Authentication": api_key 565 }) 566 except requests.Timeout: 567 raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is " 568 f"accessible to external connections and try again.") 569 except requests.RequestException as e: 570 raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is " 571 f"accessible to external connections and try again.") 572 573 if response.status_code == 404: 574 raise FourcatImportException( 575 f"Dataset {dataset_key} not found at server {base} ({response.text}. Make sure all URLs point to " 576 f"a valid dataset.") 577 elif response.status_code in (401, 403): 578 raise FourcatImportException( 579 f"Dataset {dataset_key} not accessible at server {base}. 
Make sure you have access to this " 580 f"dataset and are using the correct API key.") 581 elif response.status_code != 200: 582 raise FourcatImportException( 583 f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}") 584 585 return response 586 587 @staticmethod 588 def validate_query(query, request, user): 589 """ 590 Validate custom data input 591 592 Confirms that the uploaded file is a valid CSV or tab file and, if so, returns 593 some metadata. 594 595 :param dict query: Query parameters, from client-side. 596 :param request: Flask request 597 :param User user: User object of user who has submitted the query 598 :return dict: Safe query parameters 599 """ 600 if query.get("method") == "zip": 601 filename = "" 602 if "option-data_upload-entries" in request.form: 603 # First pass sends list of files in the zip 604 pass 605 elif "option-data_upload" in request.files: 606 # Second pass sends the actual file 607 file = request.files["option-data_upload"] 608 if not file: 609 raise QueryParametersException("No file uploaded.") 610 611 if not file.filename.endswith(".zip"): 612 raise QueryParametersException("Uploaded file must be a ZIP file.") 613 614 filename = file.filename 615 else: 616 raise QueryParametersException("No file was offered for upload.") 617 618 return { 619 "method": "zip", 620 "filename": filename 621 } 622 elif query.get("method") == "url": 623 urls = query.get("url") 624 if not urls: 625 raise QueryParametersException("Provide at least one dataset URL.") 626 627 urls = urls.split(",") 628 bases = set([url.split("/results/")[0].lower() for url in urls]) 629 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 630 631 if len(keys) != 1: 632 # todo: change this to < 1 if we allow multiple datasets 633 raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.") 634 635 if len(bases) != 1: 636 raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from " 637 "one 4CAT server at a time.") 638 639 base = urls[0].split("/results/")[0] 640 try: 641 # test if API key is valid and server is reachable 642 test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata") 643 except FourcatImportException as e: 644 raise QueryParametersException(str(e)) 645 646 try: 647 # test if we get a response we can parse 648 metadata = test.json() 649 except ValueError: 650 raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.") 651 652 version = get_software_version() 653 654 if metadata.get("current_4CAT_version") != version: 655 raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than " 656 f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). 
Make " 657 "sure both are running the same version of 4CAT and try again.") 658 659 # OK, we can import at least one dataset 660 return { 661 "url": ",".join(urls), 662 "api-key": query.get("api-key") 663 } 664 else: 665 raise QueryParametersException("Import method not yet implemented.") 666 667 @staticmethod 668 def get_keys_from_urls(urls): 669 """ 670 Get dataset keys from 4CAT URLs 671 672 :param list urls: List of URLs 673 :return list: List of keys 674 """ 675 return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls] 676 677 @staticmethod 678 def ensure_key(query): 679 """ 680 Determine key for dataset generated by this processor 681 682 When importing datasets, it's necessary to determine the key of the 683 dataset that is created before it is actually created, because we want 684 to keep the original key of the imported dataset if possible. Luckily, 685 we can deduce it from the URL we're importing the dataset from. 686 687 :param dict query: Input from the user, through the front-end 688 :return str: Desired dataset key 689 """ 690 #TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after 691 # this method is called via `after_create`. We could download here and also identify the primary dataset key... 692 urls = query.get("url", "").split(",") 693 keys = SearchImportFromFourcat.get_keys_from_urls(urls) 694 return keys[0]
FourcatImportException: inherits from FourcatException, the base 4CAT exception class.
SearchImportFromFourcat: inherits from BasicProcessor, the abstract processor class.
A processor takes a finished dataset as input and processes its result in some way, with another dataset as output. The input thus is a file, and the output (usually) is as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can define a class method is_compatible_with(FourcatModule module=None, str user=None) -> bool, which takes a dataset as argument and returns a bool that determines whether this processor is considered compatible with that dataset. For example:

@classmethod
def is_compatible_with(cls, module=None, user=None):
    return module.type == "linguistic-features"
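As an illustration only (this module does not define such a hook itself), a hypothetical compatibility check that accepts datasets produced by this import data source could reuse the type identifier declared above:

@classmethod
def is_compatible_with(cls, module=None, user=None):
    # hypothetical: accept datasets created by the 4CAT import data source
    return module is not None and module.type == "import_4cat-search"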
process(): Import a 4CAT dataset either from another 4CAT server or from an uploaded zip file.
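The "method" parameter decides which route process() takes. Roughly (a sketch; the exact dicts are produced by validate_query() and the web frontend, and the zip filename shown here is illustrative), the two parameter shapes look like:

# URL route: fetched over the export API of the source 4CAT server
url_query = {
    "method": "url",
    "url": "https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/",
    "api-key": "...",  # API key issued by the server you are importing *from*
}

# ZIP route: the uploaded file is staged as "<results path>.importing" by after_create()
zip_query = {
    "method": "zip",
    "filename": "dataset-export.zip",
}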
after_create(query, dataset, request): Hook to execute after the dataset for this source has been created.
In this case, put the file in a temporary location so it can be processed properly by the related Job later.
Parameters
- dict query: Sanitised query parameters
- DataSet dataset: Dataset created for this query
- request: Flask request submitted for its creation
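after_create() streams the upload into a sidecar file next to the dataset's eventual results path, so the worker can pick it up later. A standalone sketch of the same chunked-copy pattern (function name and arguments are hypothetical, not part of the module):

from pathlib import Path

def stage_upload(uploaded, results_path: Path, chunk_size: int = 1024) -> Path:
    # write the uploaded file-like object to "<results path>.importing" in chunks
    temp_path = results_path.with_suffix(".importing")
    uploaded.seek(0)
    with temp_path.open("wb") as outfile:
        while True:
            chunk = uploaded.read(chunk_size)
            if not chunk:
                break
            outfile.write(chunk)
    return temp_path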
Import 4CAT dataset from a ZIP file
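To see what an export archive contains before running an import, the short sketch below lists the datasets described in a 4CAT export ZIP and marks the primary (parent-less) dataset, mirroring the metadata scan in process_zip() above. It only relies on the archive layout assumed there (per-dataset *_metadata.json files alongside the result and log files); the dataset-export.zip path is a placeholder, not a file shipped with 4CAT.

import json
import zipfile
from pathlib import Path

# Hypothetical path to a 4CAT export archive; point this at a real export
export_path = Path("dataset-export.zip")

with zipfile.ZipFile(export_path, "r") as zip_ref:
    contents = zip_ref.namelist()
    # every dataset in the export is described by a *_metadata.json file
    metadata_files = [name for name in contents if name.endswith("_metadata.json")]

    for name in metadata_files:
        metadata = json.loads(zip_ref.read(name).decode("utf-8"))
        # datasets without a key_parent are primary; the importer allows exactly one
        role = "primary" if not metadata.get("key_parent") else f"child of {metadata.get('key_parent')}"
        print(f"{metadata.get('key')} ({role}): "
              f"results in {metadata.get('result_file')}, {metadata.get('num_rows')} rows")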
    @staticmethod
    def process_metadata(metadata):
        """
        Process metadata for import
        """
        # get rid of some keys that are server-specific and don't need to
        # be stored (or don't correspond to database columns)
        metadata.pop("current_4CAT_version")
        metadata.pop("id")
        metadata.pop("job")
        metadata.pop("is_private")
        metadata.pop("is_finished")  # we'll finish it ourselves, thank you!!!

        # extra params are stored as JSON...
        metadata["parameters"] = json.loads(metadata["parameters"])
        if "copied_from" in metadata["parameters"]:
            metadata["parameters"].pop("copied_from")
        metadata["parameters"] = json.dumps(metadata["parameters"])

        return metadata
Process metadata for import
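For illustration, this standalone snippet mirrors the clean-up that process_metadata() performs on a single record. The metadata dict and its values are hypothetical; only the keys are taken from the method above.

import json

# Hypothetical metadata record as found in an export ZIP or returned by the
# export API; only the fields touched by process_metadata() are shown
metadata = {
    "key": "examplekey1234",
    "current_4CAT_version": "1.x",   # server-specific, dropped on import
    "id": 1234,                      # dropped
    "job": 5678,                     # dropped
    "is_private": False,             # dropped
    "is_finished": True,             # dropped; the importer finishes it itself
    "parameters": json.dumps({"datasource": "custom", "copied_from": "parentkey"}),
}

# mirror of the clean-up above: remove server-specific keys and strip the
# 'copied_from' reference from the JSON-encoded parameters
for key in ("current_4CAT_version", "id", "job", "is_private", "is_finished"):
    metadata.pop(key)

parameters = json.loads(metadata["parameters"])
parameters.pop("copied_from", None)
metadata["parameters"] = json.dumps(parameters)

print(metadata)
# {'key': 'examplekey1234', 'parameters': '{"datasource": "custom"}'}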
    def create_dataset(self, metadata, original_key, primary=False):
        """
        Create a new dataset
        """
        if primary:
            self.dataset.update_status(f"Importing primary dataset {original_key}.")
            # if this is the first dataset we're importing, make it the
            # processor's "own" dataset. the key has already been set to
            # the imported dataset's key via ensure_key() (or a new unique
            # key if it already existed on this server)
            # by making it the "own" dataset, the user initiating the
            # import will see the imported dataset as the "result" of their
            # import query in the interface, similar to the workflow for
            # other data sources
            new_dataset = self.dataset

            # Update metadata and file
            metadata.pop("key")  # key already OK (see above)
            self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

        else:
            self.dataset.update_status(f"Importing child dataset {original_key}.")
            # supernumerary datasets - handle on their own
            # these include any children of imported datasets
            try:
                key_exists = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

                # if we *haven't* thrown a DataSetException now, then the
                # key is already in use, so create a "dummy" dataset and
                # overwrite it with the metadata we have (except for the
                # key). this ensures that a new unique key will be
                # generated.
                new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules)
                metadata.pop("key")
                self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

            except DataSetException:
                # this is *good* since it means the key doesn't exist, so
                # we can re-use the key of the imported dataset
                self.db.insert("datasets", data=metadata)
                new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

        if new_dataset.key != original_key:
            # could not use original key because it was already in use
            # so update any references to use the new key
            self.remapped_keys[original_key] = new_dataset.key
            self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
                                       f"{new_dataset.key} instead of key {original_key}!")

        # refresh object, make sure it's in sync with the database
        self.created_datasets.add(new_dataset.key)
        new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
        current_log = None
        if new_dataset.key == self.dataset.key:
            # this ensures that the first imported dataset becomes the
            # processor's "own" dataset, and that the import logs go to
            # that dataset's log file. For later imports, this evaluates to
            # False.

            # Read and store the current log; it is appended again at the
            # end, after the result file (which determines the log file
            # path) has been updated
            current_log = self.dataset.get_log_path().read_text()
            # Update the dataset
            self.dataset = new_dataset

        # if the key of the parent dataset was changed, change the
        # reference to it that the child dataset has
        if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys:
            new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent]

        # update some attributes that should come from the new server, not
        # the old
        new_dataset.creator = self.dataset_owner
        new_dataset.original_timestamp = new_dataset.timestamp
        new_dataset.imported = True
        new_dataset.timestamp = int(time.time())
        new_dataset.db.commit()

        # make sure the dataset path uses the new key and local dataset
        # path settings. this also makes sure the log file is created in
        # the right place (since it is derived from the results file path)
        extension = metadata["result_file"].split(".")[-1]
        updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
        if not updated:
            self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

        if current_log:
            # Add the current log to the new dataset
            with new_dataset.get_log_path().open("a") as outfile:
                outfile.write(current_log)

        return new_dataset
Create a new dataset
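The remapped_keys attribute kept by the processor is a plain mapping from original keys to replacement keys. The fragment below is a hypothetical, stripped-down illustration of how a child dataset's parent reference is rewired when its parent could not keep its original key, as done near the end of create_dataset(); the key values are placeholders.

# original key -> replacement key, filled in whenever a key clash is detected
remapped_keys = {"originalparentkey": "newparentkey"}

# a child dataset still pointing at the old parent key
child_key_parent = "originalparentkey"
if child_key_parent and child_key_parent in remapped_keys:
    child_key_parent = remapped_keys[child_key_parent]

print(child_key_parent)  # newparentkey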
    def process_urls(self):
        """
        Import 4CAT dataset from another 4CAT server

        Interfaces with another 4CAT server to transfer a dataset's metadata,
        data files and child datasets.
        """
        urls = [url.strip() for url in self.parameters.get("url").split(",")]
        self.base = urls[0].split("/results/")[0]
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
        api_key = self.parameters.get("api-key")

        imported = []  # successfully imported datasets
        failed_imports = []  # keys that failed to import
        num_rows = 0  # will be used later

        # we can add support for multiple datasets later by removing
        # this part!
        keys = [keys[0]]

        while keys:
            dataset_key = keys.pop(0)

            self.halt_and_catch_fire()
            self.dataset.log(f"Importing dataset {dataset_key} from 4CAT server {self.base}.")

            # first, metadata!
            try:
                metadata = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "metadata")
                metadata = metadata.json()
            except FourcatImportException as e:
                self.dataset.log(f"Error retrieving record for dataset {dataset_key}: {e}")
                continue
            except ValueError:
                self.dataset.log(f"Could not read metadata for dataset {dataset_key}")
                continue

            # copying empty datasets doesn't really make sense
            if metadata["num_rows"] == 0:
                self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
                failed_imports.append(dataset_key)
                continue

            metadata = self.process_metadata(metadata)

            # create the new dataset
            new_dataset = self.create_dataset(metadata, dataset_key, primary=not imported)

            # then, the log
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
                # TODO: for the primary dataset, this ends up in the middle of the log as we are still adding to it...
                log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
                logpath = new_dataset.get_log_path()
                new_dataset.log("Original dataset log included below:")
                with logpath.open("a") as outfile:
                    outfile.write(log.text)
            except FourcatImportException as e:
                new_dataset.finish_with_error(f"Error retrieving log for dataset {new_dataset.key}: {e}")
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                new_dataset.finish_with_error(f"Could not read log for dataset {new_dataset.key}: skipping dataset")
                failed_imports.append(dataset_key)
                continue

            # then, the results
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
                datapath = new_dataset.get_results_path()
                data = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "data", datapath)

                if not imported:
                    # first dataset - use num rows as 'overall'
                    num_rows = metadata["num_rows"]

            except FourcatImportException as e:
                self.dataset.log(f"Dataset {new_dataset.key} unable to import: {e}, skipping import")
                if new_dataset.key != self.dataset.key:
                    new_dataset.delete()
                continue

            except ValueError:
                new_dataset.finish_with_error(f"Could not read results for dataset {new_dataset.key}")
                failed_imports.append(dataset_key)
                continue

            # finally, the kids
            self.halt_and_catch_fire()
            try:
                self.dataset.update_status(f"Looking for child datasets to transfer for dataset {new_dataset.key}")
                children = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "children")
                children = children.json()
            except FourcatImportException as e:
                self.dataset.update_status(f"Error retrieving children for dataset {new_dataset.key}: {e}")
                failed_imports.append(dataset_key)
                continue
            except ValueError:
                self.dataset.update_status(f"Could not collect children for dataset {new_dataset.key}")
                failed_imports.append(dataset_key)
                continue

            for child in children:
                keys.append(child)
                self.dataset.log(f"Adding child dataset {child} to import queue")

            # done - remember that we've imported this one
            imported.append(new_dataset)
            new_dataset.update_status(metadata["status"])

            if new_dataset.key != self.dataset.key:
                # only finish if this is not the 'main' dataset, or the user
                # will think the whole import is done
                new_dataset.finish(metadata["num_rows"])

        # todo: this part needs updating if/when we support importing multiple datasets!
        if failed_imports:
            self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. "
                                       f"{len(failed_imports)} dataset(s) were not successfully imported. Check the "
                                       f"dataset log file for details.", is_final=True)
        else:
            self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported from {self.base}.",
                                       is_final=True)

        if not self.dataset.is_finished():
            # now all related datasets are imported, we can finish the 'main'
            # dataset, and the user will be alerted that the full import is
            # complete
            self.dataset.finish(num_rows)
Import 4CAT dataset from another 4CAT server
Interfaces with another 4CAT server to transfer a dataset's metadata, data files and child datasets.
    def halt_and_catch_fire(self):
        """
        Clean up on interrupt

        There are multiple places in the code where we can bail out on an
        interrupt, so abstract that away in its own function.
        """
        if self.interrupted:
            # resuming is impossible because the original dataset (which
            # has the list of URLs to import) has probably been
            # overwritten by this point
            deletables = [k for k in self.created_datasets if k != self.dataset.key]
            for deletable in deletables:
                DataSet(key=deletable, db=self.db, modules=self.modules).delete()

            self.dataset.finish_with_error(f"Interrupted while importing datasets{' from ' + self.base if self.base else ''}. "
                                           f"Cannot resume - you will need to initiate the import again.")

            raise ProcessorInterruptedException()
Clean up on interrupt
There are multiple places in the code where we can bail out on an interrupt, so abstract that away in its own function.
    @staticmethod
    def fetch_from_4cat(base, dataset_key, api_key, component, datapath=None):
        """
        Get dataset component from 4CAT export API

        :param str base: Server URL base to import from
        :param str dataset_key: Key of dataset to import
        :param str api_key: API authentication token
        :param str component: Component to retrieve
        :param datapath: Path to stream the 'data' component to, if given
        :return: HTTP response object
        """
        try:
            if component == "data" and datapath:
                # Stream data
                with requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, stream=True,
                                  headers={
                                      "User-Agent": "4cat/import",
                                      "Authentication": api_key
                                  }) as r:
                    r.raise_for_status()
                    with datapath.open("wb") as outfile:
                        for chunk in r.iter_content(chunk_size=8192):
                            outfile.write(chunk)
                    return r
            else:
                response = requests.get(f"{base}/api/export-packed-dataset/{dataset_key}/{component}/", timeout=5, headers={
                    "User-Agent": "4cat/import",
                    "Authentication": api_key
                })
        except requests.Timeout:
            raise FourcatImportException(f"The 4CAT server at {base} took too long to respond. Make sure it is "
                                         f"accessible to external connections and try again.")
        except requests.RequestException as e:
            raise FourcatImportException(f"Could not connect to the 4CAT server at {base} ({e}). Make sure it is "
                                         f"accessible to external connections and try again.")

        if response.status_code == 404:
            raise FourcatImportException(
                f"Dataset {dataset_key} not found at server {base} ({response.text}). Make sure all URLs point to "
                f"a valid dataset.")
        elif response.status_code in (401, 403):
            raise FourcatImportException(
                f"Dataset {dataset_key} not accessible at server {base}. Make sure you have access to this "
                f"dataset and are using the correct API key.")
        elif response.status_code != 200:
            raise FourcatImportException(
                f"Unexpected error while requesting {component} for dataset {dataset_key} from server {base}: {response.text}")

        return response
Get dataset component from 4CAT export API
Parameters
- str base: Server URL base to import from
- str dataset_key: Key of dataset to import
- str api_key: API authentication token
- str component: Component to retrieve
- datapath: Path to stream the 'data' component to, if given
Returns
HTTP response object
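A minimal sketch of one such request made directly with requests, assuming the remote server exposes the export API used above; the base URL, dataset key and API key are placeholders to substitute.

import requests

# Placeholders - substitute a real server, dataset key and API key
base = "https://4cat.example"
dataset_key = "examplekey1234"
api_key = "your-api-key"

# same endpoint and headers as fetch_from_4cat() above, here for the
# "metadata" component
response = requests.get(
    f"{base}/api/export-packed-dataset/{dataset_key}/metadata/",
    timeout=5,
    headers={
        "User-Agent": "4cat/import",
        "Authentication": api_key,
    },
)
response.raise_for_status()
metadata = response.json()
print(metadata.get("num_rows"), metadata.get("result_file"))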
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate custom data input

        Confirms that the uploaded file is a ZIP file, or that the given URL
        points to a dataset on a reachable 4CAT server running the same 4CAT
        version, and returns sanitised query parameters.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        if query.get("method") == "zip":
            filename = ""
            if "option-data_upload-entries" in request.form:
                # First pass sends the list of files in the zip
                pass
            elif "option-data_upload" in request.files:
                # Second pass sends the actual file
                file = request.files["option-data_upload"]
                if not file:
                    raise QueryParametersException("No file uploaded.")

                if not file.filename.endswith(".zip"):
                    raise QueryParametersException("Uploaded file must be a ZIP file.")

                filename = file.filename
            else:
                raise QueryParametersException("No file was offered for upload.")

            return {
                "method": "zip",
                "filename": filename
            }
        elif query.get("method") == "url":
            urls = query.get("url")
            if not urls:
                raise QueryParametersException("Provide at least one dataset URL.")

            urls = urls.split(",")
            bases = set([url.split("/results/")[0].lower() for url in urls])
            keys = SearchImportFromFourcat.get_keys_from_urls(urls)

            if len(keys) != 1:
                # todo: change this to < 1 if we allow multiple datasets
                raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.")

            if len(bases) != 1:
                raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import "
                                               "from one 4CAT server at a time.")

            base = urls[0].split("/results/")[0]
            try:
                # test if the API key is valid and the server is reachable
                test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata")
            except FourcatImportException as e:
                raise QueryParametersException(str(e))

            try:
                # test if we get a response we can parse
                metadata = test.json()
            except ValueError:
                raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.")

            version = get_software_version()

            if metadata.get("current_4CAT_version") != version:
                raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) "
                                               f"than the one you are trying to import from "
                                               f"({metadata.get('current_4CAT_version')}). Make sure both are running "
                                               "the same version of 4CAT and try again.")

            # OK, we can import at least one dataset
            return {
                "url": ",".join(urls),
                "api-key": query.get("api-key")
            }
        else:
            raise QueryParametersException("Import method not yet implemented.")
Validate custom data input
Confirms that the uploaded file is a ZIP file, or that the given URL points to a dataset on a reachable 4CAT server running the same 4CAT version, and returns sanitised query parameters.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
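Depending on the chosen import method, the sanitised parameters take one of two shapes; the keys are those returned by validate_query() above, the values below are placeholders.

# method "zip": only the filename of the uploaded archive is kept
zip_params = {"method": "zip", "filename": "dataset-export.zip"}

# method "url": the (comma-separated) URLs and the API key are passed on unchanged
url_params = {"url": "https://4cat.example/results/examplekey1234/", "api-key": "your-api-key"}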
    @staticmethod
    def get_keys_from_urls(urls):
        """
        Get dataset keys from 4CAT URLs

        :param list urls: List of URLs
        :return list: List of keys
        """
        return [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
Get dataset keys from 4CAT URLs
Parameters
- list urls: List of URLs
Returns
List of keys
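In other words, the key is whatever sits between /results/ and the next path separator, with any query string or fragment stripped. For example, reproducing the same expression standalone (the URL is hypothetical):

# hypothetical dataset URL with a trailing query string and fragment
urls = ["https://4cat.example/results/examplekey1234/?foo=bar#top"]

# same extraction as get_keys_from_urls()
keys = [url.split("/results/")[-1].split("/")[0].split("#")[0].split("?")[0] for url in urls]
print(keys)  # ['examplekey1234']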
    @staticmethod
    def ensure_key(query):
        """
        Determine key for dataset generated by this processor

        When importing datasets, it's necessary to determine the key of the
        dataset that is created before it is actually created, because we want
        to keep the original key of the imported dataset if possible. Luckily,
        we can deduce it from the URL we're importing the dataset from.

        :param dict query: Input from the user, through the front-end
        :return str: Desired dataset key
        """
        # TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
        #  this method is called via `after_create`. We could download here and also identify the primary dataset key...
        urls = query.get("url", "").split(",")
        keys = SearchImportFromFourcat.get_keys_from_urls(urls)
        return keys[0]
Determine key for dataset generated by this processor
When importing datasets, it's necessary to determine the key of the dataset that is created before it is actually created, because we want to keep the original key of the imported dataset if possible. Luckily, we can deduce it from the URL we're importing the dataset from.
Parameters
- dict query: Input from the user, through the front-end
Returns
Desired dataset key
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- extension
- config
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor