common.lib.dataset
1import collections 2import itertools 3import datetime 4import zipfile 5import fnmatch 6import random 7import shutil 8import json 9import time 10import csv 11import re 12import os 13 14from common.lib.annotation import Annotation 15from common.lib.job import Job, JobNotFoundException 16 17from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api, hash_to_md5, convert_to_float 18from common.lib.item_mapping import MappedItem, DatasetItem 19from common.lib.fourcat_module import FourcatModule 20from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException, 21 MapItemException, MappedItemIncompleteException, AnnotationException) 22 23 24class DataSet(FourcatModule): 25 """ 26 Provide interface to safely register and run operations on a dataset 27 28 A dataset is a collection of: 29 - A unique identifier 30 - A set of parameters that demarcate the data contained within 31 - The data 32 33 The data is usually stored in a file on the disk; the parameters are stored 34 in a database. The handling of the data, et cetera, is done by other 35 workers; this class defines method to create and manipulate the dataset's 36 properties. 37 """ 38 39 # Attributes must be created here to ensure getattr and setattr work properly 40 data = None 41 key = "" 42 43 _children = None 44 available_processors = None 45 _genealogy = None 46 preset_parent = None 47 parameters = None 48 modules = None 49 annotation_fields = None 50 51 owners = None 52 tagged_owners = None 53 54 db = None 55 folder = None 56 is_new = True 57 58 no_status_updates = False 59 disposable_files = None 60 _queue_position = None 61 62 def __init__( 63 self, 64 parameters=None, 65 key=None, 66 job=None, 67 data=None, 68 db=None, 69 parent="", 70 extension=None, 71 type=None, 72 is_private=True, 73 owner="anonymous", 74 modules=None 75 ): 76 """ 77 Create new dataset object 78 79 If the dataset is not in the database yet, it is added. 80 81 :param dict parameters: Only when creating a new dataset. Dataset 82 parameters, free-form dictionary. 83 :param str key: Dataset key. If given, dataset with this key is loaded. 84 :param int job: Job ID. If given, dataset corresponding to job is 85 loaded. 86 :param dict data: Dataset data, corresponding to a row in the datasets 87 database table. If not given, retrieved from database depending on key. 88 :param db: Database connection 89 :param str parent: Only when creating a new dataset. Parent dataset 90 key to which the one being created is a child. 91 :param str extension: Only when creating a new dataset. Extension of 92 dataset result file. 93 :param str type: Only when creating a new dataset. Type of the dataset, 94 corresponding to the type property of a processor class. 95 :param bool is_private: Only when creating a new dataset. Whether the 96 dataset is private or public. 97 :param str owner: Only when creating a new dataset. The user name of 98 the dataset's creator. 99 :param modules: Module cache. If not given, will be loaded when needed 100 (expensive). Used to figure out what processors are compatible with 101 this dataset. 102 """ 103 self.db = db 104 105 # Ensure mutable attributes are set in __init__ as they are unique to each DataSet 106 self.data = {} 107 self.parameters = {} 108 self.available_processors = {} 109 self._genealogy = None 110 self.disposable_files = [] 111 self.modules = modules 112 113 if key is not None: 114 self.key = key 115 current = self.db.fetchone( 116 "SELECT * FROM datasets WHERE key = %s", (self.key,) 117 ) 118 if not current: 119 raise DataSetNotFoundException( 120 "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" 121 % key 122 ) 123 124 elif job is not None: 125 current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),)) 126 if not current: 127 raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument") 128 129 self.key = current["key"] 130 elif data is not None: 131 current = data 132 if ( 133 "query" not in data 134 or "key" not in data 135 or "parameters" not in data 136 or "key_parent" not in data 137 ): 138 raise DataSetException( 139 "DataSet() requires a complete dataset record for its 'data' argument" 140 ) 141 142 self.key = current["key"] 143 else: 144 if parameters is None: 145 raise DataSetException( 146 "DataSet() requires either 'key', or 'parameters' to be given" 147 ) 148 149 if not type: 150 raise DataSetException("Datasets must have their type set explicitly") 151 152 query = self.get_label(parameters, default=type) 153 self.key = self.get_key(query, parameters, parent) 154 current = self.db.fetchone( 155 "SELECT * FROM datasets WHERE key = %s AND query = %s", 156 (self.key, query), 157 ) 158 159 if current: 160 self.data = current 161 self.parameters = json.loads(self.data["parameters"]) 162 self.annotation_fields = json.loads(self.data["annotation_fields"]) \ 163 if self.data.get("annotation_fields") else {} 164 self.is_new = False 165 else: 166 self.data = {"type": type} # get_own_processor needs this 167 own_processor = self.get_own_processor() 168 version = get_software_commit(own_processor) 169 self.data = { 170 "key": self.key, 171 "query": self.get_label(parameters, default=type), 172 "parameters": json.dumps(parameters), 173 "result_file": "", 174 "creator": owner, 175 "status": "", 176 "type": type, 177 "timestamp": int(time.time()), 178 "is_finished": False, 179 "is_private": is_private, 180 "software_version": version[0], 181 "software_source": version[1], 182 "software_file": "", 183 "num_rows": 0, 184 "progress": 0.0, 185 "key_parent": parent, 186 "annotation_fields": "{}" 187 } 188 self.parameters = parameters 189 self.annotation_fields = {} 190 191 self.db.insert("datasets", data=self.data) 192 self.refresh_owners() 193 self.add_owner(owner) 194 195 # Find desired extension from processor if not explicitly set 196 if extension is None: 197 if own_processor: 198 extension = own_processor.get_extension( 199 parent_dataset=DataSet(key=parent, db=db, modules=self.modules) 200 if parent 201 else None 202 ) 203 # Still no extension, default to 'csv' 204 if not extension: 205 extension = "csv" 206 207 # Reserve filename and update data['result_file'] 208 self.reserve_result_file(parameters, extension) 209 210 self.refresh_owners() 211 212 def check_dataset_finished(self): 213 """ 214 Checks if dataset is finished. Returns path to results file is not empty, 215 or 'empty_file' when there were not matches. 216 217 Only returns a path if the dataset is complete. In other words, if this 218 method returns a path, a file with the complete results for this dataset 219 will exist at that location. 220 221 :return: A path to the results file, 'empty_file', or `None` 222 """ 223 if self.data["is_finished"] and self.data["num_rows"] > 0: 224 return self.get_results_path() 225 elif self.data["is_finished"] and self.data["num_rows"] == 0: 226 return 'empty' 227 else: 228 return None 229 230 def get_results_path(self): 231 """ 232 Get path to results file 233 234 Always returns a path, that will at some point contain the dataset 235 data, but may not do so yet. Use this to get the location to write 236 generated results to. 237 238 :return Path: A path to the results file 239 """ 240 # alas we need to instantiate a config reader here - no way around it 241 if not self.folder: 242 self.folder = self.modules.config.get('PATH_DATA') 243 return self.folder.joinpath(self.data["result_file"]) 244 245 def get_results_folder_path(self): 246 """ 247 Get path to folder containing accompanying results 248 249 Returns a path that may not yet be created 250 251 :return Path: A path to the results file 252 """ 253 return self.get_results_path().parent.joinpath("folder_" + self.key) 254 255 def get_log_path(self): 256 """ 257 Get path to dataset log file 258 259 Each dataset has a single log file that documents its creation. This 260 method returns the path to that file. It is identical to the path of 261 the dataset result file, with 'log' as its extension instead. 262 263 :return Path: A path to the log file 264 """ 265 return self.get_results_path().with_suffix(".log") 266 267 def clear_log(self): 268 """ 269 Clears the dataset log file 270 271 If the log file does not exist, it is created empty. The log file will 272 have the same file name as the dataset result file, with the 'log' 273 extension. 274 """ 275 log_path = self.get_log_path() 276 with log_path.open("w"): 277 pass 278 279 def log(self, log): 280 """ 281 Write log message to file 282 283 Writes the log message to the log file on a new line, including a 284 timestamp at the start of the line. Note that this assumes the log file 285 already exists - it should have been created/cleared with clear_log() 286 prior to calling this. 287 288 :param str log: Log message to write 289 """ 290 log_path = self.get_log_path() 291 with log_path.open("a", encoding="utf-8") as outfile: 292 outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log)) 293 294 def _iterate_items(self, processor=None, offset=0, *args, **kwargs): 295 """ 296 A generator that iterates through a CSV or NDJSON file 297 298 This is an internal method and should not be called directly. Rather, 299 call iterate_items() and use the generated dictionary and its properties. 300 301 If a reference to a processor is provided, with every iteration, 302 the processor's 'interrupted' flag is checked, and if set a 303 ProcessorInterruptedException is raised, which by default is caught 304 in the worker and subsequently stops execution gracefully. 305 306 There are two file types that can be iterated (currently): CSV files 307 and NDJSON (newline-delimited JSON) files. In the future, one could 308 envision adding a pathway to retrieve items from e.g. a MongoDB 309 collection directly instead of from a static file 310 311 :param BasicProcessor processor: A reference to the processor 312 iterating the dataset. 313 :param offset int: How many items to skip. 314 :return generator: A generator that yields each item as a dictionary 315 """ 316 path = self.get_results_path() 317 318 # Yield through items one by one 319 if path.suffix.lower() == ".csv": 320 with path.open("rb") as infile: 321 wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8") 322 reader = csv.DictReader(wrapped_infile) 323 324 if not self.get_own_processor(): 325 # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not 326 first_item = next(reader) 327 if first_item is None or any( 328 [True for key in first_item if type(key) is not str] 329 ): 330 raise NotImplementedError( 331 f"Cannot iterate through CSV file (deprecated processor {self.type})" 332 ) 333 yield first_item 334 335 for i, item in enumerate(reader): 336 if hasattr(processor, "interrupted") and processor.interrupted: 337 raise ProcessorInterruptedException( 338 "Processor interrupted while iterating through CSV file" 339 ) 340 341 if i < offset: 342 continue 343 344 yield item 345 346 elif path.suffix.lower() == ".ndjson": 347 # In NDJSON format each line in the file is a self-contained JSON 348 with path.open(encoding="utf-8") as infile: 349 for i, line in enumerate(infile): 350 if hasattr(processor, "interrupted") and processor.interrupted: 351 raise ProcessorInterruptedException( 352 "Processor interrupted while iterating through NDJSON file" 353 ) 354 355 if i < offset: 356 continue 357 358 yield json.loads(line) 359 360 else: 361 raise NotImplementedError(f"Cannot iterate through {path.suffix} file") 362 363 def _iterate_archive_contents( 364 self, 365 staging_area=None, 366 immediately_delete=True, 367 filename_filter=None, 368 processor=None, 369 offset=0, 370 *args, **kwargs 371 ): 372 """ 373 A generator that iterates through files in an archive 374 375 With every iteration, the processor's 'interrupted' flag is checked, 376 and if set a ProcessorInterruptedException is raised, which by default 377 is caught and subsequently stops execution gracefully. 378 379 Files are temporarily unzipped and deleted after use. 380 381 :param Path staging_area: Where to store the files while they're 382 being worked with. If omitted, a temporary folder is created and 383 marked for deletion after all files have been yielded 384 :param bool immediately_delete: Temporary files are removed after 385 yielding; False keeps files until the staging_area is removed 386 :param list filename_filter: Whitelist of filenames to iterate. If 387 empty, do not filter 388 :param BasicProcessor processor: A reference to the processor 389 iterating the dataset. 390 :param int offset: Skip this many files before yielding (warning: may 391 skip a metadata file too!) 392 :return: An iterator with a dictionary for each file, containing an 393 `id`, a `path`, and all attributes of the `ZipInfo` object as keys 394 """ 395 path = self.get_results_path() 396 if not path.exists(): 397 return 398 399 if not staging_area: 400 staging_area = self.get_staging_area() 401 402 if not staging_area.exists() or not staging_area.is_dir(): 403 raise RuntimeError(f"Staging area {staging_area} is not a valid folder") 404 405 iterations = 0 406 with zipfile.ZipFile(path, "r") as archive_file: 407 # sorting is important because it ensures .metadata.json is read 408 # first 409 archive_contents = sorted(archive_file.infolist(), key=lambda x: x.filename) 410 for archived_file in archive_contents: 411 412 if filename_filter and archived_file.filename not in filename_filter: 413 continue 414 415 if archived_file.is_dir(): 416 # do not yield folders - we'll get to the files in them 417 continue 418 419 if iterations < offset: 420 iterations += 1 421 continue 422 423 if hasattr(processor, "interrupted") and processor.interrupted: 424 raise ProcessorInterruptedException( 425 "Processor interrupted while iterating through Zip archive" 426 ) 427 428 iterations += 1 429 temp_file = staging_area.joinpath(archived_file.filename) 430 archive_file.extract(archived_file.filename, staging_area) 431 432 # iterated items are expected as a dictionary 433 # we thus make a dictionary from the ZipInfo object 434 # and use the path (inside the archive) as a unique ID 435 yield { 436 "id": archived_file.filename, 437 "path": temp_file, 438 **{ 439 attribute: getattr(archived_file, attribute) for attribute in dir(archived_file) if not attribute.startswith("_") 440 } 441 } 442 443 if immediately_delete: 444 # this, effectively, triggers when the *next* item is 445 # asked for, or if it is the last file 446 temp_file.unlink() 447 448 def iterate_items( 449 self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None, 450 offset=0, *args, **kwargs 451 ): 452 """ 453 Generate mapped dataset items 454 455 Wrapper for _iterate_items that returns a DatasetItem, which can be 456 accessed as a dict returning the original item or (if a mapper is 457 available) the mapped item. Mapped or original versions of the item can 458 also be accessed via the `original` and `mapped_object` properties of 459 the DatasetItem. 460 461 Processors can define a method called `map_item` that can be used to map 462 an item from the dataset file before it is processed any further. This is 463 slower than storing the data file in the right format to begin with but 464 not all data sources allow for easy 'flat' mapping of items, e.g. tweets 465 are nested objects when retrieved from the twitter API that are easier 466 to store as a JSON file than as a flat CSV file, and it would be a shame 467 to throw away that data. 468 469 Note the two parameters warn_unmappable and map_missing. Items can be 470 unmappable in that their structure is too different to coerce into a 471 neat dictionary of the structure the data source expects. This makes it 472 'unmappable' and warn_unmappable determines what happens in this case. 473 It can also be of the right structure, but with some fields missing or 474 incomplete. map_missing determines what happens in that case. The 475 latter is for example possible when importing data via Zeeschuimer, 476 which produces unstably-structured data captured from social media 477 sites. 478 479 :param BasicProcessor processor: A reference to the processor 480 iterating the dataset. 481 :param bool warn_unmappable: If an item is not mappable, skip the item 482 and log a warning 483 :param max_unmappable: Skip at most this many unmappable items; if 484 more are encountered, stop iterating. `None` to never stop. 485 :param map_missing: Indicates what to do with mapped items for which 486 some fields could not be mapped. Defaults to 'empty_str'. Must be one of: 487 - 'default': fill missing fields with the default passed by map_item 488 - 'abort': raise a MappedItemIncompleteException if a field is missing 489 - a callback: replace missing field with the return value of the 490 callback. The MappedItem object is passed to the callback as the 491 first argument and the name of the missing field as the second. 492 - a dictionary with a key for each possible missing field: replace missing 493 field with a strategy for that field ('default', 'abort', or a callback) 494 :param get_annotations: Whether to also fetch annotations from the database. 495 This can be disabled to help speed up iteration. 496 :param offset: After how many rows we should yield items. 497 :param bool immediately_delete: Only used when iterating a file 498 archive. Defaults to `True`, if set to `False`, files are not deleted 499 from the staging area after the iteration, so they can be re-used. 500 :param staging_area: Only used when iterating a file archive. Where to 501 store the files while they're being worked with. If omitted, a 502 temporary folder is created and marked for deletion after all files 503 have been yielded. 504 :param list filename_filter: Only used when iterating a file archive. 505 Whitelist of filenames to iterate, others are skipped. If empty, do 506 not filter. 507 :return generator: A generator that yields DatasetItems 508 """ 509 unmapped_items = 0 510 511 # Collect item_mapper for use with filter 512 item_mapper = False 513 own_processor = self.get_own_processor() 514 if own_processor and own_processor.map_item_method_available(dataset=self): 515 item_mapper = True 516 517 # Annotations are dynamically added, and we're handling them as 'extra' map_item fields. 518 # If we're getting annotations, we're caching items so we don't need to retrieve annotations one-by-one. 519 get_annotations = True if self.annotation_fields and get_annotations else False 520 if get_annotations: 521 annotation_fields = self.annotation_fields.copy() 522 item_batch_size = 500 523 dataset_item_cache = [] 524 annotations_before = int(time.time()) 525 526 # Append a number to annotation labels if there's duplicate ones 527 annotation_labels = {} 528 for (annotation_field_id, annotation_field_items,) in annotation_fields.items(): 529 unique_label = annotation_field_items["label"] 530 counter = 1 531 while unique_label in annotation_labels.values(): 532 counter += 1 533 unique_label = f"{annotation_field_items['label']}_{counter}" 534 annotation_labels[annotation_field_id] = unique_label 535 536 # missing field strategy can be for all fields at once, or per field 537 # if it is per field, it is a dictionary with field names and their strategy 538 # if it is for all fields, it may be a callback, 'abort', or 'default' 539 default_strategy = "default" 540 if type(map_missing) is not dict: 541 default_strategy = map_missing 542 map_missing = {} 543 544 iterator = self._iterate_items if self.get_extension() != "zip" else self._iterate_archive_contents 545 546 # Loop through items 547 for i, item in enumerate(iterator(processor=processor, offset=offset, *args, **kwargs)): 548 # Save original to yield 549 original_item = item.copy() 550 551 # Map item 552 if item_mapper: 553 try: 554 mapped_item = own_processor.get_mapped_item(item) 555 except MapItemException as e: 556 if warn_unmappable: 557 self.warn_unmappable_item( 558 i, processor, e, warn_admins=unmapped_items is False 559 ) 560 561 unmapped_items += 1 562 if max_unmappable and unmapped_items > max_unmappable: 563 break 564 else: 565 continue 566 567 # check if fields have been marked as 'missing' in the 568 # underlying data, and treat according to the chosen strategy 569 if mapped_item.get_missing_fields(): 570 for missing_field in mapped_item.get_missing_fields(): 571 strategy = map_missing.get(missing_field, default_strategy) 572 573 if callable(strategy): 574 # delegate handling to a callback 575 mapped_item.data[missing_field] = strategy( 576 mapped_item.data, missing_field 577 ) 578 elif strategy == "abort": 579 # raise an exception to be handled at the processor level 580 raise MappedItemIncompleteException( 581 f"Cannot process item, field {missing_field} missing in source data." 582 ) 583 elif strategy == "default": 584 # use whatever was passed to the object constructor 585 mapped_item.data[missing_field] = mapped_item.data[ 586 missing_field 587 ].value 588 else: 589 raise ValueError( 590 "map_missing must be 'abort', 'default', or a callback." 591 ) 592 else: 593 mapped_item = original_item 594 595 # yield a DatasetItem, which is a dict with some special properties 596 dataset_item = DatasetItem( 597 mapper=item_mapper, 598 original=original_item, 599 mapped_object=mapped_item, 600 data_file=original_item["path"] if "path" in original_item and issubclass(type(original_item["path"]), os.PathLike) else None, 601 **( 602 mapped_item.get_item_data() 603 if type(mapped_item) is MappedItem 604 else mapped_item 605 ), 606 ) 607 608 # If we're getting annotations, yield in items batches so we don't need to get annotations per item. 609 if get_annotations: 610 dataset_item_cache.append(dataset_item) 611 612 # When we reach the batch limit or the end of the dataset, 613 # get the annotations for cached items and yield the entire thing. 614 if len(dataset_item_cache) >= item_batch_size or i == (self.num_rows - 1): 615 616 item_ids = [dataset_item.get("id") for dataset_item in dataset_item_cache] 617 618 # Dict with item ids for fast lookup 619 annotations_dict = collections.defaultdict(dict) 620 annotations = self.get_annotations_for_item(item_ids, before=annotations_before) 621 for item_annotation in annotations: 622 item_id = item_annotation.item_id 623 if item_annotation: 624 annotations_dict[item_id][item_annotation.field_id] = item_annotation.value 625 626 # Process each dataset item 627 for dataset_item in dataset_item_cache: 628 item_id = dataset_item.get("id") 629 item_annotations = annotations_dict.get(item_id, {}) 630 631 for annotation_field_id, annotation_field_items in annotation_fields.items(): 632 # Get annotation value 633 value = item_annotations.get(annotation_field_id, "") 634 635 # Convert list to string if needed 636 if isinstance(value, list): 637 value = ",".join(value) 638 elif value != "": 639 value = str(value) # Ensure string type 640 else: 641 value = "" 642 643 dataset_item[annotation_labels[annotation_field_id]] = value 644 645 yield dataset_item 646 647 dataset_item_cache = [] 648 649 else: 650 yield dataset_item 651 652 653 def sort_and_iterate_items( 654 self, sort="", reverse=False, chunk_size=50000, **kwargs 655 ) -> dict: 656 """ 657 Loop through items in a dataset, sorted by a given key. 658 659 This is a wrapper function for `iterate_items()` with the 660 added functionality of sorting a dataset. 661 662 :param sort: The item key that determines the sort order. 663 :param reverse: Whether to sort by largest values first. 664 :param chunk_size: How many items to write 665 666 :returns dict: Yields iterated post 667 """ 668 669 def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False): 670 """ 671 Sort items based on the given key and order. 672 673 :param items_to_sort: The items to sort 674 :param sort_key: The key to sort by 675 :param reverse: Whether to sort in reverse order 676 :return: Sorted items 677 """ 678 if reverse is False and (sort_key == "dataset-order" or sort_key == ""): 679 # Sort by dataset order 680 yield from items_to_sort 681 elif sort_key == "dataset-order" and reverse: 682 # Sort by dataset order in reverse 683 yield from reversed(list(items_to_sort)) 684 else: 685 # Sort on the basis of a column value 686 if not convert_sort_to_float: 687 yield from sorted( 688 items_to_sort, 689 key=lambda x: x.get(sort_key, ""), 690 reverse=reverse, 691 ) 692 else: 693 # Dataset fields can contain integers and empty strings. 694 # Since these cannot be compared, we will convert every 695 # empty string to 0. 696 yield from sorted( 697 items_to_sort, 698 key=lambda x: convert_to_float(x.get(sort_key, ""), force=True), 699 reverse=reverse, 700 ) 701 702 if self.num_rows < chunk_size: 703 try: 704 # First try to force-sort float values. If this doesn't work, it'll be alphabetical. 705 yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True) 706 except (TypeError, ValueError): 707 yield from sort_items( 708 self.iterate_items(**kwargs), 709 sort, 710 reverse, 711 convert_sort_to_float=False 712 ) 713 714 else: 715 # For large datasets, we will use chunk sorting 716 staging_area = self.get_staging_area() 717 buffer = [] 718 chunk_files = [] 719 convert_sort_to_float = True 720 fieldnames = self.get_columns() 721 722 def write_chunk(buffer, chunk_index): 723 """ 724 Write a chunk of data to a temporary file 725 726 :param buffer: The buffer containing the chunk of data 727 :param chunk_index: The index of the chunk 728 :return: The path to the temporary file 729 """ 730 temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv") 731 with temp_file.open("w", encoding="utf-8") as chunk_file: 732 writer = csv.DictWriter(chunk_file, fieldnames=fieldnames) 733 writer.writeheader() 734 writer.writerows(buffer) 735 return temp_file 736 737 # Divide the dataset into sorted chunks 738 for item in self.iterate_items(**kwargs): 739 buffer.append(item) 740 if len(buffer) >= chunk_size: 741 try: 742 buffer = list( 743 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 744 ) 745 except (TypeError, ValueError): 746 convert_sort_to_float = False 747 buffer = list( 748 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 749 ) 750 751 chunk_files.append(write_chunk(buffer, len(chunk_files))) 752 buffer.clear() 753 754 # Sort and write any remaining items in the buffer 755 if buffer: 756 buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float)) 757 chunk_files.append(write_chunk(buffer, len(chunk_files))) 758 buffer.clear() 759 760 # Merge sorted chunks into the final sorted file 761 sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv") 762 with sorted_file.open("w", encoding="utf-8") as outfile: 763 writer = csv.DictWriter(outfile, fieldnames=self.get_columns()) 764 writer.writeheader() 765 766 # Open all chunk files for reading 767 chunk_readers = [ 768 csv.DictReader(chunk.open("r", encoding="utf-8")) 769 for chunk in chunk_files 770 ] 771 heap = [] 772 773 # Initialize the heap with the first row from each chunk 774 for i, reader in enumerate(chunk_readers): 775 try: 776 row = next(reader) 777 if sort == "dataset-order" and reverse: 778 # Use a reverse index for "dataset-order" and reverse=True 779 sort_key = -i 780 elif convert_sort_to_float: 781 # Negate numeric keys for reverse sorting 782 sort_key = ( 783 -convert_to_float(row.get(sort, "")) 784 if reverse 785 else convert_to_float(row.get(sort, "")) 786 ) 787 else: 788 if reverse: 789 # For reverse string sorting, invert string comparison by creating a tuple 790 # with an inverted string - this makes Python's tuple comparison work in reverse 791 sort_key = ( 792 tuple(-ord(c) for c in row.get(sort, "")), 793 -i, 794 ) 795 else: 796 sort_key = (row.get(sort, ""), i) 797 heap.append((sort_key, i, row)) 798 except StopIteration: 799 pass 800 801 # Use a heap to merge sorted chunks 802 import heapq 803 804 heapq.heapify(heap) 805 while heap: 806 _, chunk_index, smallest_row = heapq.heappop(heap) 807 writer.writerow(smallest_row) 808 try: 809 next_row = next(chunk_readers[chunk_index]) 810 if sort == "dataset-order" and reverse: 811 # Use a reverse index for "dataset-order" and reverse=True 812 sort_key = -chunk_index 813 elif convert_sort_to_float: 814 sort_key = ( 815 -convert_to_float(next_row.get(sort, "")) 816 if reverse 817 else convert_to_float(next_row.get(sort, "")) 818 ) 819 else: 820 # Use the same inverted comparison for string values 821 if reverse: 822 sort_key = ( 823 tuple(-ord(c) for c in next_row.get(sort, "")), 824 -chunk_index, 825 ) 826 else: 827 sort_key = (next_row.get(sort, ""), chunk_index) 828 heapq.heappush(heap, (sort_key, chunk_index, next_row)) 829 except StopIteration: 830 pass 831 832 # Read the sorted file and yield each item 833 with sorted_file.open("r", encoding="utf-8") as infile: 834 reader = csv.DictReader(infile) 835 for item in reader: 836 yield item 837 838 # Remove the temporary files 839 if staging_area.is_dir(): 840 shutil.rmtree(staging_area) 841 842 def get_staging_area(self): 843 """ 844 Get path to a temporary folder in which files can be stored before 845 finishing 846 847 This folder must be created before use, but is guaranteed to not exist 848 yet. The folder may be used as a staging area for the dataset data 849 while it is being processed. 850 851 :return Path: Path to folder 852 """ 853 results_file = self.get_results_path() 854 855 results_dir_base = results_file.parent 856 results_dir = results_file.name.replace(".", "") + "-staging" 857 results_path = results_dir_base.joinpath(results_dir) 858 index = 1 859 while results_path.exists(): 860 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 861 index += 1 862 863 # create temporary folder 864 results_path.mkdir() 865 866 # Storing the staging area with the dataset so that it can be removed later 867 self.disposable_files.append(results_path) 868 869 return results_path 870 871 def remove_disposable_files(self): 872 """ 873 Remove any disposable files and folders, such as staging areas 874 875 Called from BasicProcessor after processing a dataset finishes. 876 """ 877 # Remove DataSet staging areas 878 if self.disposable_files: 879 for disposable_file in self.disposable_files: 880 if disposable_file.exists(): 881 shutil.rmtree(disposable_file) 882 883 def finish(self, num_rows=0): 884 """ 885 Declare the dataset finished 886 """ 887 if self.data["is_finished"]: 888 raise RuntimeError("Cannot finish a finished dataset again") 889 890 self.db.update( 891 "datasets", 892 where={"key": self.data["key"]}, 893 data={ 894 "is_finished": True, 895 "num_rows": num_rows, 896 "progress": 1.0, 897 "timestamp_finished": int(time.time()), 898 }, 899 ) 900 self.data["is_finished"] = True 901 self.data["num_rows"] = num_rows 902 903 def copy(self, shallow=True): 904 """ 905 Copies the dataset, making a new version with a unique key 906 907 908 :param bool shallow: Shallow copy: does not copy the result file, but 909 instead refers to the same file as the original dataset did 910 :return Dataset: Copied dataset 911 """ 912 parameters = self.parameters.copy() 913 914 # a key is partially based on the parameters. so by setting these extra 915 # attributes, we also ensure a unique key will be generated for the 916 # copy 917 # possibly todo: don't use time for uniqueness (but one shouldn't be 918 # copying a dataset multiple times per microsecond, that's not what 919 # this is for) 920 parameters["copied_from"] = self.key 921 parameters["copied_at"] = time.time() 922 923 copy = DataSet( 924 parameters=parameters, 925 db=self.db, 926 extension=self.result_file.split(".")[-1], 927 type=self.type, 928 modules=self.modules 929 ) 930 931 for field in self.data: 932 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 933 continue 934 copy.__setattr__(field, self.data[field]) 935 936 if shallow: 937 # use the same result file 938 copy.result_file = self.result_file 939 else: 940 # copy to new file with new key 941 shutil.copy(self.get_results_path(), copy.get_results_path()) 942 943 if self.is_finished(): 944 copy.finish(self.num_rows) 945 946 # make sure ownership is also copied 947 copy.copy_ownership_from(self) 948 949 return copy 950 951 def delete(self, commit=True, queue=None): 952 """ 953 Delete the dataset, and all its children 954 955 Deletes both database records and result files. Note that manipulating 956 a dataset object after it has been deleted is undefined behaviour. 957 958 :param bool commit: Commit SQL DELETE query? 959 """ 960 # first, recursively delete children 961 children = self.db.fetchall( 962 "SELECT * FROM datasets WHERE key_parent = %s", (self.key,) 963 ) 964 for child in children: 965 try: 966 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 967 child.delete(commit=commit) 968 except DataSetException: 969 # dataset already deleted - race condition? 970 pass 971 972 # delete any queued jobs for this dataset 973 try: 974 job = Job.get_by_remote_ID(self.key, self.db, self.type) 975 if job.is_claimed: 976 # tell API to stop any jobs running for this dataset 977 # level 2 = cancel job 978 # we're not interested in the result - if the API is available, 979 # it will do its thing, if it's not the backend is probably not 980 # running so the job also doesn't need to be interrupted 981 call_api( 982 "cancel-job", 983 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 984 False, 985 ) 986 987 # this deletes the job from the database 988 job.finish(True) 989 990 except JobNotFoundException: 991 pass 992 993 # delete this dataset's own annotations 994 self.db.delete("annotations", where={"dataset": self.key}, commit=commit) 995 # delete annotations that have been generated as part of this dataset 996 self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit) 997 # delete annotation fields on parent dataset(s) stemming from this dataset 998 for related_dataset in self.get_genealogy(update_cache=True): 999 field_deleted = False 1000 annotation_fields = related_dataset.annotation_fields 1001 if annotation_fields: 1002 for field_id in list(annotation_fields.keys()): 1003 if annotation_fields[field_id].get("from_dataset", "") == self.key: 1004 del annotation_fields[field_id] 1005 field_deleted = True 1006 if field_deleted: 1007 related_dataset.save_annotation_fields(annotation_fields) 1008 1009 # delete dataset from database 1010 self.db.delete("datasets", where={"key": self.key}, commit=commit) 1011 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 1012 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 1013 1014 # delete from drive 1015 try: 1016 if self.get_results_path().exists(): 1017 self.get_results_path().unlink() 1018 if self.get_results_path().with_suffix(".log").exists(): 1019 self.get_results_path().with_suffix(".log").unlink() 1020 if self.get_results_folder_path().exists(): 1021 shutil.rmtree(self.get_results_folder_path()) 1022 1023 except FileNotFoundError: 1024 # already deleted, apparently 1025 pass 1026 except PermissionError as e: 1027 self.db.log.error( 1028 f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}" 1029 ) 1030 1031 def update_children(self, **kwargs): 1032 """ 1033 Update an attribute for all child datasets 1034 1035 Can be used to e.g. change the owner, version, finished status for all 1036 datasets in a tree 1037 1038 :param kwargs: Parameters corresponding to known dataset attributes 1039 """ 1040 for child in self.get_children(update=True): 1041 for attr, value in kwargs.items(): 1042 child.__setattr__(attr, value) 1043 1044 child.update_children(**kwargs) 1045 1046 def is_finished(self): 1047 """ 1048 Check if dataset is finished 1049 :return bool: 1050 """ 1051 return bool(self.data["is_finished"]) 1052 1053 def is_rankable(self, multiple_items=True): 1054 """ 1055 Determine if a dataset is rankable 1056 1057 Rankable means that it is a CSV file with 'date' and 'value' columns 1058 as well as one or more item label columns 1059 1060 :param bool multiple_items: Consider datasets with multiple items per 1061 item (e.g. word_1, word_2, etc)? 1062 1063 :return bool: Whether the dataset is rankable or not 1064 """ 1065 if ( 1066 self.get_results_path().suffix != ".csv" 1067 or not self.get_results_path().exists() 1068 ): 1069 return False 1070 1071 column_options = {"date", "value", "item"} 1072 if multiple_items: 1073 column_options.add("word_1") 1074 1075 with self.get_results_path().open(encoding="utf-8") as infile: 1076 reader = csv.DictReader(infile) 1077 try: 1078 return len(set(reader.fieldnames) & column_options) >= 3 1079 except (TypeError, ValueError): 1080 return False 1081 1082 def is_accessible_by(self, username, role="owner"): 1083 """ 1084 Check if dataset has given user as owner 1085 1086 :param str|User username: Username to check for 1087 :return bool: 1088 """ 1089 if type(username) is not str: 1090 if hasattr(username, "get_id"): 1091 username = username.get_id() 1092 else: 1093 raise TypeError("User must be a str or User object") 1094 1095 # 'normal' owners 1096 if username in [ 1097 owner 1098 for owner, meta in self.owners.items() 1099 if (role is None or meta["role"] == role) 1100 ]: 1101 return True 1102 1103 # owners that are owner by being part of a tag 1104 if username in itertools.chain( 1105 *[ 1106 tagged_owners 1107 for tag, tagged_owners in self.tagged_owners.items() 1108 if (role is None or self.owners[f"tag:{tag}"]["role"] == role) 1109 ] 1110 ): 1111 return True 1112 1113 return False 1114 1115 def get_owners_users(self, role="owner"): 1116 """ 1117 Get list of dataset owners 1118 1119 This returns a list of *users* that are considered owners. Tags are 1120 transparently replaced with the users with that tag. 1121 1122 :param str|None role: Role to check for. If `None`, all owners are 1123 returned regardless of role. 1124 1125 :return set: De-duplicated owner list 1126 """ 1127 # 'normal' owners 1128 owners = [ 1129 owner 1130 for owner, meta in self.owners.items() 1131 if (role is None or meta["role"] == role) and not owner.startswith("tag:") 1132 ] 1133 1134 # owners that are owner by being part of a tag 1135 owners.extend( 1136 itertools.chain( 1137 *[ 1138 tagged_owners 1139 for tag, tagged_owners in self.tagged_owners.items() 1140 if role is None or self.owners[f"tag:{tag}"]["role"] == role 1141 ] 1142 ) 1143 ) 1144 1145 # de-duplicate before returning 1146 return set(owners) 1147 1148 def get_owners(self, role="owner"): 1149 """ 1150 Get list of dataset owners 1151 1152 This returns a list of all owners, and does not transparently resolve 1153 tags (like `get_owners_users` does). 1154 1155 :param str|None role: Role to check for. If `None`, all owners are 1156 returned regardless of role. 1157 1158 :return set: De-duplicated owner list 1159 """ 1160 return [ 1161 owner 1162 for owner, meta in self.owners.items() 1163 if (role is None or meta["role"] == role) 1164 ] 1165 1166 def add_owner(self, username, role="owner"): 1167 """ 1168 Set dataset owner 1169 1170 If the user is already an owner, but with a different role, the role is 1171 updated. If the user is already an owner with the same role, nothing happens. 1172 1173 :param str|User username: Username to set as owner 1174 :param str|None role: Role to add user with. 1175 """ 1176 if type(username) is not str: 1177 if hasattr(username, "get_id"): 1178 username = username.get_id() 1179 else: 1180 raise TypeError("User must be a str or User object") 1181 1182 if username not in self.owners: 1183 self.owners[username] = {"name": username, "key": self.key, "role": role} 1184 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 1185 1186 elif username in self.owners and self.owners[username]["role"] != role: 1187 self.db.update( 1188 "datasets_owners", 1189 data={"role": role}, 1190 where={"name": username, "key": self.key}, 1191 ) 1192 self.owners[username]["role"] = role 1193 1194 if username.startswith("tag:"): 1195 # this is a bit more complicated than just adding to the list of 1196 # owners, so do a full refresh 1197 self.refresh_owners() 1198 1199 # make sure children's owners remain in sync 1200 for child in self.get_children(update=True): 1201 child.add_owner(username, role) 1202 # not recursive, since we're calling it from recursive code! 1203 child.copy_ownership_from(self, recursive=False) 1204 1205 def remove_owner(self, username): 1206 """ 1207 Remove dataset owner 1208 1209 If no owner is set, the dataset is assigned to the anonymous user. 1210 If the user is not an owner, nothing happens. 1211 1212 :param str|User username: Username to set as owner 1213 """ 1214 if type(username) is not str: 1215 if hasattr(username, "get_id"): 1216 username = username.get_id() 1217 else: 1218 raise TypeError("User must be a str or User object") 1219 1220 if username in self.owners: 1221 del self.owners[username] 1222 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 1223 1224 if not self.owners: 1225 self.add_owner("anonymous") 1226 1227 if username in self.tagged_owners: 1228 del self.tagged_owners[username] 1229 1230 # make sure children's owners remain in sync 1231 for child in self.get_children(update=True): 1232 child.remove_owner(username) 1233 # not recursive, since we're calling it from recursive code! 1234 child.copy_ownership_from(self, recursive=False) 1235 1236 def refresh_owners(self): 1237 """ 1238 Update internal owner cache 1239 1240 This makes sure that the list of *users* and *tags* which can access the 1241 dataset is up to date. 1242 """ 1243 self.owners = { 1244 owner["name"]: owner 1245 for owner in self.db.fetchall( 1246 "SELECT * FROM datasets_owners WHERE key = %s", (self.key,) 1247 ) 1248 } 1249 1250 # determine which users (if any) are owners of the dataset by having a 1251 # tag that is listed as an owner 1252 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 1253 if owner_tags: 1254 tagged_owners = self.db.fetchall( 1255 "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,) 1256 ) 1257 self.tagged_owners = { 1258 owner_tag: [ 1259 user["name"] for user in tagged_owners if owner_tag in user["tags"] 1260 ] 1261 for owner_tag in owner_tags 1262 } 1263 else: 1264 self.tagged_owners = {} 1265 1266 def copy_ownership_from(self, dataset, recursive=True): 1267 """ 1268 Copy ownership 1269 1270 This is useful to e.g. make sure a dataset's ownership stays in sync 1271 with its parent 1272 1273 :param Dataset dataset: Parent to copy from 1274 :return: 1275 """ 1276 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 1277 1278 for role in ("owner", "viewer"): 1279 owners = dataset.get_owners(role=role) 1280 for owner in owners: 1281 self.db.insert( 1282 "datasets_owners", 1283 data={"key": self.key, "name": owner, "role": role}, 1284 commit=False, 1285 safe=True, 1286 ) 1287 1288 self.db.commit() 1289 if recursive: 1290 for child in self.get_children(update=True): 1291 child.copy_ownership_from(self, recursive=recursive) 1292 1293 def get_parameters(self): 1294 """ 1295 Get dataset parameters 1296 1297 The dataset parameters are stored as JSON in the database - parse them 1298 and return the resulting object 1299 1300 :return: Dataset parameters as originally stored 1301 """ 1302 try: 1303 return json.loads(self.data["parameters"]) 1304 except json.JSONDecodeError: 1305 return {} 1306 1307 def get_columns(self): 1308 """ 1309 Returns the dataset columns. 1310 1311 Useful for processor input forms. Can deal with both CSV and NDJSON 1312 files, the latter only if a `map_item` function is available in the 1313 processor that generated it. While in other cases one could use the 1314 keys of the JSON object, this is not always possible in follow-up code 1315 that uses the 'column' names, so for consistency this function acts as 1316 if no column can be parsed if no `map_item` function exists. 1317 1318 :return list: List of dataset columns; empty list if unable to parse 1319 """ 1320 if not self.get_results_path().exists(): 1321 # no file to get columns from 1322 return [] 1323 1324 if (self.get_results_path().suffix.lower() == ".csv") or ( 1325 self.get_results_path().suffix.lower() == ".ndjson" 1326 and self.get_own_processor() is not None 1327 and self.get_own_processor().map_item_method_available(dataset=self) 1328 ): 1329 items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100) 1330 try: 1331 keys = list(next(items).keys()) 1332 if self.annotation_fields: 1333 for annotation_field in self.annotation_fields.values(): 1334 annotation_column = annotation_field["label"] 1335 label_count = 1 1336 while annotation_column in keys: 1337 label_count += 1 1338 annotation_column = ( 1339 f"{annotation_field['label']}_{label_count}" 1340 ) 1341 keys.append(annotation_column) 1342 columns = keys 1343 except (StopIteration, NotImplementedError): 1344 # No items or otherwise unable to iterate 1345 columns = [] 1346 finally: 1347 del items 1348 else: 1349 # Filetype not CSV or an NDJSON with `map_item` 1350 columns = [] 1351 1352 return columns 1353 1354 def update_label(self, label): 1355 """ 1356 Update label for this dataset 1357 1358 :param str label: New label 1359 :return str: The new label, as returned by get_label 1360 """ 1361 self.parameters["label"] = label 1362 1363 self.db.update( 1364 "datasets", 1365 data={"parameters": json.dumps(self.parameters)}, 1366 where={"key": self.key}, 1367 ) 1368 return self.get_label() 1369 1370 def get_label(self, parameters=None, default="Query"): 1371 """ 1372 Generate a readable label for the dataset 1373 1374 :param dict parameters: Parameters of the dataset 1375 :param str default: Label to use if it cannot be inferred from the 1376 parameters 1377 1378 :return str: Label 1379 """ 1380 if not parameters: 1381 parameters = self.parameters 1382 1383 if parameters.get("label"): 1384 return parameters["label"] 1385 elif parameters.get("body_query") and parameters["body_query"] != "empty": 1386 return parameters["body_query"] 1387 elif parameters.get("body_match") and parameters["body_match"] != "empty": 1388 return parameters["body_match"] 1389 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 1390 return parameters["subject_query"] 1391 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 1392 return parameters["subject_match"] 1393 elif parameters.get("query"): 1394 label = parameters["query"] 1395 # Some legacy datasets have lists as query data 1396 if isinstance(label, list): 1397 label = ", ".join(label) 1398 1399 label = label if len(label) < 30 else label[:25] + "..." 1400 label = label.strip().replace("\n", ", ") 1401 return label 1402 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 1403 return "Flag: %s" % parameters["country_flag"] 1404 elif parameters.get("country_name") and parameters["country_name"] != "all": 1405 return "Country: %s" % parameters["country_name"] 1406 elif parameters.get("filename"): 1407 return parameters["filename"] 1408 elif parameters.get("board") and "datasource" in parameters: 1409 return parameters["datasource"] + "/" + parameters["board"] 1410 elif ( 1411 "datasource" in parameters 1412 and parameters["datasource"] in self.modules.datasources 1413 ): 1414 return ( 1415 self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 1416 ) 1417 else: 1418 return default 1419 1420 def change_datasource(self, datasource): 1421 """ 1422 Change the datasource type for this dataset 1423 1424 :param str label: New datasource type 1425 :return str: The new datasource type 1426 """ 1427 1428 self.parameters["datasource"] = datasource 1429 1430 self.db.update( 1431 "datasets", 1432 data={"parameters": json.dumps(self.parameters)}, 1433 where={"key": self.key}, 1434 ) 1435 return datasource 1436 1437 def reserve_result_file(self, parameters=None, extension="csv"): 1438 """ 1439 Generate a unique path to the results file for this dataset 1440 1441 This generates a file name for the data file of this dataset, and makes sure 1442 no file exists or will exist at that location other than the file we 1443 expect (i.e. the data for this particular dataset). 1444 1445 :param str extension: File extension, "csv" by default 1446 :param parameters: Dataset parameters 1447 :return bool: Whether the file path was successfully reserved 1448 """ 1449 if self.data["is_finished"]: 1450 raise RuntimeError("Cannot reserve results file for a finished dataset") 1451 1452 # Use 'random' for random post queries 1453 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 1454 file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"] 1455 # Use country code for country flag queries 1456 elif "country_flag" in parameters and parameters["country_flag"] != "all": 1457 file = ( 1458 "countryflag-" 1459 + str(parameters["country_flag"]) 1460 + "-" 1461 + self.data["key"] 1462 ) 1463 # Use the query string for all other queries 1464 else: 1465 query_bit = self.data["query"].replace(" ", "-").lower() 1466 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 1467 query_bit = query_bit[:100] # Crop to avoid OSError 1468 file = query_bit + "-" + self.data["key"] 1469 file = re.sub(r"[-]+", "-", file) 1470 1471 self.data["result_file"] = file + "." + extension.lower() 1472 index = 1 1473 while self.get_results_path().is_file(): 1474 self.data["result_file"] = file + "-" + str(index) + "." + extension.lower() 1475 index += 1 1476 1477 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 1478 data={"result_file": self.data["result_file"]}) 1479 return updated > 0 1480 1481 def get_key(self, query, parameters, parent="", time_offset=0): 1482 """ 1483 Generate a unique key for this dataset that can be used to identify it 1484 1485 The key is a hash of a combination of the query string and parameters. 1486 You never need to call this, really: it's used internally. 1487 1488 :param str query: Query string 1489 :param parameters: Dataset parameters 1490 :param parent: Parent dataset's key (if applicable) 1491 :param time_offset: Offset to add to the time component of the dataset 1492 key. This can be used to ensure a unique key even if the parameters and 1493 timing is otherwise identical to an existing dataset's 1494 1495 :return str: Dataset key 1496 """ 1497 # Return a hash based on parameters 1498 # we're going to use the hash of the parameters to uniquely identify 1499 # the dataset, so make sure it's always in the same order, or we might 1500 # end up creating multiple keys for the same dataset if python 1501 # decides to return the dict in a different order 1502 param_key = collections.OrderedDict() 1503 for key in sorted(parameters): 1504 param_key[key] = parameters[key] 1505 1506 # we additionally use the current time as a salt - this should usually 1507 # ensure a unique key for the dataset. if for some reason there is a 1508 # hash collision 1509 param_key["_salt"] = int(time.time()) + time_offset 1510 1511 parent_key = str(parent) if parent else "" 1512 plain_key = repr(param_key) + str(query) + parent_key 1513 hashed_key = hash_to_md5(plain_key) 1514 1515 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1516 # key exists, generate a new one 1517 return self.get_key( 1518 query, parameters, parent, time_offset=random.randint(1, 10) 1519 ) 1520 else: 1521 return hashed_key 1522 1523 def set_key(self, key): 1524 """ 1525 Change dataset key 1526 1527 In principe, keys should never be changed. But there are rare cases 1528 where it is useful to do so, in particular when importing a dataset 1529 from another 4CAT instance; in that case it makes sense to try and 1530 ensure that the key is the same as it was before. This function sets 1531 the dataset key and updates any dataset references to it. 1532 1533 :param str key: Key to set 1534 :return str: Key that was set. If the desired key already exists, the 1535 original key is kept. 1536 """ 1537 key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,)) 1538 if key_exists or not key: 1539 return self.key 1540 1541 old_key = self.key 1542 self.db.update("datasets", data={"key": key}, where={"key": old_key}) 1543 1544 # update references 1545 self.db.update( 1546 "datasets", data={"key_parent": key}, where={"key_parent": old_key} 1547 ) 1548 self.db.update("datasets_owners", data={"key": key}, where={"key": old_key}) 1549 self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key}) 1550 self.db.update("users_favourites", data={"key": key}, where={"key": old_key}) 1551 1552 # for good measure 1553 self.db.commit() 1554 self.key = key 1555 1556 return self.key 1557 1558 def get_status(self): 1559 """ 1560 Get Dataset status 1561 1562 :return string: Dataset status 1563 """ 1564 return self.data["status"] 1565 1566 def update_status(self, status, is_final=False): 1567 """ 1568 Update dataset status 1569 1570 The status is a string that may be displayed to a user to keep them 1571 updated and informed about the progress of a dataset. No memory is kept 1572 of earlier dataset statuses; the current status is overwritten when 1573 updated. 1574 1575 Statuses are also written to the dataset log file. 1576 1577 :param string status: Dataset status 1578 :param bool is_final: If this is `True`, subsequent calls to this 1579 method while the object is instantiated will not update the dataset 1580 status. 1581 :return bool: Status update successful? 1582 """ 1583 if self.no_status_updates: 1584 return 1585 1586 # for presets, copy the updated status to the preset(s) this is part of 1587 if self.preset_parent is None: 1588 self.preset_parent = [ 1589 parent 1590 for parent in self.get_genealogy() 1591 if parent.type.find("preset-") == 0 and parent.key != self.key 1592 ][:1] 1593 1594 if self.preset_parent: 1595 for preset_parent in self.preset_parent: 1596 if not preset_parent.is_finished(): 1597 preset_parent.update_status(status) 1598 1599 self.data["status"] = status 1600 updated = self.db.update( 1601 "datasets", where={"key": self.data["key"]}, data={"status": status} 1602 ) 1603 1604 if is_final: 1605 self.no_status_updates = True 1606 1607 self.log(status) 1608 1609 return updated > 0 1610 1611 def update_progress(self, progress): 1612 """ 1613 Update dataset progress 1614 1615 The progress can be used to indicate to a user how close the dataset 1616 is to completion. 1617 1618 :param float progress: Between 0 and 1. 1619 :return: 1620 """ 1621 progress = min(1, max(0, progress)) # clamp 1622 if type(progress) is int: 1623 progress = float(progress) 1624 1625 self.data["progress"] = progress 1626 updated = self.db.update( 1627 "datasets", where={"key": self.data["key"]}, data={"progress": progress} 1628 ) 1629 return updated > 0 1630 1631 def get_progress(self): 1632 """ 1633 Get dataset progress 1634 1635 :return float: Progress, between 0 and 1 1636 """ 1637 return self.data["progress"] 1638 1639 def finish_with_error(self, error): 1640 """ 1641 Set error as final status, and finish with 0 results 1642 1643 This is a convenience function to avoid having to repeat 1644 "update_status" and "finish" a lot. 1645 1646 :param str error: Error message for final dataset status. 1647 :return: 1648 """ 1649 self.update_status(error, is_final=True) 1650 self.finish(0) 1651 1652 return None 1653 1654 def update_version(self, version): 1655 """ 1656 Update software version used for this dataset 1657 1658 This can be used to verify the code that was used to process this dataset. 1659 1660 :param string version: Version identifier 1661 :return bool: Update successul? 1662 """ 1663 try: 1664 # this fails if the processor type is unknown 1665 # edge case, but let's not crash... 1666 processor_path = self.modules.processors.get(self.data["type"]).filepath 1667 except AttributeError: 1668 processor_path = "" 1669 1670 updated = self.db.update( 1671 "datasets", 1672 where={"key": self.data["key"]}, 1673 data={ 1674 "software_version": version[0], 1675 "software_source": version[1], 1676 "software_file": processor_path, 1677 }, 1678 ) 1679 1680 return updated > 0 1681 1682 def delete_parameter(self, parameter, instant=True): 1683 """ 1684 Delete a parameter from the dataset metadata 1685 1686 :param string parameter: Parameter to delete 1687 :param bool instant: Also delete parameters in this instance object? 1688 :return bool: Update successul? 1689 """ 1690 parameters = self.parameters.copy() 1691 if parameter in parameters: 1692 del parameters[parameter] 1693 else: 1694 return False 1695 1696 updated = self.db.update( 1697 "datasets", 1698 where={"key": self.data["key"]}, 1699 data={"parameters": json.dumps(parameters)}, 1700 ) 1701 1702 if instant: 1703 self.parameters = parameters 1704 1705 return updated > 0 1706 1707 def get_version_url(self, file): 1708 """ 1709 Get a versioned github URL for the version this dataset was processed with 1710 1711 :param file: File to link within the repository 1712 :return: URL, or an empty string 1713 """ 1714 if not self.data["software_source"]: 1715 return "" 1716 1717 filepath = self.data.get("software_file", "") 1718 if filepath.startswith("/config/extensions/"): 1719 # go to root of extension 1720 filepath = "/" + "/".join(filepath.split("/")[3:]) 1721 1722 return ( 1723 self.data["software_source"] 1724 + "/blob/" 1725 + self.data["software_version"] 1726 + filepath 1727 ) 1728 1729 def top_parent(self): 1730 """ 1731 Get root dataset 1732 1733 Traverses the tree of datasets this one is part of until it finds one 1734 with no source_dataset dataset, then returns that dataset. 1735 1736 :return Dataset: Parent dataset 1737 """ 1738 genealogy = self.get_genealogy() 1739 return genealogy[0] 1740 1741 def get_genealogy(self, update_cache=False): 1742 """ 1743 Get genealogy of this dataset 1744 1745 Creates a list of DataSet objects, with the first one being the 1746 'top' dataset, and each subsequent one being a child of the previous 1747 one, ending with the current dataset. 1748 1749 :param bool update_cache: Update the cached genealogy if True, else return cached value 1750 :return list: Dataset genealogy, oldest dataset first 1751 """ 1752 if not self._genealogy or update_cache: 1753 key_parent = self.key_parent 1754 genealogy = [] 1755 1756 while key_parent: 1757 try: 1758 parent = DataSet(key=key_parent, db=self.db, modules=self.modules) 1759 except DataSetException: 1760 break 1761 1762 genealogy.append(parent) 1763 if parent.key_parent: 1764 key_parent = parent.key_parent 1765 else: 1766 break 1767 1768 genealogy.reverse() 1769 1770 # add self to the end 1771 genealogy.append(self) 1772 # cache the result 1773 self._genealogy = genealogy 1774 1775 # return a copy to prevent external modification 1776 return list(self._genealogy) 1777 1778 def get_children(self, update=False): 1779 """ 1780 Get children of this dataset 1781 1782 :param bool update: Update the list of children from database if True, else return cached value 1783 :return list: List of child datasets 1784 """ 1785 if self._children is not None and not update: 1786 return self._children 1787 1788 analyses = self.db.fetchall( 1789 "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", 1790 (self.key,), 1791 ) 1792 self._children = [ 1793 DataSet(data=analysis, db=self.db, modules=self.modules) 1794 for analysis in analyses 1795 ] 1796 return self._children 1797 1798 def get_all_children(self, recursive=True, update=True): 1799 """ 1800 Get all children of this dataset 1801 1802 Results are returned as a non-hierarchical list, i.e. the result does 1803 not reflect the actual dataset hierarchy (but all datasets in the 1804 result will have the original dataset as an ancestor somewhere) 1805 1806 :return list: List of DataSets 1807 """ 1808 children = self.get_children(update=update) 1809 results = children.copy() 1810 if recursive: 1811 for child in children: 1812 results += child.get_all_children(recursive=recursive, update=update) 1813 1814 return results 1815 1816 def nearest(self, type_filter): 1817 """ 1818 Return nearest dataset that matches the given type 1819 1820 Starting with this dataset, traverse the hierarchy upwards and return 1821 whichever dataset matches the given type. 1822 1823 :param str type_filter: Type filter. Can contain wildcards and is matched 1824 using `fnmatch.fnmatch`. 1825 :return: Earliest matching dataset, or `None` if none match. 1826 """ 1827 genealogy = self.get_genealogy() 1828 for dataset in reversed(genealogy): 1829 if fnmatch.fnmatch(dataset.type, type_filter): 1830 return dataset 1831 1832 return None 1833 1834 def get_breadcrumbs(self): 1835 """ 1836 Get breadcrumbs navlink for use in permalinks 1837 1838 Returns a string representing this dataset's genealogy that may be used 1839 to uniquely identify it. 1840 1841 :return str: Nav link 1842 """ 1843 if not self.key_parent: 1844 return self.key 1845 1846 genealogy = self.get_genealogy() 1847 return ",".join([d.key for d in genealogy]) 1848 1849 def get_compatible_processors(self, config=None): 1850 """ 1851 Get list of processors compatible with this dataset 1852 1853 Checks whether this dataset type is one that is listed as being accepted 1854 by the processor, for each known type: if the processor does not 1855 specify accepted types (via the `is_compatible_with` method), it is 1856 assumed it accepts any top-level datasets 1857 1858 :param ConfigManager|None config: Configuration reader to determine 1859 compatibility through. This may not be the same reader the dataset was 1860 instantiated with, e.g. when checking whether some other user should 1861 be able to run processors on this dataset. 1862 :return dict: Compatible processors, `name => class` mapping 1863 """ 1864 processors = self.modules.processors 1865 1866 available = {} 1867 for processor_type, processor in processors.items(): 1868 if processor.is_from_collector(): 1869 continue 1870 1871 own_processor = self.get_own_processor() 1872 if own_processor and own_processor.exclude_followup_processors( 1873 processor_type 1874 ): 1875 continue 1876 1877 # consider a processor compatible if its is_compatible_with 1878 # method returns True *or* if it has no explicit compatibility 1879 # check and this dataset is top-level (i.e. has no parent) 1880 if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \ 1881 or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)): 1882 available[processor_type] = processor 1883 1884 return available 1885 1886 def get_place_in_queue(self, update=False): 1887 """ 1888 Determine dataset's position in queue 1889 1890 If the dataset is already finished, the position is -1. Else, the 1891 position is the number of datasets to be completed before this one will 1892 be processed. A position of 0 would mean that the dataset is currently 1893 being executed, or that the backend is not running. 1894 1895 :param bool update: Update the queue position from database if True, else return cached value 1896 :return int: Queue position 1897 """ 1898 if self.is_finished() or not self.data.get("job"): 1899 self._queue_position = -1 1900 return self._queue_position 1901 elif not update and self._queue_position is not None: 1902 # Use cached value 1903 return self._queue_position 1904 else: 1905 # Collect queue position from database via the job 1906 try: 1907 job = Job.get_by_ID(self.data["job"], self.db) 1908 self._queue_position = job.get_place_in_queue() 1909 except JobNotFoundException: 1910 self._queue_position = -1 1911 1912 return self._queue_position 1913 1914 def get_own_processor(self): 1915 """ 1916 Get the processor class that produced this dataset 1917 1918 :return: Processor class, or `None` if not available. 1919 """ 1920 processor_type = self.parameters.get("type", self.data.get("type")) 1921 1922 return self.modules.processors.get(processor_type) 1923 1924 def get_available_processors(self, config=None, exclude_hidden=False): 1925 """ 1926 Get list of processors that may be run for this dataset 1927 1928 Returns all compatible processors except for those that are already 1929 queued or finished and have no options. Processors that have been 1930 run but have options are included so they may be run again with a 1931 different configuration 1932 1933 :param ConfigManager|None config: Configuration reader to determine 1934 compatibility through. This may not be the same reader the dataset was 1935 instantiated with, e.g. when checking whether some other user should 1936 be able to run processors on this dataset. 1937 :param bool exclude_hidden: Exclude processors that should be displayed 1938 in the UI? If `False`, all processors are returned. 1939 1940 :return dict: Available processors, `name => properties` mapping 1941 """ 1942 if self.available_processors: 1943 # Update to reflect exclude_hidden parameter which may be different from last call 1944 # TODO: could children also have been created? Possible bug, but I have not seen anything effected by this 1945 return { 1946 processor_type: processor 1947 for processor_type, processor in self.available_processors.items() 1948 if not exclude_hidden or not processor.is_hidden 1949 } 1950 1951 processors = self.get_compatible_processors(config=config) 1952 1953 for analysis in self.get_children(update=True): 1954 if analysis.type not in processors: 1955 continue 1956 1957 if not processors[analysis.type].get_options(config=config): 1958 # No variable options; this processor has been run so remove 1959 del processors[analysis.type] 1960 continue 1961 1962 if exclude_hidden and processors[analysis.type].is_hidden: 1963 del processors[analysis.type] 1964 1965 self.available_processors = processors 1966 return processors 1967 1968 def link_job(self, job): 1969 """ 1970 Link this dataset to a job ID 1971 1972 Updates the dataset data to include a reference to the job that will be 1973 executing (or has already executed) this job. 1974 1975 Note that if no job can be found for this dataset, this method silently 1976 fails. 1977 1978 :param Job job: The job that will run this dataset 1979 1980 :todo: If the job column ever gets used, make sure it always contains 1981 a valid value, rather than silently failing this method. 1982 """ 1983 if type(job) is not Job: 1984 raise TypeError("link_job requires a Job object as its argument") 1985 1986 if "id" not in job.data: 1987 try: 1988 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"]) 1989 except JobNotFoundException: 1990 return 1991 1992 self.db.update( 1993 "datasets", where={"key": self.key}, data={"job": job.data["id"]} 1994 ) 1995 1996 def link_parent(self, key_parent): 1997 """ 1998 Set source_dataset key for this dataset 1999 2000 :param key_parent: Parent key. Not checked for validity 2001 """ 2002 self.db.update( 2003 "datasets", where={"key": self.key}, data={"key_parent": key_parent} 2004 ) 2005 # reset caches 2006 self.data["key_parent"] = key_parent 2007 self._genealogy = None 2008 2009 def get_parent(self): 2010 """ 2011 Get parent dataset 2012 2013 :return DataSet: Parent dataset, or `None` if not applicable 2014 """ 2015 return ( 2016 DataSet(key=self.key_parent, db=self.db, modules=self.modules) 2017 if self.key_parent 2018 else None 2019 ) 2020 2021 def detach(self): 2022 """ 2023 Makes the datasets standalone, i.e. not having any source_dataset dataset 2024 """ 2025 self.link_parent("") 2026 2027 def is_dataset(self): 2028 """ 2029 Easy way to confirm this is a dataset. 2030 Used for checking processor and dataset compatibility, 2031 which needs to handle both processors and datasets. 2032 """ 2033 return True 2034 2035 def is_top_dataset(self): 2036 """ 2037 Easy way to confirm this is a top dataset. 2038 Used for checking processor and dataset compatibility, 2039 which needs to handle both processors and datasets. 2040 """ 2041 if self.key_parent: 2042 return False 2043 return True 2044 2045 def is_expiring(self, config): 2046 """ 2047 Determine if dataset is set to expire 2048 2049 Similar to `is_expired`, but checks if the dataset will be deleted in 2050 the future, not if it should be deleted right now. 2051 2052 :param ConfigManager config: Configuration reader (context-aware) 2053 :return bool|int: `False`, or the expiration date as a Unix timestamp. 2054 """ 2055 # has someone opted out of deleting this? 2056 if self.parameters.get("keep"): 2057 return False 2058 2059 # is this dataset explicitly marked as expiring after a certain time? 2060 if self.parameters.get("expires-after"): 2061 return self.parameters.get("expires-after") 2062 2063 # is the data source configured to have its datasets expire? 2064 expiration = config.get("datasources.expiration", {}) 2065 if not expiration.get(self.parameters.get("datasource")): 2066 return False 2067 2068 # is there a timeout for this data source? 2069 if expiration.get(self.parameters.get("datasource")).get("timeout"): 2070 return self.timestamp + expiration.get( 2071 self.parameters.get("datasource") 2072 ).get("timeout") 2073 2074 return False 2075 2076 def is_expired(self, config): 2077 """ 2078 Determine if dataset should be deleted 2079 2080 Datasets can be set to expire, but when they should be deleted depends 2081 on a number of factor. This checks them all. 2082 2083 :param ConfigManager config: Configuration reader (context-aware) 2084 :return bool: 2085 """ 2086 # has someone opted out of deleting this? 2087 if not self.is_expiring(config): 2088 return False 2089 2090 # is this dataset explicitly marked as expiring after a certain time? 2091 future = ( 2092 time.time() + 3600 2093 ) # ensure we don't delete datasets with invalid expiration times 2094 if ( 2095 self.parameters.get("expires-after") 2096 and convert_to_int(self.parameters["expires-after"], future) < time.time() 2097 ): 2098 return True 2099 2100 # is the data source configured to have its datasets expire? 2101 expiration = config.get("datasources.expiration", {}) 2102 if not expiration.get(self.parameters.get("datasource")): 2103 return False 2104 2105 # is the dataset older than the set timeout? 2106 if expiration.get(self.parameters.get("datasource")).get("timeout"): 2107 return ( 2108 self.timestamp 2109 + expiration[self.parameters.get("datasource")]["timeout"] 2110 < time.time() 2111 ) 2112 2113 return False 2114 2115 def is_from_collector(self): 2116 """ 2117 Check if this dataset was made by a processor that collects data, i.e. 2118 a search or import worker. 2119 2120 :return bool: 2121 """ 2122 return self.type.endswith("-search") or self.type.endswith("-import") 2123 2124 def get_extension(self): 2125 """ 2126 Gets the file extension this dataset produces. 2127 Also checks whether the results file exists. 2128 Used for checking processor and dataset compatibility. 2129 2130 :return str extension: Extension, e.g. `csv` 2131 """ 2132 if self.get_results_path().exists(): 2133 return self.get_results_path().suffix[1:] 2134 2135 return False 2136 2137 def is_filter(self): 2138 """ 2139 Check whether a dataset is a filter dataset. 2140 2141 :return bool: True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown). 2142 """ 2143 own_processor = self.get_own_processor() 2144 if own_processor is None: 2145 # Deprecated datasets do not have a processor 2146 return None 2147 return own_processor.is_filter() 2148 2149 def get_media_type(self): 2150 """ 2151 Gets the media type of the dataset file. 2152 2153 :return str: media type, e.g., "text" 2154 """ 2155 own_processor = self.get_own_processor() 2156 if hasattr(self, "media_type"): 2157 # media type can be defined explicitly in the dataset; this is the priority 2158 return self.media_type 2159 elif own_processor is not None: 2160 # or media type can be defined in the processor 2161 # some processors can set different media types for different datasets (e.g., import_media) 2162 if hasattr(own_processor, "media_type"): 2163 return own_processor.media_type 2164 2165 # Default to text 2166 return self.parameters.get("media_type", "text") 2167 2168 def get_metadata(self): 2169 """ 2170 Get dataset metadata 2171 2172 This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended 2173 as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to 2174 update its database (and ensure compatibility with the exporting version of 4CAT). 2175 """ 2176 metadata = self.db.fetchone( 2177 "SELECT * FROM datasets WHERE key = %s", (self.key,) 2178 ) 2179 2180 # get 4CAT version (presumably to ensure export is compatible with import) 2181 metadata["current_4CAT_version"] = get_software_version() 2182 return metadata 2183 2184 def get_result_url(self): 2185 """ 2186 Gets the 4CAT frontend URL of a dataset file. 2187 2188 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 2189 SERVER_HTTPS) plus hardcoded '/result/'. 2190 TODO: create more dynamic method of obtaining url. 2191 """ 2192 filename = self.get_results_path().name 2193 2194 # we cheat a little here by using the modules' config reader, but these 2195 # will never be context-dependent values anyway 2196 url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \ 2197 self.modules.config.get("flask.server_name") + '/result/' + filename 2198 return url_to_file 2199 2200 def warn_unmappable_item( 2201 self, item_count, processor=None, error_message=None, warn_admins=True 2202 ): 2203 """ 2204 Log an item that is unable to be mapped and warn administrators. 2205 2206 :param int item_count: Item index 2207 :param Processor processor: Processor calling function8 2208 """ 2209 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 2210 2211 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 2212 closest_dataset = ( 2213 processor.dataset 2214 if processor is not None and processor.dataset is not None 2215 else self 2216 ) 2217 # Log error to dataset log 2218 closest_dataset.log(dataset_error_message) 2219 2220 if warn_admins: 2221 if processor is not None: 2222 processor.log.warning( 2223 f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}." 2224 ) 2225 elif hasattr(self.db, "log"): 2226 # borrow the database's log handler 2227 self.db.log.warning( 2228 f"Unable to map item all items for dataset {closest_dataset.key}." 2229 ) 2230 else: 2231 # No other log available 2232 raise DataSetException( 2233 f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn" 2234 ) 2235 2236 # Annotation functions (most of it is handled in Annotations) 2237 def has_annotations(self) -> bool: 2238 """ 2239 Whether this dataset has annotations 2240 """ 2241 2242 annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,)) 2243 2244 return True if annotation else False 2245 2246 def num_annotations(self) -> int: 2247 """ 2248 Get the amount of annotations 2249 """ 2250 return self.db.fetchone( 2251 "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,) 2252 )["count"] 2253 2254 def get_annotation(self, data: dict): 2255 """ 2256 Retrieves a specific annotation if it exists. 2257 2258 :param data: A dictionary with which to get the annotations from. 2259 To get specific annotations, include either an `id` field or 2260 `field_id` and `item_id` fields. 2261 2262 return Annotation: Annotation object. 2263 """ 2264 2265 if "id" not in data or ("field_id" not in data and "item_id" not in data): 2266 return None 2267 2268 if "dataset" not in data: 2269 data["dataset"] = self.key 2270 2271 return Annotation(data=data, db=self.db) 2272 2273 def get_annotations(self) -> list: 2274 """ 2275 Retrieves all annotations for this dataset. 2276 2277 return list: List of Annotation objects. 2278 """ 2279 2280 return Annotation.get_annotations_for_dataset(self.db, self.key) 2281 2282 def get_annotations_for_item(self, item_id: str | list, before=0) -> list: 2283 """ 2284 Retrieves all annotations from this dataset for a specific item (e.g. social media post). 2285 :param str item_id: The ID of the annotation item 2286 :param int before: The upper timestamp range for annotations. 2287 """ 2288 return Annotation.get_annotations_for_dataset( 2289 self.db, self.key, item_id=item_id, before=before 2290 ) 2291 2292 def has_annotation_fields(self) -> bool: 2293 """ 2294 Returns True if there's annotation fields saved tot the dataset table 2295 Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.). 2296 """ 2297 2298 return True if self.annotation_fields else False 2299 2300 def get_annotation_field_labels(self) -> list: 2301 """ 2302 Retrieves the saved annotation field labels for this dataset. 2303 These are stored in the annotations table. 2304 2305 :return list: List of annotation field labels. 2306 """ 2307 2308 annotation_fields = self.annotation_fields 2309 2310 if not annotation_fields: 2311 return [] 2312 2313 labels = [v["label"] for v in annotation_fields.values()] 2314 2315 return labels 2316 2317 def save_annotations(self, annotations: list) -> int: 2318 """ 2319 Takes a list of annotations and saves them to the annotations table. 2320 If a field is not yet present in the `annotation_fields` column in 2321 the datasets table, it also adds it there. 2322 2323 :param list annotations: List of dictionaries with annotation items. Must have `item_id`, `field_id`, 2324 and `label`. 2325 `item_id` is for the specific item being annotated (e.g. a social media post) 2326 `field_id` refers to the annotation field. 2327 `label` is a human-readable description of this annotation. 2328 E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", 2329 "value": "Yes"}] 2330 2331 :returns int: How many annotations were saved. 2332 2333 """ 2334 2335 if not annotations: 2336 return 0 2337 2338 count = 0 2339 annotation_fields = self.annotation_fields 2340 2341 # Add some dataset data to annotations, if not present 2342 for annotation_data in annotations: 2343 # Check if the required fields are present 2344 if not annotation_data.get("item_id"): 2345 raise AnnotationException( 2346 "Can't save annotations; annotation must have an `item_id` referencing " 2347 "the item it annotated, got %s" % annotation_data 2348 ) 2349 if not annotation_data.get("field_id"): 2350 raise AnnotationException( 2351 "Can't save annotations; annotation must have a `field_id` field, " 2352 "got %s" % annotation_data 2353 ) 2354 if not annotation_data.get("label") or not isinstance( 2355 annotation_data["label"], str 2356 ): 2357 raise AnnotationException( 2358 "Can't save annotations; annotation must have a `label` field, " 2359 "got %s" % annotation_data 2360 ) 2361 2362 # Set dataset key 2363 if not annotation_data.get("dataset"): 2364 annotation_data["dataset"] = self.key 2365 2366 # Set default author to this dataset owner 2367 # If this annotation is made by a processor, it will have the processor name 2368 if not annotation_data.get("author"): 2369 annotation_data["author"] = self.get_owners()[0] 2370 2371 # Create Annotation object, which also saves it to the database 2372 # If this dataset/item_id/field_id combination already exists, this retrieves the 2373 # existing data and updates it with new values. 2374 Annotation(data=annotation_data, db=self.db) 2375 count += 1 2376 2377 # Save annotation fields if things changed 2378 if annotation_fields != self.annotation_fields: 2379 self.save_annotation_fields(annotation_fields) 2380 2381 return count 2382 2383 def save_annotation_fields(self, new_fields: dict, add=False) -> int: 2384 """ 2385 Save annotation field data to the datasets table (in the `annotation_fields` column). 2386 If changes to the annotation fields affect existing annotations, 2387 this function will also call `update_annotations_via_fields()` to change them. 2388 2389 :param dict new_fields: New annotation fields, with a field ID as key. 2390 2391 :param bool add: Whether we're merely adding new fields 2392 or replacing the whole batch. If add is False, 2393 `new_fields` should contain all fields. 2394 2395 :return int: The number of annotation fields saved. 2396 2397 """ 2398 2399 # Get existing annotation fields to see if stuff changed. 2400 old_fields = self.annotation_fields 2401 changes = False 2402 2403 # Annotation field must be valid JSON. 2404 try: 2405 json.dumps(new_fields) 2406 except ValueError: 2407 raise AnnotationException( 2408 "Can't save annotation fields: not valid JSON (%s)" % new_fields 2409 ) 2410 2411 # No duplicate IDs 2412 if len(new_fields) != len(set(new_fields)): 2413 raise AnnotationException( 2414 "Can't save annotation fields: field IDs must be unique" 2415 ) 2416 2417 # Annotation fields must at minimum have `type` and `label` keys. 2418 for field_id, annotation_field in new_fields.items(): 2419 if not isinstance(field_id, str): 2420 raise AnnotationException( 2421 "Can't save annotation fields: field ID %s is not a valid string" 2422 % field_id 2423 ) 2424 if "label" not in annotation_field: 2425 raise AnnotationException( 2426 "Can't save annotation fields: all fields must have a label" 2427 % field_id 2428 ) 2429 if "type" not in annotation_field: 2430 raise AnnotationException( 2431 "Can't save annotation fields: all fields must have a type" 2432 % field_id 2433 ) 2434 2435 # Check if fields are removed 2436 if not add and old_fields: 2437 for field_id in old_fields.keys(): 2438 if field_id not in new_fields: 2439 changes = True 2440 2441 # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin 2442 # dataset 2443 for field_id in new_fields.keys(): 2444 if field_id in old_fields and old_fields[field_id].get("from_dataset"): 2445 old_fields[field_id]["label"] = new_fields[field_id][ 2446 "label" 2447 ] # Only labels could've been changed 2448 new_fields[field_id] = old_fields[field_id] 2449 2450 # If we're just adding fields, add them to the old fields. 2451 # If the field already exists, overwrite the old field. 2452 if add and old_fields: 2453 all_fields = old_fields 2454 for field_id, annotation_field in new_fields.items(): 2455 all_fields[field_id] = annotation_field 2456 new_fields = all_fields 2457 2458 # We're saving the new annotation fields as-is. 2459 # Ordering of fields is preserved this way. 2460 self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)}) 2461 self.annotation_fields = new_fields 2462 2463 # If anything changed with the annotation fields, possibly update 2464 # existing annotations (e.g. to delete them or change their labels). 2465 if changes: 2466 Annotation.update_annotations_via_fields( 2467 self.key, old_fields, new_fields, self.db 2468 ) 2469 2470 return len(new_fields) 2471 2472 def get_annotation_metadata(self) -> dict: 2473 """ 2474 Retrieves all the data for this dataset from the annotations table. 2475 """ 2476 2477 annotation_data = self.db.fetchall( 2478 "SELECT * FROM annotations WHERE dataset = '%s';" % self.key 2479 ) 2480 return annotation_data 2481 2482 def __getattr__(self, attr): 2483 """ 2484 Getter so we don't have to use .data all the time 2485 2486 :param attr: Data key to get 2487 :return: Value 2488 """ 2489 2490 if attr in dir(self): 2491 # an explicitly defined attribute should always be called in favour 2492 # of this passthrough 2493 attribute = getattr(self, attr) 2494 return attribute 2495 elif attr in self.data: 2496 return self.data[attr] 2497 else: 2498 raise AttributeError("DataSet instance has no attribute %s" % attr) 2499 2500 def __setattr__(self, attr, value): 2501 """ 2502 Setter so we can flexibly update the database 2503 2504 Also updates internal data stores (.data etc). If the attribute is 2505 unknown, it is stored within the 'parameters' attribute. 2506 2507 :param str attr: Attribute to update 2508 :param value: New value 2509 """ 2510 2511 # don't override behaviour for *actual* class attributes 2512 if attr in dir(self): 2513 super().__setattr__(attr, value) 2514 return 2515 2516 if attr not in self.data: 2517 self.parameters[attr] = value 2518 attr = "parameters" 2519 value = self.parameters 2520 2521 if attr == "parameters": 2522 value = json.dumps(value) 2523 2524 self.db.update("datasets", where={"key": self.key}, data={attr: value}) 2525 2526 self.data[attr] = value 2527 2528 if attr == "parameters": 2529 self.parameters = json.loads(value)
25class DataSet(FourcatModule): 26 """ 27 Provide interface to safely register and run operations on a dataset 28 29 A dataset is a collection of: 30 - A unique identifier 31 - A set of parameters that demarcate the data contained within 32 - The data 33 34 The data is usually stored in a file on the disk; the parameters are stored 35 in a database. The handling of the data, et cetera, is done by other 36 workers; this class defines method to create and manipulate the dataset's 37 properties. 38 """ 39 40 # Attributes must be created here to ensure getattr and setattr work properly 41 data = None 42 key = "" 43 44 _children = None 45 available_processors = None 46 _genealogy = None 47 preset_parent = None 48 parameters = None 49 modules = None 50 annotation_fields = None 51 52 owners = None 53 tagged_owners = None 54 55 db = None 56 folder = None 57 is_new = True 58 59 no_status_updates = False 60 disposable_files = None 61 _queue_position = None 62 63 def __init__( 64 self, 65 parameters=None, 66 key=None, 67 job=None, 68 data=None, 69 db=None, 70 parent="", 71 extension=None, 72 type=None, 73 is_private=True, 74 owner="anonymous", 75 modules=None 76 ): 77 """ 78 Create new dataset object 79 80 If the dataset is not in the database yet, it is added. 81 82 :param dict parameters: Only when creating a new dataset. Dataset 83 parameters, free-form dictionary. 84 :param str key: Dataset key. If given, dataset with this key is loaded. 85 :param int job: Job ID. If given, dataset corresponding to job is 86 loaded. 87 :param dict data: Dataset data, corresponding to a row in the datasets 88 database table. If not given, retrieved from database depending on key. 89 :param db: Database connection 90 :param str parent: Only when creating a new dataset. Parent dataset 91 key to which the one being created is a child. 92 :param str extension: Only when creating a new dataset. Extension of 93 dataset result file. 94 :param str type: Only when creating a new dataset. Type of the dataset, 95 corresponding to the type property of a processor class. 96 :param bool is_private: Only when creating a new dataset. Whether the 97 dataset is private or public. 98 :param str owner: Only when creating a new dataset. The user name of 99 the dataset's creator. 100 :param modules: Module cache. If not given, will be loaded when needed 101 (expensive). Used to figure out what processors are compatible with 102 this dataset. 103 """ 104 self.db = db 105 106 # Ensure mutable attributes are set in __init__ as they are unique to each DataSet 107 self.data = {} 108 self.parameters = {} 109 self.available_processors = {} 110 self._genealogy = None 111 self.disposable_files = [] 112 self.modules = modules 113 114 if key is not None: 115 self.key = key 116 current = self.db.fetchone( 117 "SELECT * FROM datasets WHERE key = %s", (self.key,) 118 ) 119 if not current: 120 raise DataSetNotFoundException( 121 "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" 122 % key 123 ) 124 125 elif job is not None: 126 current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),)) 127 if not current: 128 raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument") 129 130 self.key = current["key"] 131 elif data is not None: 132 current = data 133 if ( 134 "query" not in data 135 or "key" not in data 136 or "parameters" not in data 137 or "key_parent" not in data 138 ): 139 raise DataSetException( 140 "DataSet() requires a complete dataset record for its 'data' argument" 141 ) 142 143 self.key = current["key"] 144 else: 145 if parameters is None: 146 raise DataSetException( 147 "DataSet() requires either 'key', or 'parameters' to be given" 148 ) 149 150 if not type: 151 raise DataSetException("Datasets must have their type set explicitly") 152 153 query = self.get_label(parameters, default=type) 154 self.key = self.get_key(query, parameters, parent) 155 current = self.db.fetchone( 156 "SELECT * FROM datasets WHERE key = %s AND query = %s", 157 (self.key, query), 158 ) 159 160 if current: 161 self.data = current 162 self.parameters = json.loads(self.data["parameters"]) 163 self.annotation_fields = json.loads(self.data["annotation_fields"]) \ 164 if self.data.get("annotation_fields") else {} 165 self.is_new = False 166 else: 167 self.data = {"type": type} # get_own_processor needs this 168 own_processor = self.get_own_processor() 169 version = get_software_commit(own_processor) 170 self.data = { 171 "key": self.key, 172 "query": self.get_label(parameters, default=type), 173 "parameters": json.dumps(parameters), 174 "result_file": "", 175 "creator": owner, 176 "status": "", 177 "type": type, 178 "timestamp": int(time.time()), 179 "is_finished": False, 180 "is_private": is_private, 181 "software_version": version[0], 182 "software_source": version[1], 183 "software_file": "", 184 "num_rows": 0, 185 "progress": 0.0, 186 "key_parent": parent, 187 "annotation_fields": "{}" 188 } 189 self.parameters = parameters 190 self.annotation_fields = {} 191 192 self.db.insert("datasets", data=self.data) 193 self.refresh_owners() 194 self.add_owner(owner) 195 196 # Find desired extension from processor if not explicitly set 197 if extension is None: 198 if own_processor: 199 extension = own_processor.get_extension( 200 parent_dataset=DataSet(key=parent, db=db, modules=self.modules) 201 if parent 202 else None 203 ) 204 # Still no extension, default to 'csv' 205 if not extension: 206 extension = "csv" 207 208 # Reserve filename and update data['result_file'] 209 self.reserve_result_file(parameters, extension) 210 211 self.refresh_owners() 212 213 def check_dataset_finished(self): 214 """ 215 Checks if dataset is finished. Returns path to results file is not empty, 216 or 'empty_file' when there were not matches. 217 218 Only returns a path if the dataset is complete. In other words, if this 219 method returns a path, a file with the complete results for this dataset 220 will exist at that location. 221 222 :return: A path to the results file, 'empty_file', or `None` 223 """ 224 if self.data["is_finished"] and self.data["num_rows"] > 0: 225 return self.get_results_path() 226 elif self.data["is_finished"] and self.data["num_rows"] == 0: 227 return 'empty' 228 else: 229 return None 230 231 def get_results_path(self): 232 """ 233 Get path to results file 234 235 Always returns a path, that will at some point contain the dataset 236 data, but may not do so yet. Use this to get the location to write 237 generated results to. 238 239 :return Path: A path to the results file 240 """ 241 # alas we need to instantiate a config reader here - no way around it 242 if not self.folder: 243 self.folder = self.modules.config.get('PATH_DATA') 244 return self.folder.joinpath(self.data["result_file"]) 245 246 def get_results_folder_path(self): 247 """ 248 Get path to folder containing accompanying results 249 250 Returns a path that may not yet be created 251 252 :return Path: A path to the results file 253 """ 254 return self.get_results_path().parent.joinpath("folder_" + self.key) 255 256 def get_log_path(self): 257 """ 258 Get path to dataset log file 259 260 Each dataset has a single log file that documents its creation. This 261 method returns the path to that file. It is identical to the path of 262 the dataset result file, with 'log' as its extension instead. 263 264 :return Path: A path to the log file 265 """ 266 return self.get_results_path().with_suffix(".log") 267 268 def clear_log(self): 269 """ 270 Clears the dataset log file 271 272 If the log file does not exist, it is created empty. The log file will 273 have the same file name as the dataset result file, with the 'log' 274 extension. 275 """ 276 log_path = self.get_log_path() 277 with log_path.open("w"): 278 pass 279 280 def log(self, log): 281 """ 282 Write log message to file 283 284 Writes the log message to the log file on a new line, including a 285 timestamp at the start of the line. Note that this assumes the log file 286 already exists - it should have been created/cleared with clear_log() 287 prior to calling this. 288 289 :param str log: Log message to write 290 """ 291 log_path = self.get_log_path() 292 with log_path.open("a", encoding="utf-8") as outfile: 293 outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log)) 294 295 def _iterate_items(self, processor=None, offset=0, *args, **kwargs): 296 """ 297 A generator that iterates through a CSV or NDJSON file 298 299 This is an internal method and should not be called directly. Rather, 300 call iterate_items() and use the generated dictionary and its properties. 301 302 If a reference to a processor is provided, with every iteration, 303 the processor's 'interrupted' flag is checked, and if set a 304 ProcessorInterruptedException is raised, which by default is caught 305 in the worker and subsequently stops execution gracefully. 306 307 There are two file types that can be iterated (currently): CSV files 308 and NDJSON (newline-delimited JSON) files. In the future, one could 309 envision adding a pathway to retrieve items from e.g. a MongoDB 310 collection directly instead of from a static file 311 312 :param BasicProcessor processor: A reference to the processor 313 iterating the dataset. 314 :param offset int: How many items to skip. 315 :return generator: A generator that yields each item as a dictionary 316 """ 317 path = self.get_results_path() 318 319 # Yield through items one by one 320 if path.suffix.lower() == ".csv": 321 with path.open("rb") as infile: 322 wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8") 323 reader = csv.DictReader(wrapped_infile) 324 325 if not self.get_own_processor(): 326 # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not 327 first_item = next(reader) 328 if first_item is None or any( 329 [True for key in first_item if type(key) is not str] 330 ): 331 raise NotImplementedError( 332 f"Cannot iterate through CSV file (deprecated processor {self.type})" 333 ) 334 yield first_item 335 336 for i, item in enumerate(reader): 337 if hasattr(processor, "interrupted") and processor.interrupted: 338 raise ProcessorInterruptedException( 339 "Processor interrupted while iterating through CSV file" 340 ) 341 342 if i < offset: 343 continue 344 345 yield item 346 347 elif path.suffix.lower() == ".ndjson": 348 # In NDJSON format each line in the file is a self-contained JSON 349 with path.open(encoding="utf-8") as infile: 350 for i, line in enumerate(infile): 351 if hasattr(processor, "interrupted") and processor.interrupted: 352 raise ProcessorInterruptedException( 353 "Processor interrupted while iterating through NDJSON file" 354 ) 355 356 if i < offset: 357 continue 358 359 yield json.loads(line) 360 361 else: 362 raise NotImplementedError(f"Cannot iterate through {path.suffix} file") 363 364 def _iterate_archive_contents( 365 self, 366 staging_area=None, 367 immediately_delete=True, 368 filename_filter=None, 369 processor=None, 370 offset=0, 371 *args, **kwargs 372 ): 373 """ 374 A generator that iterates through files in an archive 375 376 With every iteration, the processor's 'interrupted' flag is checked, 377 and if set a ProcessorInterruptedException is raised, which by default 378 is caught and subsequently stops execution gracefully. 379 380 Files are temporarily unzipped and deleted after use. 381 382 :param Path staging_area: Where to store the files while they're 383 being worked with. If omitted, a temporary folder is created and 384 marked for deletion after all files have been yielded 385 :param bool immediately_delete: Temporary files are removed after 386 yielding; False keeps files until the staging_area is removed 387 :param list filename_filter: Whitelist of filenames to iterate. If 388 empty, do not filter 389 :param BasicProcessor processor: A reference to the processor 390 iterating the dataset. 391 :param int offset: Skip this many files before yielding (warning: may 392 skip a metadata file too!) 393 :return: An iterator with a dictionary for each file, containing an 394 `id`, a `path`, and all attributes of the `ZipInfo` object as keys 395 """ 396 path = self.get_results_path() 397 if not path.exists(): 398 return 399 400 if not staging_area: 401 staging_area = self.get_staging_area() 402 403 if not staging_area.exists() or not staging_area.is_dir(): 404 raise RuntimeError(f"Staging area {staging_area} is not a valid folder") 405 406 iterations = 0 407 with zipfile.ZipFile(path, "r") as archive_file: 408 # sorting is important because it ensures .metadata.json is read 409 # first 410 archive_contents = sorted(archive_file.infolist(), key=lambda x: x.filename) 411 for archived_file in archive_contents: 412 413 if filename_filter and archived_file.filename not in filename_filter: 414 continue 415 416 if archived_file.is_dir(): 417 # do not yield folders - we'll get to the files in them 418 continue 419 420 if iterations < offset: 421 iterations += 1 422 continue 423 424 if hasattr(processor, "interrupted") and processor.interrupted: 425 raise ProcessorInterruptedException( 426 "Processor interrupted while iterating through Zip archive" 427 ) 428 429 iterations += 1 430 temp_file = staging_area.joinpath(archived_file.filename) 431 archive_file.extract(archived_file.filename, staging_area) 432 433 # iterated items are expected as a dictionary 434 # we thus make a dictionary from the ZipInfo object 435 # and use the path (inside the archive) as a unique ID 436 yield { 437 "id": archived_file.filename, 438 "path": temp_file, 439 **{ 440 attribute: getattr(archived_file, attribute) for attribute in dir(archived_file) if not attribute.startswith("_") 441 } 442 } 443 444 if immediately_delete: 445 # this, effectively, triggers when the *next* item is 446 # asked for, or if it is the last file 447 temp_file.unlink() 448 449 def iterate_items( 450 self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None, 451 offset=0, *args, **kwargs 452 ): 453 """ 454 Generate mapped dataset items 455 456 Wrapper for _iterate_items that returns a DatasetItem, which can be 457 accessed as a dict returning the original item or (if a mapper is 458 available) the mapped item. Mapped or original versions of the item can 459 also be accessed via the `original` and `mapped_object` properties of 460 the DatasetItem. 461 462 Processors can define a method called `map_item` that can be used to map 463 an item from the dataset file before it is processed any further. This is 464 slower than storing the data file in the right format to begin with but 465 not all data sources allow for easy 'flat' mapping of items, e.g. tweets 466 are nested objects when retrieved from the twitter API that are easier 467 to store as a JSON file than as a flat CSV file, and it would be a shame 468 to throw away that data. 469 470 Note the two parameters warn_unmappable and map_missing. Items can be 471 unmappable in that their structure is too different to coerce into a 472 neat dictionary of the structure the data source expects. This makes it 473 'unmappable' and warn_unmappable determines what happens in this case. 474 It can also be of the right structure, but with some fields missing or 475 incomplete. map_missing determines what happens in that case. The 476 latter is for example possible when importing data via Zeeschuimer, 477 which produces unstably-structured data captured from social media 478 sites. 479 480 :param BasicProcessor processor: A reference to the processor 481 iterating the dataset. 482 :param bool warn_unmappable: If an item is not mappable, skip the item 483 and log a warning 484 :param max_unmappable: Skip at most this many unmappable items; if 485 more are encountered, stop iterating. `None` to never stop. 486 :param map_missing: Indicates what to do with mapped items for which 487 some fields could not be mapped. Defaults to 'empty_str'. Must be one of: 488 - 'default': fill missing fields with the default passed by map_item 489 - 'abort': raise a MappedItemIncompleteException if a field is missing 490 - a callback: replace missing field with the return value of the 491 callback. The MappedItem object is passed to the callback as the 492 first argument and the name of the missing field as the second. 493 - a dictionary with a key for each possible missing field: replace missing 494 field with a strategy for that field ('default', 'abort', or a callback) 495 :param get_annotations: Whether to also fetch annotations from the database. 496 This can be disabled to help speed up iteration. 497 :param offset: After how many rows we should yield items. 498 :param bool immediately_delete: Only used when iterating a file 499 archive. Defaults to `True`, if set to `False`, files are not deleted 500 from the staging area after the iteration, so they can be re-used. 501 :param staging_area: Only used when iterating a file archive. Where to 502 store the files while they're being worked with. If omitted, a 503 temporary folder is created and marked for deletion after all files 504 have been yielded. 505 :param list filename_filter: Only used when iterating a file archive. 506 Whitelist of filenames to iterate, others are skipped. If empty, do 507 not filter. 508 :return generator: A generator that yields DatasetItems 509 """ 510 unmapped_items = 0 511 512 # Collect item_mapper for use with filter 513 item_mapper = False 514 own_processor = self.get_own_processor() 515 if own_processor and own_processor.map_item_method_available(dataset=self): 516 item_mapper = True 517 518 # Annotations are dynamically added, and we're handling them as 'extra' map_item fields. 519 # If we're getting annotations, we're caching items so we don't need to retrieve annotations one-by-one. 520 get_annotations = True if self.annotation_fields and get_annotations else False 521 if get_annotations: 522 annotation_fields = self.annotation_fields.copy() 523 item_batch_size = 500 524 dataset_item_cache = [] 525 annotations_before = int(time.time()) 526 527 # Append a number to annotation labels if there's duplicate ones 528 annotation_labels = {} 529 for (annotation_field_id, annotation_field_items,) in annotation_fields.items(): 530 unique_label = annotation_field_items["label"] 531 counter = 1 532 while unique_label in annotation_labels.values(): 533 counter += 1 534 unique_label = f"{annotation_field_items['label']}_{counter}" 535 annotation_labels[annotation_field_id] = unique_label 536 537 # missing field strategy can be for all fields at once, or per field 538 # if it is per field, it is a dictionary with field names and their strategy 539 # if it is for all fields, it may be a callback, 'abort', or 'default' 540 default_strategy = "default" 541 if type(map_missing) is not dict: 542 default_strategy = map_missing 543 map_missing = {} 544 545 iterator = self._iterate_items if self.get_extension() != "zip" else self._iterate_archive_contents 546 547 # Loop through items 548 for i, item in enumerate(iterator(processor=processor, offset=offset, *args, **kwargs)): 549 # Save original to yield 550 original_item = item.copy() 551 552 # Map item 553 if item_mapper: 554 try: 555 mapped_item = own_processor.get_mapped_item(item) 556 except MapItemException as e: 557 if warn_unmappable: 558 self.warn_unmappable_item( 559 i, processor, e, warn_admins=unmapped_items is False 560 ) 561 562 unmapped_items += 1 563 if max_unmappable and unmapped_items > max_unmappable: 564 break 565 else: 566 continue 567 568 # check if fields have been marked as 'missing' in the 569 # underlying data, and treat according to the chosen strategy 570 if mapped_item.get_missing_fields(): 571 for missing_field in mapped_item.get_missing_fields(): 572 strategy = map_missing.get(missing_field, default_strategy) 573 574 if callable(strategy): 575 # delegate handling to a callback 576 mapped_item.data[missing_field] = strategy( 577 mapped_item.data, missing_field 578 ) 579 elif strategy == "abort": 580 # raise an exception to be handled at the processor level 581 raise MappedItemIncompleteException( 582 f"Cannot process item, field {missing_field} missing in source data." 583 ) 584 elif strategy == "default": 585 # use whatever was passed to the object constructor 586 mapped_item.data[missing_field] = mapped_item.data[ 587 missing_field 588 ].value 589 else: 590 raise ValueError( 591 "map_missing must be 'abort', 'default', or a callback." 592 ) 593 else: 594 mapped_item = original_item 595 596 # yield a DatasetItem, which is a dict with some special properties 597 dataset_item = DatasetItem( 598 mapper=item_mapper, 599 original=original_item, 600 mapped_object=mapped_item, 601 data_file=original_item["path"] if "path" in original_item and issubclass(type(original_item["path"]), os.PathLike) else None, 602 **( 603 mapped_item.get_item_data() 604 if type(mapped_item) is MappedItem 605 else mapped_item 606 ), 607 ) 608 609 # If we're getting annotations, yield in items batches so we don't need to get annotations per item. 610 if get_annotations: 611 dataset_item_cache.append(dataset_item) 612 613 # When we reach the batch limit or the end of the dataset, 614 # get the annotations for cached items and yield the entire thing. 615 if len(dataset_item_cache) >= item_batch_size or i == (self.num_rows - 1): 616 617 item_ids = [dataset_item.get("id") for dataset_item in dataset_item_cache] 618 619 # Dict with item ids for fast lookup 620 annotations_dict = collections.defaultdict(dict) 621 annotations = self.get_annotations_for_item(item_ids, before=annotations_before) 622 for item_annotation in annotations: 623 item_id = item_annotation.item_id 624 if item_annotation: 625 annotations_dict[item_id][item_annotation.field_id] = item_annotation.value 626 627 # Process each dataset item 628 for dataset_item in dataset_item_cache: 629 item_id = dataset_item.get("id") 630 item_annotations = annotations_dict.get(item_id, {}) 631 632 for annotation_field_id, annotation_field_items in annotation_fields.items(): 633 # Get annotation value 634 value = item_annotations.get(annotation_field_id, "") 635 636 # Convert list to string if needed 637 if isinstance(value, list): 638 value = ",".join(value) 639 elif value != "": 640 value = str(value) # Ensure string type 641 else: 642 value = "" 643 644 dataset_item[annotation_labels[annotation_field_id]] = value 645 646 yield dataset_item 647 648 dataset_item_cache = [] 649 650 else: 651 yield dataset_item 652 653 654 def sort_and_iterate_items( 655 self, sort="", reverse=False, chunk_size=50000, **kwargs 656 ) -> dict: 657 """ 658 Loop through items in a dataset, sorted by a given key. 659 660 This is a wrapper function for `iterate_items()` with the 661 added functionality of sorting a dataset. 662 663 :param sort: The item key that determines the sort order. 664 :param reverse: Whether to sort by largest values first. 665 :param chunk_size: How many items to write 666 667 :returns dict: Yields iterated post 668 """ 669 670 def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False): 671 """ 672 Sort items based on the given key and order. 673 674 :param items_to_sort: The items to sort 675 :param sort_key: The key to sort by 676 :param reverse: Whether to sort in reverse order 677 :return: Sorted items 678 """ 679 if reverse is False and (sort_key == "dataset-order" or sort_key == ""): 680 # Sort by dataset order 681 yield from items_to_sort 682 elif sort_key == "dataset-order" and reverse: 683 # Sort by dataset order in reverse 684 yield from reversed(list(items_to_sort)) 685 else: 686 # Sort on the basis of a column value 687 if not convert_sort_to_float: 688 yield from sorted( 689 items_to_sort, 690 key=lambda x: x.get(sort_key, ""), 691 reverse=reverse, 692 ) 693 else: 694 # Dataset fields can contain integers and empty strings. 695 # Since these cannot be compared, we will convert every 696 # empty string to 0. 697 yield from sorted( 698 items_to_sort, 699 key=lambda x: convert_to_float(x.get(sort_key, ""), force=True), 700 reverse=reverse, 701 ) 702 703 if self.num_rows < chunk_size: 704 try: 705 # First try to force-sort float values. If this doesn't work, it'll be alphabetical. 706 yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True) 707 except (TypeError, ValueError): 708 yield from sort_items( 709 self.iterate_items(**kwargs), 710 sort, 711 reverse, 712 convert_sort_to_float=False 713 ) 714 715 else: 716 # For large datasets, we will use chunk sorting 717 staging_area = self.get_staging_area() 718 buffer = [] 719 chunk_files = [] 720 convert_sort_to_float = True 721 fieldnames = self.get_columns() 722 723 def write_chunk(buffer, chunk_index): 724 """ 725 Write a chunk of data to a temporary file 726 727 :param buffer: The buffer containing the chunk of data 728 :param chunk_index: The index of the chunk 729 :return: The path to the temporary file 730 """ 731 temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv") 732 with temp_file.open("w", encoding="utf-8") as chunk_file: 733 writer = csv.DictWriter(chunk_file, fieldnames=fieldnames) 734 writer.writeheader() 735 writer.writerows(buffer) 736 return temp_file 737 738 # Divide the dataset into sorted chunks 739 for item in self.iterate_items(**kwargs): 740 buffer.append(item) 741 if len(buffer) >= chunk_size: 742 try: 743 buffer = list( 744 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 745 ) 746 except (TypeError, ValueError): 747 convert_sort_to_float = False 748 buffer = list( 749 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 750 ) 751 752 chunk_files.append(write_chunk(buffer, len(chunk_files))) 753 buffer.clear() 754 755 # Sort and write any remaining items in the buffer 756 if buffer: 757 buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float)) 758 chunk_files.append(write_chunk(buffer, len(chunk_files))) 759 buffer.clear() 760 761 # Merge sorted chunks into the final sorted file 762 sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv") 763 with sorted_file.open("w", encoding="utf-8") as outfile: 764 writer = csv.DictWriter(outfile, fieldnames=self.get_columns()) 765 writer.writeheader() 766 767 # Open all chunk files for reading 768 chunk_readers = [ 769 csv.DictReader(chunk.open("r", encoding="utf-8")) 770 for chunk in chunk_files 771 ] 772 heap = [] 773 774 # Initialize the heap with the first row from each chunk 775 for i, reader in enumerate(chunk_readers): 776 try: 777 row = next(reader) 778 if sort == "dataset-order" and reverse: 779 # Use a reverse index for "dataset-order" and reverse=True 780 sort_key = -i 781 elif convert_sort_to_float: 782 # Negate numeric keys for reverse sorting 783 sort_key = ( 784 -convert_to_float(row.get(sort, "")) 785 if reverse 786 else convert_to_float(row.get(sort, "")) 787 ) 788 else: 789 if reverse: 790 # For reverse string sorting, invert string comparison by creating a tuple 791 # with an inverted string - this makes Python's tuple comparison work in reverse 792 sort_key = ( 793 tuple(-ord(c) for c in row.get(sort, "")), 794 -i, 795 ) 796 else: 797 sort_key = (row.get(sort, ""), i) 798 heap.append((sort_key, i, row)) 799 except StopIteration: 800 pass 801 802 # Use a heap to merge sorted chunks 803 import heapq 804 805 heapq.heapify(heap) 806 while heap: 807 _, chunk_index, smallest_row = heapq.heappop(heap) 808 writer.writerow(smallest_row) 809 try: 810 next_row = next(chunk_readers[chunk_index]) 811 if sort == "dataset-order" and reverse: 812 # Use a reverse index for "dataset-order" and reverse=True 813 sort_key = -chunk_index 814 elif convert_sort_to_float: 815 sort_key = ( 816 -convert_to_float(next_row.get(sort, "")) 817 if reverse 818 else convert_to_float(next_row.get(sort, "")) 819 ) 820 else: 821 # Use the same inverted comparison for string values 822 if reverse: 823 sort_key = ( 824 tuple(-ord(c) for c in next_row.get(sort, "")), 825 -chunk_index, 826 ) 827 else: 828 sort_key = (next_row.get(sort, ""), chunk_index) 829 heapq.heappush(heap, (sort_key, chunk_index, next_row)) 830 except StopIteration: 831 pass 832 833 # Read the sorted file and yield each item 834 with sorted_file.open("r", encoding="utf-8") as infile: 835 reader = csv.DictReader(infile) 836 for item in reader: 837 yield item 838 839 # Remove the temporary files 840 if staging_area.is_dir(): 841 shutil.rmtree(staging_area) 842 843 def get_staging_area(self): 844 """ 845 Get path to a temporary folder in which files can be stored before 846 finishing 847 848 This folder must be created before use, but is guaranteed to not exist 849 yet. The folder may be used as a staging area for the dataset data 850 while it is being processed. 851 852 :return Path: Path to folder 853 """ 854 results_file = self.get_results_path() 855 856 results_dir_base = results_file.parent 857 results_dir = results_file.name.replace(".", "") + "-staging" 858 results_path = results_dir_base.joinpath(results_dir) 859 index = 1 860 while results_path.exists(): 861 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 862 index += 1 863 864 # create temporary folder 865 results_path.mkdir() 866 867 # Storing the staging area with the dataset so that it can be removed later 868 self.disposable_files.append(results_path) 869 870 return results_path 871 872 def remove_disposable_files(self): 873 """ 874 Remove any disposable files and folders, such as staging areas 875 876 Called from BasicProcessor after processing a dataset finishes. 877 """ 878 # Remove DataSet staging areas 879 if self.disposable_files: 880 for disposable_file in self.disposable_files: 881 if disposable_file.exists(): 882 shutil.rmtree(disposable_file) 883 884 def finish(self, num_rows=0): 885 """ 886 Declare the dataset finished 887 """ 888 if self.data["is_finished"]: 889 raise RuntimeError("Cannot finish a finished dataset again") 890 891 self.db.update( 892 "datasets", 893 where={"key": self.data["key"]}, 894 data={ 895 "is_finished": True, 896 "num_rows": num_rows, 897 "progress": 1.0, 898 "timestamp_finished": int(time.time()), 899 }, 900 ) 901 self.data["is_finished"] = True 902 self.data["num_rows"] = num_rows 903 904 def copy(self, shallow=True): 905 """ 906 Copies the dataset, making a new version with a unique key 907 908 909 :param bool shallow: Shallow copy: does not copy the result file, but 910 instead refers to the same file as the original dataset did 911 :return Dataset: Copied dataset 912 """ 913 parameters = self.parameters.copy() 914 915 # a key is partially based on the parameters. so by setting these extra 916 # attributes, we also ensure a unique key will be generated for the 917 # copy 918 # possibly todo: don't use time for uniqueness (but one shouldn't be 919 # copying a dataset multiple times per microsecond, that's not what 920 # this is for) 921 parameters["copied_from"] = self.key 922 parameters["copied_at"] = time.time() 923 924 copy = DataSet( 925 parameters=parameters, 926 db=self.db, 927 extension=self.result_file.split(".")[-1], 928 type=self.type, 929 modules=self.modules 930 ) 931 932 for field in self.data: 933 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 934 continue 935 copy.__setattr__(field, self.data[field]) 936 937 if shallow: 938 # use the same result file 939 copy.result_file = self.result_file 940 else: 941 # copy to new file with new key 942 shutil.copy(self.get_results_path(), copy.get_results_path()) 943 944 if self.is_finished(): 945 copy.finish(self.num_rows) 946 947 # make sure ownership is also copied 948 copy.copy_ownership_from(self) 949 950 return copy 951 952 def delete(self, commit=True, queue=None): 953 """ 954 Delete the dataset, and all its children 955 956 Deletes both database records and result files. Note that manipulating 957 a dataset object after it has been deleted is undefined behaviour. 958 959 :param bool commit: Commit SQL DELETE query? 960 """ 961 # first, recursively delete children 962 children = self.db.fetchall( 963 "SELECT * FROM datasets WHERE key_parent = %s", (self.key,) 964 ) 965 for child in children: 966 try: 967 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 968 child.delete(commit=commit) 969 except DataSetException: 970 # dataset already deleted - race condition? 971 pass 972 973 # delete any queued jobs for this dataset 974 try: 975 job = Job.get_by_remote_ID(self.key, self.db, self.type) 976 if job.is_claimed: 977 # tell API to stop any jobs running for this dataset 978 # level 2 = cancel job 979 # we're not interested in the result - if the API is available, 980 # it will do its thing, if it's not the backend is probably not 981 # running so the job also doesn't need to be interrupted 982 call_api( 983 "cancel-job", 984 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 985 False, 986 ) 987 988 # this deletes the job from the database 989 job.finish(True) 990 991 except JobNotFoundException: 992 pass 993 994 # delete this dataset's own annotations 995 self.db.delete("annotations", where={"dataset": self.key}, commit=commit) 996 # delete annotations that have been generated as part of this dataset 997 self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit) 998 # delete annotation fields on parent dataset(s) stemming from this dataset 999 for related_dataset in self.get_genealogy(update_cache=True): 1000 field_deleted = False 1001 annotation_fields = related_dataset.annotation_fields 1002 if annotation_fields: 1003 for field_id in list(annotation_fields.keys()): 1004 if annotation_fields[field_id].get("from_dataset", "") == self.key: 1005 del annotation_fields[field_id] 1006 field_deleted = True 1007 if field_deleted: 1008 related_dataset.save_annotation_fields(annotation_fields) 1009 1010 # delete dataset from database 1011 self.db.delete("datasets", where={"key": self.key}, commit=commit) 1012 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 1013 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 1014 1015 # delete from drive 1016 try: 1017 if self.get_results_path().exists(): 1018 self.get_results_path().unlink() 1019 if self.get_results_path().with_suffix(".log").exists(): 1020 self.get_results_path().with_suffix(".log").unlink() 1021 if self.get_results_folder_path().exists(): 1022 shutil.rmtree(self.get_results_folder_path()) 1023 1024 except FileNotFoundError: 1025 # already deleted, apparently 1026 pass 1027 except PermissionError as e: 1028 self.db.log.error( 1029 f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}" 1030 ) 1031 1032 def update_children(self, **kwargs): 1033 """ 1034 Update an attribute for all child datasets 1035 1036 Can be used to e.g. change the owner, version, finished status for all 1037 datasets in a tree 1038 1039 :param kwargs: Parameters corresponding to known dataset attributes 1040 """ 1041 for child in self.get_children(update=True): 1042 for attr, value in kwargs.items(): 1043 child.__setattr__(attr, value) 1044 1045 child.update_children(**kwargs) 1046 1047 def is_finished(self): 1048 """ 1049 Check if dataset is finished 1050 :return bool: 1051 """ 1052 return bool(self.data["is_finished"]) 1053 1054 def is_rankable(self, multiple_items=True): 1055 """ 1056 Determine if a dataset is rankable 1057 1058 Rankable means that it is a CSV file with 'date' and 'value' columns 1059 as well as one or more item label columns 1060 1061 :param bool multiple_items: Consider datasets with multiple items per 1062 item (e.g. word_1, word_2, etc)? 1063 1064 :return bool: Whether the dataset is rankable or not 1065 """ 1066 if ( 1067 self.get_results_path().suffix != ".csv" 1068 or not self.get_results_path().exists() 1069 ): 1070 return False 1071 1072 column_options = {"date", "value", "item"} 1073 if multiple_items: 1074 column_options.add("word_1") 1075 1076 with self.get_results_path().open(encoding="utf-8") as infile: 1077 reader = csv.DictReader(infile) 1078 try: 1079 return len(set(reader.fieldnames) & column_options) >= 3 1080 except (TypeError, ValueError): 1081 return False 1082 1083 def is_accessible_by(self, username, role="owner"): 1084 """ 1085 Check if dataset has given user as owner 1086 1087 :param str|User username: Username to check for 1088 :return bool: 1089 """ 1090 if type(username) is not str: 1091 if hasattr(username, "get_id"): 1092 username = username.get_id() 1093 else: 1094 raise TypeError("User must be a str or User object") 1095 1096 # 'normal' owners 1097 if username in [ 1098 owner 1099 for owner, meta in self.owners.items() 1100 if (role is None or meta["role"] == role) 1101 ]: 1102 return True 1103 1104 # owners that are owner by being part of a tag 1105 if username in itertools.chain( 1106 *[ 1107 tagged_owners 1108 for tag, tagged_owners in self.tagged_owners.items() 1109 if (role is None or self.owners[f"tag:{tag}"]["role"] == role) 1110 ] 1111 ): 1112 return True 1113 1114 return False 1115 1116 def get_owners_users(self, role="owner"): 1117 """ 1118 Get list of dataset owners 1119 1120 This returns a list of *users* that are considered owners. Tags are 1121 transparently replaced with the users with that tag. 1122 1123 :param str|None role: Role to check for. If `None`, all owners are 1124 returned regardless of role. 1125 1126 :return set: De-duplicated owner list 1127 """ 1128 # 'normal' owners 1129 owners = [ 1130 owner 1131 for owner, meta in self.owners.items() 1132 if (role is None or meta["role"] == role) and not owner.startswith("tag:") 1133 ] 1134 1135 # owners that are owner by being part of a tag 1136 owners.extend( 1137 itertools.chain( 1138 *[ 1139 tagged_owners 1140 for tag, tagged_owners in self.tagged_owners.items() 1141 if role is None or self.owners[f"tag:{tag}"]["role"] == role 1142 ] 1143 ) 1144 ) 1145 1146 # de-duplicate before returning 1147 return set(owners) 1148 1149 def get_owners(self, role="owner"): 1150 """ 1151 Get list of dataset owners 1152 1153 This returns a list of all owners, and does not transparently resolve 1154 tags (like `get_owners_users` does). 1155 1156 :param str|None role: Role to check for. If `None`, all owners are 1157 returned regardless of role. 1158 1159 :return set: De-duplicated owner list 1160 """ 1161 return [ 1162 owner 1163 for owner, meta in self.owners.items() 1164 if (role is None or meta["role"] == role) 1165 ] 1166 1167 def add_owner(self, username, role="owner"): 1168 """ 1169 Set dataset owner 1170 1171 If the user is already an owner, but with a different role, the role is 1172 updated. If the user is already an owner with the same role, nothing happens. 1173 1174 :param str|User username: Username to set as owner 1175 :param str|None role: Role to add user with. 1176 """ 1177 if type(username) is not str: 1178 if hasattr(username, "get_id"): 1179 username = username.get_id() 1180 else: 1181 raise TypeError("User must be a str or User object") 1182 1183 if username not in self.owners: 1184 self.owners[username] = {"name": username, "key": self.key, "role": role} 1185 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 1186 1187 elif username in self.owners and self.owners[username]["role"] != role: 1188 self.db.update( 1189 "datasets_owners", 1190 data={"role": role}, 1191 where={"name": username, "key": self.key}, 1192 ) 1193 self.owners[username]["role"] = role 1194 1195 if username.startswith("tag:"): 1196 # this is a bit more complicated than just adding to the list of 1197 # owners, so do a full refresh 1198 self.refresh_owners() 1199 1200 # make sure children's owners remain in sync 1201 for child in self.get_children(update=True): 1202 child.add_owner(username, role) 1203 # not recursive, since we're calling it from recursive code! 1204 child.copy_ownership_from(self, recursive=False) 1205 1206 def remove_owner(self, username): 1207 """ 1208 Remove dataset owner 1209 1210 If no owner is set, the dataset is assigned to the anonymous user. 1211 If the user is not an owner, nothing happens. 1212 1213 :param str|User username: Username to set as owner 1214 """ 1215 if type(username) is not str: 1216 if hasattr(username, "get_id"): 1217 username = username.get_id() 1218 else: 1219 raise TypeError("User must be a str or User object") 1220 1221 if username in self.owners: 1222 del self.owners[username] 1223 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 1224 1225 if not self.owners: 1226 self.add_owner("anonymous") 1227 1228 if username in self.tagged_owners: 1229 del self.tagged_owners[username] 1230 1231 # make sure children's owners remain in sync 1232 for child in self.get_children(update=True): 1233 child.remove_owner(username) 1234 # not recursive, since we're calling it from recursive code! 1235 child.copy_ownership_from(self, recursive=False) 1236 1237 def refresh_owners(self): 1238 """ 1239 Update internal owner cache 1240 1241 This makes sure that the list of *users* and *tags* which can access the 1242 dataset is up to date. 1243 """ 1244 self.owners = { 1245 owner["name"]: owner 1246 for owner in self.db.fetchall( 1247 "SELECT * FROM datasets_owners WHERE key = %s", (self.key,) 1248 ) 1249 } 1250 1251 # determine which users (if any) are owners of the dataset by having a 1252 # tag that is listed as an owner 1253 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 1254 if owner_tags: 1255 tagged_owners = self.db.fetchall( 1256 "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,) 1257 ) 1258 self.tagged_owners = { 1259 owner_tag: [ 1260 user["name"] for user in tagged_owners if owner_tag in user["tags"] 1261 ] 1262 for owner_tag in owner_tags 1263 } 1264 else: 1265 self.tagged_owners = {} 1266 1267 def copy_ownership_from(self, dataset, recursive=True): 1268 """ 1269 Copy ownership 1270 1271 This is useful to e.g. make sure a dataset's ownership stays in sync 1272 with its parent 1273 1274 :param Dataset dataset: Parent to copy from 1275 :return: 1276 """ 1277 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 1278 1279 for role in ("owner", "viewer"): 1280 owners = dataset.get_owners(role=role) 1281 for owner in owners: 1282 self.db.insert( 1283 "datasets_owners", 1284 data={"key": self.key, "name": owner, "role": role}, 1285 commit=False, 1286 safe=True, 1287 ) 1288 1289 self.db.commit() 1290 if recursive: 1291 for child in self.get_children(update=True): 1292 child.copy_ownership_from(self, recursive=recursive) 1293 1294 def get_parameters(self): 1295 """ 1296 Get dataset parameters 1297 1298 The dataset parameters are stored as JSON in the database - parse them 1299 and return the resulting object 1300 1301 :return: Dataset parameters as originally stored 1302 """ 1303 try: 1304 return json.loads(self.data["parameters"]) 1305 except json.JSONDecodeError: 1306 return {} 1307 1308 def get_columns(self): 1309 """ 1310 Returns the dataset columns. 1311 1312 Useful for processor input forms. Can deal with both CSV and NDJSON 1313 files, the latter only if a `map_item` function is available in the 1314 processor that generated it. While in other cases one could use the 1315 keys of the JSON object, this is not always possible in follow-up code 1316 that uses the 'column' names, so for consistency this function acts as 1317 if no column can be parsed if no `map_item` function exists. 1318 1319 :return list: List of dataset columns; empty list if unable to parse 1320 """ 1321 if not self.get_results_path().exists(): 1322 # no file to get columns from 1323 return [] 1324 1325 if (self.get_results_path().suffix.lower() == ".csv") or ( 1326 self.get_results_path().suffix.lower() == ".ndjson" 1327 and self.get_own_processor() is not None 1328 and self.get_own_processor().map_item_method_available(dataset=self) 1329 ): 1330 items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100) 1331 try: 1332 keys = list(next(items).keys()) 1333 if self.annotation_fields: 1334 for annotation_field in self.annotation_fields.values(): 1335 annotation_column = annotation_field["label"] 1336 label_count = 1 1337 while annotation_column in keys: 1338 label_count += 1 1339 annotation_column = ( 1340 f"{annotation_field['label']}_{label_count}" 1341 ) 1342 keys.append(annotation_column) 1343 columns = keys 1344 except (StopIteration, NotImplementedError): 1345 # No items or otherwise unable to iterate 1346 columns = [] 1347 finally: 1348 del items 1349 else: 1350 # Filetype not CSV or an NDJSON with `map_item` 1351 columns = [] 1352 1353 return columns 1354 1355 def update_label(self, label): 1356 """ 1357 Update label for this dataset 1358 1359 :param str label: New label 1360 :return str: The new label, as returned by get_label 1361 """ 1362 self.parameters["label"] = label 1363 1364 self.db.update( 1365 "datasets", 1366 data={"parameters": json.dumps(self.parameters)}, 1367 where={"key": self.key}, 1368 ) 1369 return self.get_label() 1370 1371 def get_label(self, parameters=None, default="Query"): 1372 """ 1373 Generate a readable label for the dataset 1374 1375 :param dict parameters: Parameters of the dataset 1376 :param str default: Label to use if it cannot be inferred from the 1377 parameters 1378 1379 :return str: Label 1380 """ 1381 if not parameters: 1382 parameters = self.parameters 1383 1384 if parameters.get("label"): 1385 return parameters["label"] 1386 elif parameters.get("body_query") and parameters["body_query"] != "empty": 1387 return parameters["body_query"] 1388 elif parameters.get("body_match") and parameters["body_match"] != "empty": 1389 return parameters["body_match"] 1390 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 1391 return parameters["subject_query"] 1392 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 1393 return parameters["subject_match"] 1394 elif parameters.get("query"): 1395 label = parameters["query"] 1396 # Some legacy datasets have lists as query data 1397 if isinstance(label, list): 1398 label = ", ".join(label) 1399 1400 label = label if len(label) < 30 else label[:25] + "..." 1401 label = label.strip().replace("\n", ", ") 1402 return label 1403 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 1404 return "Flag: %s" % parameters["country_flag"] 1405 elif parameters.get("country_name") and parameters["country_name"] != "all": 1406 return "Country: %s" % parameters["country_name"] 1407 elif parameters.get("filename"): 1408 return parameters["filename"] 1409 elif parameters.get("board") and "datasource" in parameters: 1410 return parameters["datasource"] + "/" + parameters["board"] 1411 elif ( 1412 "datasource" in parameters 1413 and parameters["datasource"] in self.modules.datasources 1414 ): 1415 return ( 1416 self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 1417 ) 1418 else: 1419 return default 1420 1421 def change_datasource(self, datasource): 1422 """ 1423 Change the datasource type for this dataset 1424 1425 :param str label: New datasource type 1426 :return str: The new datasource type 1427 """ 1428 1429 self.parameters["datasource"] = datasource 1430 1431 self.db.update( 1432 "datasets", 1433 data={"parameters": json.dumps(self.parameters)}, 1434 where={"key": self.key}, 1435 ) 1436 return datasource 1437 1438 def reserve_result_file(self, parameters=None, extension="csv"): 1439 """ 1440 Generate a unique path to the results file for this dataset 1441 1442 This generates a file name for the data file of this dataset, and makes sure 1443 no file exists or will exist at that location other than the file we 1444 expect (i.e. the data for this particular dataset). 1445 1446 :param str extension: File extension, "csv" by default 1447 :param parameters: Dataset parameters 1448 :return bool: Whether the file path was successfully reserved 1449 """ 1450 if self.data["is_finished"]: 1451 raise RuntimeError("Cannot reserve results file for a finished dataset") 1452 1453 # Use 'random' for random post queries 1454 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 1455 file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"] 1456 # Use country code for country flag queries 1457 elif "country_flag" in parameters and parameters["country_flag"] != "all": 1458 file = ( 1459 "countryflag-" 1460 + str(parameters["country_flag"]) 1461 + "-" 1462 + self.data["key"] 1463 ) 1464 # Use the query string for all other queries 1465 else: 1466 query_bit = self.data["query"].replace(" ", "-").lower() 1467 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 1468 query_bit = query_bit[:100] # Crop to avoid OSError 1469 file = query_bit + "-" + self.data["key"] 1470 file = re.sub(r"[-]+", "-", file) 1471 1472 self.data["result_file"] = file + "." + extension.lower() 1473 index = 1 1474 while self.get_results_path().is_file(): 1475 self.data["result_file"] = file + "-" + str(index) + "." + extension.lower() 1476 index += 1 1477 1478 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 1479 data={"result_file": self.data["result_file"]}) 1480 return updated > 0 1481 1482 def get_key(self, query, parameters, parent="", time_offset=0): 1483 """ 1484 Generate a unique key for this dataset that can be used to identify it 1485 1486 The key is a hash of a combination of the query string and parameters. 1487 You never need to call this, really: it's used internally. 1488 1489 :param str query: Query string 1490 :param parameters: Dataset parameters 1491 :param parent: Parent dataset's key (if applicable) 1492 :param time_offset: Offset to add to the time component of the dataset 1493 key. This can be used to ensure a unique key even if the parameters and 1494 timing is otherwise identical to an existing dataset's 1495 1496 :return str: Dataset key 1497 """ 1498 # Return a hash based on parameters 1499 # we're going to use the hash of the parameters to uniquely identify 1500 # the dataset, so make sure it's always in the same order, or we might 1501 # end up creating multiple keys for the same dataset if python 1502 # decides to return the dict in a different order 1503 param_key = collections.OrderedDict() 1504 for key in sorted(parameters): 1505 param_key[key] = parameters[key] 1506 1507 # we additionally use the current time as a salt - this should usually 1508 # ensure a unique key for the dataset. if for some reason there is a 1509 # hash collision 1510 param_key["_salt"] = int(time.time()) + time_offset 1511 1512 parent_key = str(parent) if parent else "" 1513 plain_key = repr(param_key) + str(query) + parent_key 1514 hashed_key = hash_to_md5(plain_key) 1515 1516 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1517 # key exists, generate a new one 1518 return self.get_key( 1519 query, parameters, parent, time_offset=random.randint(1, 10) 1520 ) 1521 else: 1522 return hashed_key 1523 1524 def set_key(self, key): 1525 """ 1526 Change dataset key 1527 1528 In principe, keys should never be changed. But there are rare cases 1529 where it is useful to do so, in particular when importing a dataset 1530 from another 4CAT instance; in that case it makes sense to try and 1531 ensure that the key is the same as it was before. This function sets 1532 the dataset key and updates any dataset references to it. 1533 1534 :param str key: Key to set 1535 :return str: Key that was set. If the desired key already exists, the 1536 original key is kept. 1537 """ 1538 key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,)) 1539 if key_exists or not key: 1540 return self.key 1541 1542 old_key = self.key 1543 self.db.update("datasets", data={"key": key}, where={"key": old_key}) 1544 1545 # update references 1546 self.db.update( 1547 "datasets", data={"key_parent": key}, where={"key_parent": old_key} 1548 ) 1549 self.db.update("datasets_owners", data={"key": key}, where={"key": old_key}) 1550 self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key}) 1551 self.db.update("users_favourites", data={"key": key}, where={"key": old_key}) 1552 1553 # for good measure 1554 self.db.commit() 1555 self.key = key 1556 1557 return self.key 1558 1559 def get_status(self): 1560 """ 1561 Get Dataset status 1562 1563 :return string: Dataset status 1564 """ 1565 return self.data["status"] 1566 1567 def update_status(self, status, is_final=False): 1568 """ 1569 Update dataset status 1570 1571 The status is a string that may be displayed to a user to keep them 1572 updated and informed about the progress of a dataset. No memory is kept 1573 of earlier dataset statuses; the current status is overwritten when 1574 updated. 1575 1576 Statuses are also written to the dataset log file. 1577 1578 :param string status: Dataset status 1579 :param bool is_final: If this is `True`, subsequent calls to this 1580 method while the object is instantiated will not update the dataset 1581 status. 1582 :return bool: Status update successful? 1583 """ 1584 if self.no_status_updates: 1585 return 1586 1587 # for presets, copy the updated status to the preset(s) this is part of 1588 if self.preset_parent is None: 1589 self.preset_parent = [ 1590 parent 1591 for parent in self.get_genealogy() 1592 if parent.type.find("preset-") == 0 and parent.key != self.key 1593 ][:1] 1594 1595 if self.preset_parent: 1596 for preset_parent in self.preset_parent: 1597 if not preset_parent.is_finished(): 1598 preset_parent.update_status(status) 1599 1600 self.data["status"] = status 1601 updated = self.db.update( 1602 "datasets", where={"key": self.data["key"]}, data={"status": status} 1603 ) 1604 1605 if is_final: 1606 self.no_status_updates = True 1607 1608 self.log(status) 1609 1610 return updated > 0 1611 1612 def update_progress(self, progress): 1613 """ 1614 Update dataset progress 1615 1616 The progress can be used to indicate to a user how close the dataset 1617 is to completion. 1618 1619 :param float progress: Between 0 and 1. 1620 :return: 1621 """ 1622 progress = min(1, max(0, progress)) # clamp 1623 if type(progress) is int: 1624 progress = float(progress) 1625 1626 self.data["progress"] = progress 1627 updated = self.db.update( 1628 "datasets", where={"key": self.data["key"]}, data={"progress": progress} 1629 ) 1630 return updated > 0 1631 1632 def get_progress(self): 1633 """ 1634 Get dataset progress 1635 1636 :return float: Progress, between 0 and 1 1637 """ 1638 return self.data["progress"] 1639 1640 def finish_with_error(self, error): 1641 """ 1642 Set error as final status, and finish with 0 results 1643 1644 This is a convenience function to avoid having to repeat 1645 "update_status" and "finish" a lot. 1646 1647 :param str error: Error message for final dataset status. 1648 :return: 1649 """ 1650 self.update_status(error, is_final=True) 1651 self.finish(0) 1652 1653 return None 1654 1655 def update_version(self, version): 1656 """ 1657 Update software version used for this dataset 1658 1659 This can be used to verify the code that was used to process this dataset. 1660 1661 :param string version: Version identifier 1662 :return bool: Update successul? 1663 """ 1664 try: 1665 # this fails if the processor type is unknown 1666 # edge case, but let's not crash... 1667 processor_path = self.modules.processors.get(self.data["type"]).filepath 1668 except AttributeError: 1669 processor_path = "" 1670 1671 updated = self.db.update( 1672 "datasets", 1673 where={"key": self.data["key"]}, 1674 data={ 1675 "software_version": version[0], 1676 "software_source": version[1], 1677 "software_file": processor_path, 1678 }, 1679 ) 1680 1681 return updated > 0 1682 1683 def delete_parameter(self, parameter, instant=True): 1684 """ 1685 Delete a parameter from the dataset metadata 1686 1687 :param string parameter: Parameter to delete 1688 :param bool instant: Also delete parameters in this instance object? 1689 :return bool: Update successul? 1690 """ 1691 parameters = self.parameters.copy() 1692 if parameter in parameters: 1693 del parameters[parameter] 1694 else: 1695 return False 1696 1697 updated = self.db.update( 1698 "datasets", 1699 where={"key": self.data["key"]}, 1700 data={"parameters": json.dumps(parameters)}, 1701 ) 1702 1703 if instant: 1704 self.parameters = parameters 1705 1706 return updated > 0 1707 1708 def get_version_url(self, file): 1709 """ 1710 Get a versioned github URL for the version this dataset was processed with 1711 1712 :param file: File to link within the repository 1713 :return: URL, or an empty string 1714 """ 1715 if not self.data["software_source"]: 1716 return "" 1717 1718 filepath = self.data.get("software_file", "") 1719 if filepath.startswith("/config/extensions/"): 1720 # go to root of extension 1721 filepath = "/" + "/".join(filepath.split("/")[3:]) 1722 1723 return ( 1724 self.data["software_source"] 1725 + "/blob/" 1726 + self.data["software_version"] 1727 + filepath 1728 ) 1729 1730 def top_parent(self): 1731 """ 1732 Get root dataset 1733 1734 Traverses the tree of datasets this one is part of until it finds one 1735 with no source_dataset dataset, then returns that dataset. 1736 1737 :return Dataset: Parent dataset 1738 """ 1739 genealogy = self.get_genealogy() 1740 return genealogy[0] 1741 1742 def get_genealogy(self, update_cache=False): 1743 """ 1744 Get genealogy of this dataset 1745 1746 Creates a list of DataSet objects, with the first one being the 1747 'top' dataset, and each subsequent one being a child of the previous 1748 one, ending with the current dataset. 1749 1750 :param bool update_cache: Update the cached genealogy if True, else return cached value 1751 :return list: Dataset genealogy, oldest dataset first 1752 """ 1753 if not self._genealogy or update_cache: 1754 key_parent = self.key_parent 1755 genealogy = [] 1756 1757 while key_parent: 1758 try: 1759 parent = DataSet(key=key_parent, db=self.db, modules=self.modules) 1760 except DataSetException: 1761 break 1762 1763 genealogy.append(parent) 1764 if parent.key_parent: 1765 key_parent = parent.key_parent 1766 else: 1767 break 1768 1769 genealogy.reverse() 1770 1771 # add self to the end 1772 genealogy.append(self) 1773 # cache the result 1774 self._genealogy = genealogy 1775 1776 # return a copy to prevent external modification 1777 return list(self._genealogy) 1778 1779 def get_children(self, update=False): 1780 """ 1781 Get children of this dataset 1782 1783 :param bool update: Update the list of children from database if True, else return cached value 1784 :return list: List of child datasets 1785 """ 1786 if self._children is not None and not update: 1787 return self._children 1788 1789 analyses = self.db.fetchall( 1790 "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", 1791 (self.key,), 1792 ) 1793 self._children = [ 1794 DataSet(data=analysis, db=self.db, modules=self.modules) 1795 for analysis in analyses 1796 ] 1797 return self._children 1798 1799 def get_all_children(self, recursive=True, update=True): 1800 """ 1801 Get all children of this dataset 1802 1803 Results are returned as a non-hierarchical list, i.e. the result does 1804 not reflect the actual dataset hierarchy (but all datasets in the 1805 result will have the original dataset as an ancestor somewhere) 1806 1807 :return list: List of DataSets 1808 """ 1809 children = self.get_children(update=update) 1810 results = children.copy() 1811 if recursive: 1812 for child in children: 1813 results += child.get_all_children(recursive=recursive, update=update) 1814 1815 return results 1816 1817 def nearest(self, type_filter): 1818 """ 1819 Return nearest dataset that matches the given type 1820 1821 Starting with this dataset, traverse the hierarchy upwards and return 1822 whichever dataset matches the given type. 1823 1824 :param str type_filter: Type filter. Can contain wildcards and is matched 1825 using `fnmatch.fnmatch`. 1826 :return: Earliest matching dataset, or `None` if none match. 1827 """ 1828 genealogy = self.get_genealogy() 1829 for dataset in reversed(genealogy): 1830 if fnmatch.fnmatch(dataset.type, type_filter): 1831 return dataset 1832 1833 return None 1834 1835 def get_breadcrumbs(self): 1836 """ 1837 Get breadcrumbs navlink for use in permalinks 1838 1839 Returns a string representing this dataset's genealogy that may be used 1840 to uniquely identify it. 1841 1842 :return str: Nav link 1843 """ 1844 if not self.key_parent: 1845 return self.key 1846 1847 genealogy = self.get_genealogy() 1848 return ",".join([d.key for d in genealogy]) 1849 1850 def get_compatible_processors(self, config=None): 1851 """ 1852 Get list of processors compatible with this dataset 1853 1854 Checks whether this dataset type is one that is listed as being accepted 1855 by the processor, for each known type: if the processor does not 1856 specify accepted types (via the `is_compatible_with` method), it is 1857 assumed it accepts any top-level datasets 1858 1859 :param ConfigManager|None config: Configuration reader to determine 1860 compatibility through. This may not be the same reader the dataset was 1861 instantiated with, e.g. when checking whether some other user should 1862 be able to run processors on this dataset. 1863 :return dict: Compatible processors, `name => class` mapping 1864 """ 1865 processors = self.modules.processors 1866 1867 available = {} 1868 for processor_type, processor in processors.items(): 1869 if processor.is_from_collector(): 1870 continue 1871 1872 own_processor = self.get_own_processor() 1873 if own_processor and own_processor.exclude_followup_processors( 1874 processor_type 1875 ): 1876 continue 1877 1878 # consider a processor compatible if its is_compatible_with 1879 # method returns True *or* if it has no explicit compatibility 1880 # check and this dataset is top-level (i.e. has no parent) 1881 if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \ 1882 or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)): 1883 available[processor_type] = processor 1884 1885 return available 1886 1887 def get_place_in_queue(self, update=False): 1888 """ 1889 Determine dataset's position in queue 1890 1891 If the dataset is already finished, the position is -1. Else, the 1892 position is the number of datasets to be completed before this one will 1893 be processed. A position of 0 would mean that the dataset is currently 1894 being executed, or that the backend is not running. 1895 1896 :param bool update: Update the queue position from database if True, else return cached value 1897 :return int: Queue position 1898 """ 1899 if self.is_finished() or not self.data.get("job"): 1900 self._queue_position = -1 1901 return self._queue_position 1902 elif not update and self._queue_position is not None: 1903 # Use cached value 1904 return self._queue_position 1905 else: 1906 # Collect queue position from database via the job 1907 try: 1908 job = Job.get_by_ID(self.data["job"], self.db) 1909 self._queue_position = job.get_place_in_queue() 1910 except JobNotFoundException: 1911 self._queue_position = -1 1912 1913 return self._queue_position 1914 1915 def get_own_processor(self): 1916 """ 1917 Get the processor class that produced this dataset 1918 1919 :return: Processor class, or `None` if not available. 1920 """ 1921 processor_type = self.parameters.get("type", self.data.get("type")) 1922 1923 return self.modules.processors.get(processor_type) 1924 1925 def get_available_processors(self, config=None, exclude_hidden=False): 1926 """ 1927 Get list of processors that may be run for this dataset 1928 1929 Returns all compatible processors except for those that are already 1930 queued or finished and have no options. Processors that have been 1931 run but have options are included so they may be run again with a 1932 different configuration 1933 1934 :param ConfigManager|None config: Configuration reader to determine 1935 compatibility through. This may not be the same reader the dataset was 1936 instantiated with, e.g. when checking whether some other user should 1937 be able to run processors on this dataset. 1938 :param bool exclude_hidden: Exclude processors that should be displayed 1939 in the UI? If `False`, all processors are returned. 1940 1941 :return dict: Available processors, `name => properties` mapping 1942 """ 1943 if self.available_processors: 1944 # Update to reflect exclude_hidden parameter which may be different from last call 1945 # TODO: could children also have been created? Possible bug, but I have not seen anything effected by this 1946 return { 1947 processor_type: processor 1948 for processor_type, processor in self.available_processors.items() 1949 if not exclude_hidden or not processor.is_hidden 1950 } 1951 1952 processors = self.get_compatible_processors(config=config) 1953 1954 for analysis in self.get_children(update=True): 1955 if analysis.type not in processors: 1956 continue 1957 1958 if not processors[analysis.type].get_options(config=config): 1959 # No variable options; this processor has been run so remove 1960 del processors[analysis.type] 1961 continue 1962 1963 if exclude_hidden and processors[analysis.type].is_hidden: 1964 del processors[analysis.type] 1965 1966 self.available_processors = processors 1967 return processors 1968 1969 def link_job(self, job): 1970 """ 1971 Link this dataset to a job ID 1972 1973 Updates the dataset data to include a reference to the job that will be 1974 executing (or has already executed) this job. 1975 1976 Note that if no job can be found for this dataset, this method silently 1977 fails. 1978 1979 :param Job job: The job that will run this dataset 1980 1981 :todo: If the job column ever gets used, make sure it always contains 1982 a valid value, rather than silently failing this method. 1983 """ 1984 if type(job) is not Job: 1985 raise TypeError("link_job requires a Job object as its argument") 1986 1987 if "id" not in job.data: 1988 try: 1989 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"]) 1990 except JobNotFoundException: 1991 return 1992 1993 self.db.update( 1994 "datasets", where={"key": self.key}, data={"job": job.data["id"]} 1995 ) 1996 1997 def link_parent(self, key_parent): 1998 """ 1999 Set source_dataset key for this dataset 2000 2001 :param key_parent: Parent key. Not checked for validity 2002 """ 2003 self.db.update( 2004 "datasets", where={"key": self.key}, data={"key_parent": key_parent} 2005 ) 2006 # reset caches 2007 self.data["key_parent"] = key_parent 2008 self._genealogy = None 2009 2010 def get_parent(self): 2011 """ 2012 Get parent dataset 2013 2014 :return DataSet: Parent dataset, or `None` if not applicable 2015 """ 2016 return ( 2017 DataSet(key=self.key_parent, db=self.db, modules=self.modules) 2018 if self.key_parent 2019 else None 2020 ) 2021 2022 def detach(self): 2023 """ 2024 Makes the datasets standalone, i.e. not having any source_dataset dataset 2025 """ 2026 self.link_parent("") 2027 2028 def is_dataset(self): 2029 """ 2030 Easy way to confirm this is a dataset. 2031 Used for checking processor and dataset compatibility, 2032 which needs to handle both processors and datasets. 2033 """ 2034 return True 2035 2036 def is_top_dataset(self): 2037 """ 2038 Easy way to confirm this is a top dataset. 2039 Used for checking processor and dataset compatibility, 2040 which needs to handle both processors and datasets. 2041 """ 2042 if self.key_parent: 2043 return False 2044 return True 2045 2046 def is_expiring(self, config): 2047 """ 2048 Determine if dataset is set to expire 2049 2050 Similar to `is_expired`, but checks if the dataset will be deleted in 2051 the future, not if it should be deleted right now. 2052 2053 :param ConfigManager config: Configuration reader (context-aware) 2054 :return bool|int: `False`, or the expiration date as a Unix timestamp. 2055 """ 2056 # has someone opted out of deleting this? 2057 if self.parameters.get("keep"): 2058 return False 2059 2060 # is this dataset explicitly marked as expiring after a certain time? 2061 if self.parameters.get("expires-after"): 2062 return self.parameters.get("expires-after") 2063 2064 # is the data source configured to have its datasets expire? 2065 expiration = config.get("datasources.expiration", {}) 2066 if not expiration.get(self.parameters.get("datasource")): 2067 return False 2068 2069 # is there a timeout for this data source? 2070 if expiration.get(self.parameters.get("datasource")).get("timeout"): 2071 return self.timestamp + expiration.get( 2072 self.parameters.get("datasource") 2073 ).get("timeout") 2074 2075 return False 2076 2077 def is_expired(self, config): 2078 """ 2079 Determine if dataset should be deleted 2080 2081 Datasets can be set to expire, but when they should be deleted depends 2082 on a number of factor. This checks them all. 2083 2084 :param ConfigManager config: Configuration reader (context-aware) 2085 :return bool: 2086 """ 2087 # has someone opted out of deleting this? 2088 if not self.is_expiring(config): 2089 return False 2090 2091 # is this dataset explicitly marked as expiring after a certain time? 2092 future = ( 2093 time.time() + 3600 2094 ) # ensure we don't delete datasets with invalid expiration times 2095 if ( 2096 self.parameters.get("expires-after") 2097 and convert_to_int(self.parameters["expires-after"], future) < time.time() 2098 ): 2099 return True 2100 2101 # is the data source configured to have its datasets expire? 2102 expiration = config.get("datasources.expiration", {}) 2103 if not expiration.get(self.parameters.get("datasource")): 2104 return False 2105 2106 # is the dataset older than the set timeout? 2107 if expiration.get(self.parameters.get("datasource")).get("timeout"): 2108 return ( 2109 self.timestamp 2110 + expiration[self.parameters.get("datasource")]["timeout"] 2111 < time.time() 2112 ) 2113 2114 return False 2115 2116 def is_from_collector(self): 2117 """ 2118 Check if this dataset was made by a processor that collects data, i.e. 2119 a search or import worker. 2120 2121 :return bool: 2122 """ 2123 return self.type.endswith("-search") or self.type.endswith("-import") 2124 2125 def get_extension(self): 2126 """ 2127 Gets the file extension this dataset produces. 2128 Also checks whether the results file exists. 2129 Used for checking processor and dataset compatibility. 2130 2131 :return str extension: Extension, e.g. `csv` 2132 """ 2133 if self.get_results_path().exists(): 2134 return self.get_results_path().suffix[1:] 2135 2136 return False 2137 2138 def is_filter(self): 2139 """ 2140 Check whether a dataset is a filter dataset. 2141 2142 :return bool: True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown). 2143 """ 2144 own_processor = self.get_own_processor() 2145 if own_processor is None: 2146 # Deprecated datasets do not have a processor 2147 return None 2148 return own_processor.is_filter() 2149 2150 def get_media_type(self): 2151 """ 2152 Gets the media type of the dataset file. 2153 2154 :return str: media type, e.g., "text" 2155 """ 2156 own_processor = self.get_own_processor() 2157 if hasattr(self, "media_type"): 2158 # media type can be defined explicitly in the dataset; this is the priority 2159 return self.media_type 2160 elif own_processor is not None: 2161 # or media type can be defined in the processor 2162 # some processors can set different media types for different datasets (e.g., import_media) 2163 if hasattr(own_processor, "media_type"): 2164 return own_processor.media_type 2165 2166 # Default to text 2167 return self.parameters.get("media_type", "text") 2168 2169 def get_metadata(self): 2170 """ 2171 Get dataset metadata 2172 2173 This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended 2174 as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to 2175 update its database (and ensure compatibility with the exporting version of 4CAT). 2176 """ 2177 metadata = self.db.fetchone( 2178 "SELECT * FROM datasets WHERE key = %s", (self.key,) 2179 ) 2180 2181 # get 4CAT version (presumably to ensure export is compatible with import) 2182 metadata["current_4CAT_version"] = get_software_version() 2183 return metadata 2184 2185 def get_result_url(self): 2186 """ 2187 Gets the 4CAT frontend URL of a dataset file. 2188 2189 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 2190 SERVER_HTTPS) plus hardcoded '/result/'. 2191 TODO: create more dynamic method of obtaining url. 2192 """ 2193 filename = self.get_results_path().name 2194 2195 # we cheat a little here by using the modules' config reader, but these 2196 # will never be context-dependent values anyway 2197 url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \ 2198 self.modules.config.get("flask.server_name") + '/result/' + filename 2199 return url_to_file 2200 2201 def warn_unmappable_item( 2202 self, item_count, processor=None, error_message=None, warn_admins=True 2203 ): 2204 """ 2205 Log an item that is unable to be mapped and warn administrators. 2206 2207 :param int item_count: Item index 2208 :param Processor processor: Processor calling function8 2209 """ 2210 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 2211 2212 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 2213 closest_dataset = ( 2214 processor.dataset 2215 if processor is not None and processor.dataset is not None 2216 else self 2217 ) 2218 # Log error to dataset log 2219 closest_dataset.log(dataset_error_message) 2220 2221 if warn_admins: 2222 if processor is not None: 2223 processor.log.warning( 2224 f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}." 2225 ) 2226 elif hasattr(self.db, "log"): 2227 # borrow the database's log handler 2228 self.db.log.warning( 2229 f"Unable to map item all items for dataset {closest_dataset.key}." 2230 ) 2231 else: 2232 # No other log available 2233 raise DataSetException( 2234 f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn" 2235 ) 2236 2237 # Annotation functions (most of it is handled in Annotations) 2238 def has_annotations(self) -> bool: 2239 """ 2240 Whether this dataset has annotations 2241 """ 2242 2243 annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,)) 2244 2245 return True if annotation else False 2246 2247 def num_annotations(self) -> int: 2248 """ 2249 Get the amount of annotations 2250 """ 2251 return self.db.fetchone( 2252 "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,) 2253 )["count"] 2254 2255 def get_annotation(self, data: dict): 2256 """ 2257 Retrieves a specific annotation if it exists. 2258 2259 :param data: A dictionary with which to get the annotations from. 2260 To get specific annotations, include either an `id` field or 2261 `field_id` and `item_id` fields. 2262 2263 return Annotation: Annotation object. 2264 """ 2265 2266 if "id" not in data or ("field_id" not in data and "item_id" not in data): 2267 return None 2268 2269 if "dataset" not in data: 2270 data["dataset"] = self.key 2271 2272 return Annotation(data=data, db=self.db) 2273 2274 def get_annotations(self) -> list: 2275 """ 2276 Retrieves all annotations for this dataset. 2277 2278 return list: List of Annotation objects. 2279 """ 2280 2281 return Annotation.get_annotations_for_dataset(self.db, self.key) 2282 2283 def get_annotations_for_item(self, item_id: str | list, before=0) -> list: 2284 """ 2285 Retrieves all annotations from this dataset for a specific item (e.g. social media post). 2286 :param str item_id: The ID of the annotation item 2287 :param int before: The upper timestamp range for annotations. 2288 """ 2289 return Annotation.get_annotations_for_dataset( 2290 self.db, self.key, item_id=item_id, before=before 2291 ) 2292 2293 def has_annotation_fields(self) -> bool: 2294 """ 2295 Returns True if there's annotation fields saved tot the dataset table 2296 Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.). 2297 """ 2298 2299 return True if self.annotation_fields else False 2300 2301 def get_annotation_field_labels(self) -> list: 2302 """ 2303 Retrieves the saved annotation field labels for this dataset. 2304 These are stored in the annotations table. 2305 2306 :return list: List of annotation field labels. 2307 """ 2308 2309 annotation_fields = self.annotation_fields 2310 2311 if not annotation_fields: 2312 return [] 2313 2314 labels = [v["label"] for v in annotation_fields.values()] 2315 2316 return labels 2317 2318 def save_annotations(self, annotations: list) -> int: 2319 """ 2320 Takes a list of annotations and saves them to the annotations table. 2321 If a field is not yet present in the `annotation_fields` column in 2322 the datasets table, it also adds it there. 2323 2324 :param list annotations: List of dictionaries with annotation items. Must have `item_id`, `field_id`, 2325 and `label`. 2326 `item_id` is for the specific item being annotated (e.g. a social media post) 2327 `field_id` refers to the annotation field. 2328 `label` is a human-readable description of this annotation. 2329 E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", 2330 "value": "Yes"}] 2331 2332 :returns int: How many annotations were saved. 2333 2334 """ 2335 2336 if not annotations: 2337 return 0 2338 2339 count = 0 2340 annotation_fields = self.annotation_fields 2341 2342 # Add some dataset data to annotations, if not present 2343 for annotation_data in annotations: 2344 # Check if the required fields are present 2345 if not annotation_data.get("item_id"): 2346 raise AnnotationException( 2347 "Can't save annotations; annotation must have an `item_id` referencing " 2348 "the item it annotated, got %s" % annotation_data 2349 ) 2350 if not annotation_data.get("field_id"): 2351 raise AnnotationException( 2352 "Can't save annotations; annotation must have a `field_id` field, " 2353 "got %s" % annotation_data 2354 ) 2355 if not annotation_data.get("label") or not isinstance( 2356 annotation_data["label"], str 2357 ): 2358 raise AnnotationException( 2359 "Can't save annotations; annotation must have a `label` field, " 2360 "got %s" % annotation_data 2361 ) 2362 2363 # Set dataset key 2364 if not annotation_data.get("dataset"): 2365 annotation_data["dataset"] = self.key 2366 2367 # Set default author to this dataset owner 2368 # If this annotation is made by a processor, it will have the processor name 2369 if not annotation_data.get("author"): 2370 annotation_data["author"] = self.get_owners()[0] 2371 2372 # Create Annotation object, which also saves it to the database 2373 # If this dataset/item_id/field_id combination already exists, this retrieves the 2374 # existing data and updates it with new values. 2375 Annotation(data=annotation_data, db=self.db) 2376 count += 1 2377 2378 # Save annotation fields if things changed 2379 if annotation_fields != self.annotation_fields: 2380 self.save_annotation_fields(annotation_fields) 2381 2382 return count 2383 2384 def save_annotation_fields(self, new_fields: dict, add=False) -> int: 2385 """ 2386 Save annotation field data to the datasets table (in the `annotation_fields` column). 2387 If changes to the annotation fields affect existing annotations, 2388 this function will also call `update_annotations_via_fields()` to change them. 2389 2390 :param dict new_fields: New annotation fields, with a field ID as key. 2391 2392 :param bool add: Whether we're merely adding new fields 2393 or replacing the whole batch. If add is False, 2394 `new_fields` should contain all fields. 2395 2396 :return int: The number of annotation fields saved. 2397 2398 """ 2399 2400 # Get existing annotation fields to see if stuff changed. 2401 old_fields = self.annotation_fields 2402 changes = False 2403 2404 # Annotation field must be valid JSON. 2405 try: 2406 json.dumps(new_fields) 2407 except ValueError: 2408 raise AnnotationException( 2409 "Can't save annotation fields: not valid JSON (%s)" % new_fields 2410 ) 2411 2412 # No duplicate IDs 2413 if len(new_fields) != len(set(new_fields)): 2414 raise AnnotationException( 2415 "Can't save annotation fields: field IDs must be unique" 2416 ) 2417 2418 # Annotation fields must at minimum have `type` and `label` keys. 2419 for field_id, annotation_field in new_fields.items(): 2420 if not isinstance(field_id, str): 2421 raise AnnotationException( 2422 "Can't save annotation fields: field ID %s is not a valid string" 2423 % field_id 2424 ) 2425 if "label" not in annotation_field: 2426 raise AnnotationException( 2427 "Can't save annotation fields: all fields must have a label" 2428 % field_id 2429 ) 2430 if "type" not in annotation_field: 2431 raise AnnotationException( 2432 "Can't save annotation fields: all fields must have a type" 2433 % field_id 2434 ) 2435 2436 # Check if fields are removed 2437 if not add and old_fields: 2438 for field_id in old_fields.keys(): 2439 if field_id not in new_fields: 2440 changes = True 2441 2442 # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin 2443 # dataset 2444 for field_id in new_fields.keys(): 2445 if field_id in old_fields and old_fields[field_id].get("from_dataset"): 2446 old_fields[field_id]["label"] = new_fields[field_id][ 2447 "label" 2448 ] # Only labels could've been changed 2449 new_fields[field_id] = old_fields[field_id] 2450 2451 # If we're just adding fields, add them to the old fields. 2452 # If the field already exists, overwrite the old field. 2453 if add and old_fields: 2454 all_fields = old_fields 2455 for field_id, annotation_field in new_fields.items(): 2456 all_fields[field_id] = annotation_field 2457 new_fields = all_fields 2458 2459 # We're saving the new annotation fields as-is. 2460 # Ordering of fields is preserved this way. 2461 self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)}) 2462 self.annotation_fields = new_fields 2463 2464 # If anything changed with the annotation fields, possibly update 2465 # existing annotations (e.g. to delete them or change their labels). 2466 if changes: 2467 Annotation.update_annotations_via_fields( 2468 self.key, old_fields, new_fields, self.db 2469 ) 2470 2471 return len(new_fields) 2472 2473 def get_annotation_metadata(self) -> dict: 2474 """ 2475 Retrieves all the data for this dataset from the annotations table. 2476 """ 2477 2478 annotation_data = self.db.fetchall( 2479 "SELECT * FROM annotations WHERE dataset = '%s';" % self.key 2480 ) 2481 return annotation_data 2482 2483 def __getattr__(self, attr): 2484 """ 2485 Getter so we don't have to use .data all the time 2486 2487 :param attr: Data key to get 2488 :return: Value 2489 """ 2490 2491 if attr in dir(self): 2492 # an explicitly defined attribute should always be called in favour 2493 # of this passthrough 2494 attribute = getattr(self, attr) 2495 return attribute 2496 elif attr in self.data: 2497 return self.data[attr] 2498 else: 2499 raise AttributeError("DataSet instance has no attribute %s" % attr) 2500 2501 def __setattr__(self, attr, value): 2502 """ 2503 Setter so we can flexibly update the database 2504 2505 Also updates internal data stores (.data etc). If the attribute is 2506 unknown, it is stored within the 'parameters' attribute. 2507 2508 :param str attr: Attribute to update 2509 :param value: New value 2510 """ 2511 2512 # don't override behaviour for *actual* class attributes 2513 if attr in dir(self): 2514 super().__setattr__(attr, value) 2515 return 2516 2517 if attr not in self.data: 2518 self.parameters[attr] = value 2519 attr = "parameters" 2520 value = self.parameters 2521 2522 if attr == "parameters": 2523 value = json.dumps(value) 2524 2525 self.db.update("datasets", where={"key": self.key}, data={attr: value}) 2526 2527 self.data[attr] = value 2528 2529 if attr == "parameters": 2530 self.parameters = json.loads(value)
Provide interface to safely register and run operations on a dataset
A dataset is a collection of:
- A unique identifier
- A set of parameters that demarcate the data contained within
- The data
The data is usually stored in a file on the disk; the parameters are stored in a database. The handling of the data, et cetera, is done by other workers; this class defines method to create and manipulate the dataset's properties.
63 def __init__( 64 self, 65 parameters=None, 66 key=None, 67 job=None, 68 data=None, 69 db=None, 70 parent="", 71 extension=None, 72 type=None, 73 is_private=True, 74 owner="anonymous", 75 modules=None 76 ): 77 """ 78 Create new dataset object 79 80 If the dataset is not in the database yet, it is added. 81 82 :param dict parameters: Only when creating a new dataset. Dataset 83 parameters, free-form dictionary. 84 :param str key: Dataset key. If given, dataset with this key is loaded. 85 :param int job: Job ID. If given, dataset corresponding to job is 86 loaded. 87 :param dict data: Dataset data, corresponding to a row in the datasets 88 database table. If not given, retrieved from database depending on key. 89 :param db: Database connection 90 :param str parent: Only when creating a new dataset. Parent dataset 91 key to which the one being created is a child. 92 :param str extension: Only when creating a new dataset. Extension of 93 dataset result file. 94 :param str type: Only when creating a new dataset. Type of the dataset, 95 corresponding to the type property of a processor class. 96 :param bool is_private: Only when creating a new dataset. Whether the 97 dataset is private or public. 98 :param str owner: Only when creating a new dataset. The user name of 99 the dataset's creator. 100 :param modules: Module cache. If not given, will be loaded when needed 101 (expensive). Used to figure out what processors are compatible with 102 this dataset. 103 """ 104 self.db = db 105 106 # Ensure mutable attributes are set in __init__ as they are unique to each DataSet 107 self.data = {} 108 self.parameters = {} 109 self.available_processors = {} 110 self._genealogy = None 111 self.disposable_files = [] 112 self.modules = modules 113 114 if key is not None: 115 self.key = key 116 current = self.db.fetchone( 117 "SELECT * FROM datasets WHERE key = %s", (self.key,) 118 ) 119 if not current: 120 raise DataSetNotFoundException( 121 "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" 122 % key 123 ) 124 125 elif job is not None: 126 current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),)) 127 if not current: 128 raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument") 129 130 self.key = current["key"] 131 elif data is not None: 132 current = data 133 if ( 134 "query" not in data 135 or "key" not in data 136 or "parameters" not in data 137 or "key_parent" not in data 138 ): 139 raise DataSetException( 140 "DataSet() requires a complete dataset record for its 'data' argument" 141 ) 142 143 self.key = current["key"] 144 else: 145 if parameters is None: 146 raise DataSetException( 147 "DataSet() requires either 'key', or 'parameters' to be given" 148 ) 149 150 if not type: 151 raise DataSetException("Datasets must have their type set explicitly") 152 153 query = self.get_label(parameters, default=type) 154 self.key = self.get_key(query, parameters, parent) 155 current = self.db.fetchone( 156 "SELECT * FROM datasets WHERE key = %s AND query = %s", 157 (self.key, query), 158 ) 159 160 if current: 161 self.data = current 162 self.parameters = json.loads(self.data["parameters"]) 163 self.annotation_fields = json.loads(self.data["annotation_fields"]) \ 164 if self.data.get("annotation_fields") else {} 165 self.is_new = False 166 else: 167 self.data = {"type": type} # get_own_processor needs this 168 own_processor = self.get_own_processor() 169 version = get_software_commit(own_processor) 170 self.data = { 171 "key": self.key, 172 "query": self.get_label(parameters, default=type), 173 "parameters": json.dumps(parameters), 174 "result_file": "", 175 "creator": owner, 176 "status": "", 177 "type": type, 178 "timestamp": int(time.time()), 179 "is_finished": False, 180 "is_private": is_private, 181 "software_version": version[0], 182 "software_source": version[1], 183 "software_file": "", 184 "num_rows": 0, 185 "progress": 0.0, 186 "key_parent": parent, 187 "annotation_fields": "{}" 188 } 189 self.parameters = parameters 190 self.annotation_fields = {} 191 192 self.db.insert("datasets", data=self.data) 193 self.refresh_owners() 194 self.add_owner(owner) 195 196 # Find desired extension from processor if not explicitly set 197 if extension is None: 198 if own_processor: 199 extension = own_processor.get_extension( 200 parent_dataset=DataSet(key=parent, db=db, modules=self.modules) 201 if parent 202 else None 203 ) 204 # Still no extension, default to 'csv' 205 if not extension: 206 extension = "csv" 207 208 # Reserve filename and update data['result_file'] 209 self.reserve_result_file(parameters, extension) 210 211 self.refresh_owners()
Create new dataset object
If the dataset is not in the database yet, it is added.
Parameters
- dict parameters: Only when creating a new dataset. Dataset parameters, free-form dictionary.
- str key: Dataset key. If given, dataset with this key is loaded.
- int job: Job ID. If given, dataset corresponding to job is loaded.
- dict data: Dataset data, corresponding to a row in the datasets database table. If not given, retrieved from database depending on key.
- db: Database connection
- str parent: Only when creating a new dataset. Parent dataset key to which the one being created is a child.
- str extension: Only when creating a new dataset. Extension of dataset result file.
- str type: Only when creating a new dataset. Type of the dataset, corresponding to the type property of a processor class.
- bool is_private: Only when creating a new dataset. Whether the dataset is private or public.
- str owner: Only when creating a new dataset. The user name of the dataset's creator.
- modules: Module cache. If not given, will be loaded when needed (expensive). Used to figure out what processors are compatible with this dataset.
213 def check_dataset_finished(self): 214 """ 215 Checks if dataset is finished. Returns path to results file is not empty, 216 or 'empty_file' when there were not matches. 217 218 Only returns a path if the dataset is complete. In other words, if this 219 method returns a path, a file with the complete results for this dataset 220 will exist at that location. 221 222 :return: A path to the results file, 'empty_file', or `None` 223 """ 224 if self.data["is_finished"] and self.data["num_rows"] > 0: 225 return self.get_results_path() 226 elif self.data["is_finished"] and self.data["num_rows"] == 0: 227 return 'empty' 228 else: 229 return None
Checks if dataset is finished. Returns path to results file is not empty, or 'empty_file' when there were not matches.
Only returns a path if the dataset is complete. In other words, if this method returns a path, a file with the complete results for this dataset will exist at that location.
Returns
A path to the results file, 'empty_file', or
None
231 def get_results_path(self): 232 """ 233 Get path to results file 234 235 Always returns a path, that will at some point contain the dataset 236 data, but may not do so yet. Use this to get the location to write 237 generated results to. 238 239 :return Path: A path to the results file 240 """ 241 # alas we need to instantiate a config reader here - no way around it 242 if not self.folder: 243 self.folder = self.modules.config.get('PATH_DATA') 244 return self.folder.joinpath(self.data["result_file"])
Get path to results file
Always returns a path, that will at some point contain the dataset data, but may not do so yet. Use this to get the location to write generated results to.
Returns
A path to the results file
246 def get_results_folder_path(self): 247 """ 248 Get path to folder containing accompanying results 249 250 Returns a path that may not yet be created 251 252 :return Path: A path to the results file 253 """ 254 return self.get_results_path().parent.joinpath("folder_" + self.key)
Get path to folder containing accompanying results
Returns a path that may not yet be created
Returns
A path to the results file
256 def get_log_path(self): 257 """ 258 Get path to dataset log file 259 260 Each dataset has a single log file that documents its creation. This 261 method returns the path to that file. It is identical to the path of 262 the dataset result file, with 'log' as its extension instead. 263 264 :return Path: A path to the log file 265 """ 266 return self.get_results_path().with_suffix(".log")
Get path to dataset log file
Each dataset has a single log file that documents its creation. This method returns the path to that file. It is identical to the path of the dataset result file, with 'log' as its extension instead.
Returns
A path to the log file
268 def clear_log(self): 269 """ 270 Clears the dataset log file 271 272 If the log file does not exist, it is created empty. The log file will 273 have the same file name as the dataset result file, with the 'log' 274 extension. 275 """ 276 log_path = self.get_log_path() 277 with log_path.open("w"): 278 pass
Clears the dataset log file
If the log file does not exist, it is created empty. The log file will have the same file name as the dataset result file, with the 'log' extension.
280 def log(self, log): 281 """ 282 Write log message to file 283 284 Writes the log message to the log file on a new line, including a 285 timestamp at the start of the line. Note that this assumes the log file 286 already exists - it should have been created/cleared with clear_log() 287 prior to calling this. 288 289 :param str log: Log message to write 290 """ 291 log_path = self.get_log_path() 292 with log_path.open("a", encoding="utf-8") as outfile: 293 outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
Write log message to file
Writes the log message to the log file on a new line, including a timestamp at the start of the line. Note that this assumes the log file already exists - it should have been created/cleared with clear_log() prior to calling this.
Parameters
- str log: Log message to write
449 def iterate_items( 450 self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None, 451 offset=0, *args, **kwargs 452 ): 453 """ 454 Generate mapped dataset items 455 456 Wrapper for _iterate_items that returns a DatasetItem, which can be 457 accessed as a dict returning the original item or (if a mapper is 458 available) the mapped item. Mapped or original versions of the item can 459 also be accessed via the `original` and `mapped_object` properties of 460 the DatasetItem. 461 462 Processors can define a method called `map_item` that can be used to map 463 an item from the dataset file before it is processed any further. This is 464 slower than storing the data file in the right format to begin with but 465 not all data sources allow for easy 'flat' mapping of items, e.g. tweets 466 are nested objects when retrieved from the twitter API that are easier 467 to store as a JSON file than as a flat CSV file, and it would be a shame 468 to throw away that data. 469 470 Note the two parameters warn_unmappable and map_missing. Items can be 471 unmappable in that their structure is too different to coerce into a 472 neat dictionary of the structure the data source expects. This makes it 473 'unmappable' and warn_unmappable determines what happens in this case. 474 It can also be of the right structure, but with some fields missing or 475 incomplete. map_missing determines what happens in that case. The 476 latter is for example possible when importing data via Zeeschuimer, 477 which produces unstably-structured data captured from social media 478 sites. 479 480 :param BasicProcessor processor: A reference to the processor 481 iterating the dataset. 482 :param bool warn_unmappable: If an item is not mappable, skip the item 483 and log a warning 484 :param max_unmappable: Skip at most this many unmappable items; if 485 more are encountered, stop iterating. `None` to never stop. 486 :param map_missing: Indicates what to do with mapped items for which 487 some fields could not be mapped. Defaults to 'empty_str'. Must be one of: 488 - 'default': fill missing fields with the default passed by map_item 489 - 'abort': raise a MappedItemIncompleteException if a field is missing 490 - a callback: replace missing field with the return value of the 491 callback. The MappedItem object is passed to the callback as the 492 first argument and the name of the missing field as the second. 493 - a dictionary with a key for each possible missing field: replace missing 494 field with a strategy for that field ('default', 'abort', or a callback) 495 :param get_annotations: Whether to also fetch annotations from the database. 496 This can be disabled to help speed up iteration. 497 :param offset: After how many rows we should yield items. 498 :param bool immediately_delete: Only used when iterating a file 499 archive. Defaults to `True`, if set to `False`, files are not deleted 500 from the staging area after the iteration, so they can be re-used. 501 :param staging_area: Only used when iterating a file archive. Where to 502 store the files while they're being worked with. If omitted, a 503 temporary folder is created and marked for deletion after all files 504 have been yielded. 505 :param list filename_filter: Only used when iterating a file archive. 506 Whitelist of filenames to iterate, others are skipped. If empty, do 507 not filter. 508 :return generator: A generator that yields DatasetItems 509 """ 510 unmapped_items = 0 511 512 # Collect item_mapper for use with filter 513 item_mapper = False 514 own_processor = self.get_own_processor() 515 if own_processor and own_processor.map_item_method_available(dataset=self): 516 item_mapper = True 517 518 # Annotations are dynamically added, and we're handling them as 'extra' map_item fields. 519 # If we're getting annotations, we're caching items so we don't need to retrieve annotations one-by-one. 520 get_annotations = True if self.annotation_fields and get_annotations else False 521 if get_annotations: 522 annotation_fields = self.annotation_fields.copy() 523 item_batch_size = 500 524 dataset_item_cache = [] 525 annotations_before = int(time.time()) 526 527 # Append a number to annotation labels if there's duplicate ones 528 annotation_labels = {} 529 for (annotation_field_id, annotation_field_items,) in annotation_fields.items(): 530 unique_label = annotation_field_items["label"] 531 counter = 1 532 while unique_label in annotation_labels.values(): 533 counter += 1 534 unique_label = f"{annotation_field_items['label']}_{counter}" 535 annotation_labels[annotation_field_id] = unique_label 536 537 # missing field strategy can be for all fields at once, or per field 538 # if it is per field, it is a dictionary with field names and their strategy 539 # if it is for all fields, it may be a callback, 'abort', or 'default' 540 default_strategy = "default" 541 if type(map_missing) is not dict: 542 default_strategy = map_missing 543 map_missing = {} 544 545 iterator = self._iterate_items if self.get_extension() != "zip" else self._iterate_archive_contents 546 547 # Loop through items 548 for i, item in enumerate(iterator(processor=processor, offset=offset, *args, **kwargs)): 549 # Save original to yield 550 original_item = item.copy() 551 552 # Map item 553 if item_mapper: 554 try: 555 mapped_item = own_processor.get_mapped_item(item) 556 except MapItemException as e: 557 if warn_unmappable: 558 self.warn_unmappable_item( 559 i, processor, e, warn_admins=unmapped_items is False 560 ) 561 562 unmapped_items += 1 563 if max_unmappable and unmapped_items > max_unmappable: 564 break 565 else: 566 continue 567 568 # check if fields have been marked as 'missing' in the 569 # underlying data, and treat according to the chosen strategy 570 if mapped_item.get_missing_fields(): 571 for missing_field in mapped_item.get_missing_fields(): 572 strategy = map_missing.get(missing_field, default_strategy) 573 574 if callable(strategy): 575 # delegate handling to a callback 576 mapped_item.data[missing_field] = strategy( 577 mapped_item.data, missing_field 578 ) 579 elif strategy == "abort": 580 # raise an exception to be handled at the processor level 581 raise MappedItemIncompleteException( 582 f"Cannot process item, field {missing_field} missing in source data." 583 ) 584 elif strategy == "default": 585 # use whatever was passed to the object constructor 586 mapped_item.data[missing_field] = mapped_item.data[ 587 missing_field 588 ].value 589 else: 590 raise ValueError( 591 "map_missing must be 'abort', 'default', or a callback." 592 ) 593 else: 594 mapped_item = original_item 595 596 # yield a DatasetItem, which is a dict with some special properties 597 dataset_item = DatasetItem( 598 mapper=item_mapper, 599 original=original_item, 600 mapped_object=mapped_item, 601 data_file=original_item["path"] if "path" in original_item and issubclass(type(original_item["path"]), os.PathLike) else None, 602 **( 603 mapped_item.get_item_data() 604 if type(mapped_item) is MappedItem 605 else mapped_item 606 ), 607 ) 608 609 # If we're getting annotations, yield in items batches so we don't need to get annotations per item. 610 if get_annotations: 611 dataset_item_cache.append(dataset_item) 612 613 # When we reach the batch limit or the end of the dataset, 614 # get the annotations for cached items and yield the entire thing. 615 if len(dataset_item_cache) >= item_batch_size or i == (self.num_rows - 1): 616 617 item_ids = [dataset_item.get("id") for dataset_item in dataset_item_cache] 618 619 # Dict with item ids for fast lookup 620 annotations_dict = collections.defaultdict(dict) 621 annotations = self.get_annotations_for_item(item_ids, before=annotations_before) 622 for item_annotation in annotations: 623 item_id = item_annotation.item_id 624 if item_annotation: 625 annotations_dict[item_id][item_annotation.field_id] = item_annotation.value 626 627 # Process each dataset item 628 for dataset_item in dataset_item_cache: 629 item_id = dataset_item.get("id") 630 item_annotations = annotations_dict.get(item_id, {}) 631 632 for annotation_field_id, annotation_field_items in annotation_fields.items(): 633 # Get annotation value 634 value = item_annotations.get(annotation_field_id, "") 635 636 # Convert list to string if needed 637 if isinstance(value, list): 638 value = ",".join(value) 639 elif value != "": 640 value = str(value) # Ensure string type 641 else: 642 value = "" 643 644 dataset_item[annotation_labels[annotation_field_id]] = value 645 646 yield dataset_item 647 648 dataset_item_cache = [] 649 650 else: 651 yield dataset_item
Generate mapped dataset items
Wrapper for _iterate_items that returns a DatasetItem, which can be
accessed as a dict returning the original item or (if a mapper is
available) the mapped item. Mapped or original versions of the item can
also be accessed via the original and mapped_object properties of
the DatasetItem.
Processors can define a method called map_item that can be used to map
an item from the dataset file before it is processed any further. This is
slower than storing the data file in the right format to begin with but
not all data sources allow for easy 'flat' mapping of items, e.g. tweets
are nested objects when retrieved from the twitter API that are easier
to store as a JSON file than as a flat CSV file, and it would be a shame
to throw away that data.
Note the two parameters warn_unmappable and map_missing. Items can be unmappable in that their structure is too different to coerce into a neat dictionary of the structure the data source expects. This makes it 'unmappable' and warn_unmappable determines what happens in this case. It can also be of the right structure, but with some fields missing or incomplete. map_missing determines what happens in that case. The latter is for example possible when importing data via Zeeschuimer, which produces unstably-structured data captured from social media sites.
Parameters
- BasicProcessor processor: A reference to the processor iterating the dataset.
- bool warn_unmappable: If an item is not mappable, skip the item and log a warning
- max_unmappable: Skip at most this many unmappable items; if
more are encountered, stop iterating.
Noneto never stop. - map_missing: Indicates what to do with mapped items for which
some fields could not be mapped. Defaults to 'empty_str'. Must be one of:
- 'default': fill missing fields with the default passed by map_item
- 'abort': raise a MappedItemIncompleteException if a field is missing
- a callback: replace missing field with the return value of the callback. The MappedItem object is passed to the callback as the first argument and the name of the missing field as the second.
- a dictionary with a key for each possible missing field: replace missing field with a strategy for that field ('default', 'abort', or a callback)
- get_annotations: Whether to also fetch annotations from the database. This can be disabled to help speed up iteration.
- offset: After how many rows we should yield items.
- bool immediately_delete: Only used when iterating a file
archive. Defaults to
True, if set toFalse, files are not deleted from the staging area after the iteration, so they can be re-used. - staging_area: Only used when iterating a file archive. Where to store the files while they're being worked with. If omitted, a temporary folder is created and marked for deletion after all files have been yielded.
- list filename_filter: Only used when iterating a file archive. Whitelist of filenames to iterate, others are skipped. If empty, do not filter.
Returns
A generator that yields DatasetItems
654 def sort_and_iterate_items( 655 self, sort="", reverse=False, chunk_size=50000, **kwargs 656 ) -> dict: 657 """ 658 Loop through items in a dataset, sorted by a given key. 659 660 This is a wrapper function for `iterate_items()` with the 661 added functionality of sorting a dataset. 662 663 :param sort: The item key that determines the sort order. 664 :param reverse: Whether to sort by largest values first. 665 :param chunk_size: How many items to write 666 667 :returns dict: Yields iterated post 668 """ 669 670 def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False): 671 """ 672 Sort items based on the given key and order. 673 674 :param items_to_sort: The items to sort 675 :param sort_key: The key to sort by 676 :param reverse: Whether to sort in reverse order 677 :return: Sorted items 678 """ 679 if reverse is False and (sort_key == "dataset-order" or sort_key == ""): 680 # Sort by dataset order 681 yield from items_to_sort 682 elif sort_key == "dataset-order" and reverse: 683 # Sort by dataset order in reverse 684 yield from reversed(list(items_to_sort)) 685 else: 686 # Sort on the basis of a column value 687 if not convert_sort_to_float: 688 yield from sorted( 689 items_to_sort, 690 key=lambda x: x.get(sort_key, ""), 691 reverse=reverse, 692 ) 693 else: 694 # Dataset fields can contain integers and empty strings. 695 # Since these cannot be compared, we will convert every 696 # empty string to 0. 697 yield from sorted( 698 items_to_sort, 699 key=lambda x: convert_to_float(x.get(sort_key, ""), force=True), 700 reverse=reverse, 701 ) 702 703 if self.num_rows < chunk_size: 704 try: 705 # First try to force-sort float values. If this doesn't work, it'll be alphabetical. 706 yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True) 707 except (TypeError, ValueError): 708 yield from sort_items( 709 self.iterate_items(**kwargs), 710 sort, 711 reverse, 712 convert_sort_to_float=False 713 ) 714 715 else: 716 # For large datasets, we will use chunk sorting 717 staging_area = self.get_staging_area() 718 buffer = [] 719 chunk_files = [] 720 convert_sort_to_float = True 721 fieldnames = self.get_columns() 722 723 def write_chunk(buffer, chunk_index): 724 """ 725 Write a chunk of data to a temporary file 726 727 :param buffer: The buffer containing the chunk of data 728 :param chunk_index: The index of the chunk 729 :return: The path to the temporary file 730 """ 731 temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv") 732 with temp_file.open("w", encoding="utf-8") as chunk_file: 733 writer = csv.DictWriter(chunk_file, fieldnames=fieldnames) 734 writer.writeheader() 735 writer.writerows(buffer) 736 return temp_file 737 738 # Divide the dataset into sorted chunks 739 for item in self.iterate_items(**kwargs): 740 buffer.append(item) 741 if len(buffer) >= chunk_size: 742 try: 743 buffer = list( 744 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 745 ) 746 except (TypeError, ValueError): 747 convert_sort_to_float = False 748 buffer = list( 749 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 750 ) 751 752 chunk_files.append(write_chunk(buffer, len(chunk_files))) 753 buffer.clear() 754 755 # Sort and write any remaining items in the buffer 756 if buffer: 757 buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float)) 758 chunk_files.append(write_chunk(buffer, len(chunk_files))) 759 buffer.clear() 760 761 # Merge sorted chunks into the final sorted file 762 sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv") 763 with sorted_file.open("w", encoding="utf-8") as outfile: 764 writer = csv.DictWriter(outfile, fieldnames=self.get_columns()) 765 writer.writeheader() 766 767 # Open all chunk files for reading 768 chunk_readers = [ 769 csv.DictReader(chunk.open("r", encoding="utf-8")) 770 for chunk in chunk_files 771 ] 772 heap = [] 773 774 # Initialize the heap with the first row from each chunk 775 for i, reader in enumerate(chunk_readers): 776 try: 777 row = next(reader) 778 if sort == "dataset-order" and reverse: 779 # Use a reverse index for "dataset-order" and reverse=True 780 sort_key = -i 781 elif convert_sort_to_float: 782 # Negate numeric keys for reverse sorting 783 sort_key = ( 784 -convert_to_float(row.get(sort, "")) 785 if reverse 786 else convert_to_float(row.get(sort, "")) 787 ) 788 else: 789 if reverse: 790 # For reverse string sorting, invert string comparison by creating a tuple 791 # with an inverted string - this makes Python's tuple comparison work in reverse 792 sort_key = ( 793 tuple(-ord(c) for c in row.get(sort, "")), 794 -i, 795 ) 796 else: 797 sort_key = (row.get(sort, ""), i) 798 heap.append((sort_key, i, row)) 799 except StopIteration: 800 pass 801 802 # Use a heap to merge sorted chunks 803 import heapq 804 805 heapq.heapify(heap) 806 while heap: 807 _, chunk_index, smallest_row = heapq.heappop(heap) 808 writer.writerow(smallest_row) 809 try: 810 next_row = next(chunk_readers[chunk_index]) 811 if sort == "dataset-order" and reverse: 812 # Use a reverse index for "dataset-order" and reverse=True 813 sort_key = -chunk_index 814 elif convert_sort_to_float: 815 sort_key = ( 816 -convert_to_float(next_row.get(sort, "")) 817 if reverse 818 else convert_to_float(next_row.get(sort, "")) 819 ) 820 else: 821 # Use the same inverted comparison for string values 822 if reverse: 823 sort_key = ( 824 tuple(-ord(c) for c in next_row.get(sort, "")), 825 -chunk_index, 826 ) 827 else: 828 sort_key = (next_row.get(sort, ""), chunk_index) 829 heapq.heappush(heap, (sort_key, chunk_index, next_row)) 830 except StopIteration: 831 pass 832 833 # Read the sorted file and yield each item 834 with sorted_file.open("r", encoding="utf-8") as infile: 835 reader = csv.DictReader(infile) 836 for item in reader: 837 yield item 838 839 # Remove the temporary files 840 if staging_area.is_dir(): 841 shutil.rmtree(staging_area)
Loop through items in a dataset, sorted by a given key.
This is a wrapper function for iterate_items() with the
added functionality of sorting a dataset.
Parameters
- sort: The item key that determines the sort order.
- reverse: Whether to sort by largest values first.
- chunk_size: How many items to write
:returns dict: Yields iterated post
843 def get_staging_area(self): 844 """ 845 Get path to a temporary folder in which files can be stored before 846 finishing 847 848 This folder must be created before use, but is guaranteed to not exist 849 yet. The folder may be used as a staging area for the dataset data 850 while it is being processed. 851 852 :return Path: Path to folder 853 """ 854 results_file = self.get_results_path() 855 856 results_dir_base = results_file.parent 857 results_dir = results_file.name.replace(".", "") + "-staging" 858 results_path = results_dir_base.joinpath(results_dir) 859 index = 1 860 while results_path.exists(): 861 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 862 index += 1 863 864 # create temporary folder 865 results_path.mkdir() 866 867 # Storing the staging area with the dataset so that it can be removed later 868 self.disposable_files.append(results_path) 869 870 return results_path
Get path to a temporary folder in which files can be stored before finishing
This folder must be created before use, but is guaranteed to not exist yet. The folder may be used as a staging area for the dataset data while it is being processed.
Returns
Path to folder
872 def remove_disposable_files(self): 873 """ 874 Remove any disposable files and folders, such as staging areas 875 876 Called from BasicProcessor after processing a dataset finishes. 877 """ 878 # Remove DataSet staging areas 879 if self.disposable_files: 880 for disposable_file in self.disposable_files: 881 if disposable_file.exists(): 882 shutil.rmtree(disposable_file)
Remove any disposable files and folders, such as staging areas
Called from BasicProcessor after processing a dataset finishes.
884 def finish(self, num_rows=0): 885 """ 886 Declare the dataset finished 887 """ 888 if self.data["is_finished"]: 889 raise RuntimeError("Cannot finish a finished dataset again") 890 891 self.db.update( 892 "datasets", 893 where={"key": self.data["key"]}, 894 data={ 895 "is_finished": True, 896 "num_rows": num_rows, 897 "progress": 1.0, 898 "timestamp_finished": int(time.time()), 899 }, 900 ) 901 self.data["is_finished"] = True 902 self.data["num_rows"] = num_rows
Declare the dataset finished
904 def copy(self, shallow=True): 905 """ 906 Copies the dataset, making a new version with a unique key 907 908 909 :param bool shallow: Shallow copy: does not copy the result file, but 910 instead refers to the same file as the original dataset did 911 :return Dataset: Copied dataset 912 """ 913 parameters = self.parameters.copy() 914 915 # a key is partially based on the parameters. so by setting these extra 916 # attributes, we also ensure a unique key will be generated for the 917 # copy 918 # possibly todo: don't use time for uniqueness (but one shouldn't be 919 # copying a dataset multiple times per microsecond, that's not what 920 # this is for) 921 parameters["copied_from"] = self.key 922 parameters["copied_at"] = time.time() 923 924 copy = DataSet( 925 parameters=parameters, 926 db=self.db, 927 extension=self.result_file.split(".")[-1], 928 type=self.type, 929 modules=self.modules 930 ) 931 932 for field in self.data: 933 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 934 continue 935 copy.__setattr__(field, self.data[field]) 936 937 if shallow: 938 # use the same result file 939 copy.result_file = self.result_file 940 else: 941 # copy to new file with new key 942 shutil.copy(self.get_results_path(), copy.get_results_path()) 943 944 if self.is_finished(): 945 copy.finish(self.num_rows) 946 947 # make sure ownership is also copied 948 copy.copy_ownership_from(self) 949 950 return copy
Copies the dataset, making a new version with a unique key
Parameters
- bool shallow: Shallow copy: does not copy the result file, but instead refers to the same file as the original dataset did
Returns
Copied dataset
952 def delete(self, commit=True, queue=None): 953 """ 954 Delete the dataset, and all its children 955 956 Deletes both database records and result files. Note that manipulating 957 a dataset object after it has been deleted is undefined behaviour. 958 959 :param bool commit: Commit SQL DELETE query? 960 """ 961 # first, recursively delete children 962 children = self.db.fetchall( 963 "SELECT * FROM datasets WHERE key_parent = %s", (self.key,) 964 ) 965 for child in children: 966 try: 967 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 968 child.delete(commit=commit) 969 except DataSetException: 970 # dataset already deleted - race condition? 971 pass 972 973 # delete any queued jobs for this dataset 974 try: 975 job = Job.get_by_remote_ID(self.key, self.db, self.type) 976 if job.is_claimed: 977 # tell API to stop any jobs running for this dataset 978 # level 2 = cancel job 979 # we're not interested in the result - if the API is available, 980 # it will do its thing, if it's not the backend is probably not 981 # running so the job also doesn't need to be interrupted 982 call_api( 983 "cancel-job", 984 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 985 False, 986 ) 987 988 # this deletes the job from the database 989 job.finish(True) 990 991 except JobNotFoundException: 992 pass 993 994 # delete this dataset's own annotations 995 self.db.delete("annotations", where={"dataset": self.key}, commit=commit) 996 # delete annotations that have been generated as part of this dataset 997 self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit) 998 # delete annotation fields on parent dataset(s) stemming from this dataset 999 for related_dataset in self.get_genealogy(update_cache=True): 1000 field_deleted = False 1001 annotation_fields = related_dataset.annotation_fields 1002 if annotation_fields: 1003 for field_id in list(annotation_fields.keys()): 1004 if annotation_fields[field_id].get("from_dataset", "") == self.key: 1005 del annotation_fields[field_id] 1006 field_deleted = True 1007 if field_deleted: 1008 related_dataset.save_annotation_fields(annotation_fields) 1009 1010 # delete dataset from database 1011 self.db.delete("datasets", where={"key": self.key}, commit=commit) 1012 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 1013 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 1014 1015 # delete from drive 1016 try: 1017 if self.get_results_path().exists(): 1018 self.get_results_path().unlink() 1019 if self.get_results_path().with_suffix(".log").exists(): 1020 self.get_results_path().with_suffix(".log").unlink() 1021 if self.get_results_folder_path().exists(): 1022 shutil.rmtree(self.get_results_folder_path()) 1023 1024 except FileNotFoundError: 1025 # already deleted, apparently 1026 pass 1027 except PermissionError as e: 1028 self.db.log.error( 1029 f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}" 1030 )
Delete the dataset, and all its children
Deletes both database records and result files. Note that manipulating a dataset object after it has been deleted is undefined behaviour.
Parameters
- bool commit: Commit SQL DELETE query?
1032 def update_children(self, **kwargs): 1033 """ 1034 Update an attribute for all child datasets 1035 1036 Can be used to e.g. change the owner, version, finished status for all 1037 datasets in a tree 1038 1039 :param kwargs: Parameters corresponding to known dataset attributes 1040 """ 1041 for child in self.get_children(update=True): 1042 for attr, value in kwargs.items(): 1043 child.__setattr__(attr, value) 1044 1045 child.update_children(**kwargs)
Update an attribute for all child datasets
Can be used to e.g. change the owner, version, finished status for all datasets in a tree
Parameters
- kwargs: Parameters corresponding to known dataset attributes
1047 def is_finished(self): 1048 """ 1049 Check if dataset is finished 1050 :return bool: 1051 """ 1052 return bool(self.data["is_finished"])
Check if dataset is finished
Returns
1054 def is_rankable(self, multiple_items=True): 1055 """ 1056 Determine if a dataset is rankable 1057 1058 Rankable means that it is a CSV file with 'date' and 'value' columns 1059 as well as one or more item label columns 1060 1061 :param bool multiple_items: Consider datasets with multiple items per 1062 item (e.g. word_1, word_2, etc)? 1063 1064 :return bool: Whether the dataset is rankable or not 1065 """ 1066 if ( 1067 self.get_results_path().suffix != ".csv" 1068 or not self.get_results_path().exists() 1069 ): 1070 return False 1071 1072 column_options = {"date", "value", "item"} 1073 if multiple_items: 1074 column_options.add("word_1") 1075 1076 with self.get_results_path().open(encoding="utf-8") as infile: 1077 reader = csv.DictReader(infile) 1078 try: 1079 return len(set(reader.fieldnames) & column_options) >= 3 1080 except (TypeError, ValueError): 1081 return False
Determine if a dataset is rankable
Rankable means that it is a CSV file with 'date' and 'value' columns as well as one or more item label columns
Parameters
- bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)?
Returns
Whether the dataset is rankable or not
1083 def is_accessible_by(self, username, role="owner"): 1084 """ 1085 Check if dataset has given user as owner 1086 1087 :param str|User username: Username to check for 1088 :return bool: 1089 """ 1090 if type(username) is not str: 1091 if hasattr(username, "get_id"): 1092 username = username.get_id() 1093 else: 1094 raise TypeError("User must be a str or User object") 1095 1096 # 'normal' owners 1097 if username in [ 1098 owner 1099 for owner, meta in self.owners.items() 1100 if (role is None or meta["role"] == role) 1101 ]: 1102 return True 1103 1104 # owners that are owner by being part of a tag 1105 if username in itertools.chain( 1106 *[ 1107 tagged_owners 1108 for tag, tagged_owners in self.tagged_owners.items() 1109 if (role is None or self.owners[f"tag:{tag}"]["role"] == role) 1110 ] 1111 ): 1112 return True 1113 1114 return False
Check if dataset has given user as owner
Parameters
- str|User username: Username to check for
Returns
1116 def get_owners_users(self, role="owner"): 1117 """ 1118 Get list of dataset owners 1119 1120 This returns a list of *users* that are considered owners. Tags are 1121 transparently replaced with the users with that tag. 1122 1123 :param str|None role: Role to check for. If `None`, all owners are 1124 returned regardless of role. 1125 1126 :return set: De-duplicated owner list 1127 """ 1128 # 'normal' owners 1129 owners = [ 1130 owner 1131 for owner, meta in self.owners.items() 1132 if (role is None or meta["role"] == role) and not owner.startswith("tag:") 1133 ] 1134 1135 # owners that are owner by being part of a tag 1136 owners.extend( 1137 itertools.chain( 1138 *[ 1139 tagged_owners 1140 for tag, tagged_owners in self.tagged_owners.items() 1141 if role is None or self.owners[f"tag:{tag}"]["role"] == role 1142 ] 1143 ) 1144 ) 1145 1146 # de-duplicate before returning 1147 return set(owners)
Get list of dataset owners
This returns a list of users that are considered owners. Tags are transparently replaced with the users with that tag.
Parameters
- str|None role: Role to check for. If
None, all owners are returned regardless of role.
Returns
De-duplicated owner list
1149 def get_owners(self, role="owner"): 1150 """ 1151 Get list of dataset owners 1152 1153 This returns a list of all owners, and does not transparently resolve 1154 tags (like `get_owners_users` does). 1155 1156 :param str|None role: Role to check for. If `None`, all owners are 1157 returned regardless of role. 1158 1159 :return set: De-duplicated owner list 1160 """ 1161 return [ 1162 owner 1163 for owner, meta in self.owners.items() 1164 if (role is None or meta["role"] == role) 1165 ]
Get list of dataset owners
This returns a list of all owners, and does not transparently resolve
tags (like get_owners_users does).
Parameters
- str|None role: Role to check for. If
None, all owners are returned regardless of role.
Returns
De-duplicated owner list
1167 def add_owner(self, username, role="owner"): 1168 """ 1169 Set dataset owner 1170 1171 If the user is already an owner, but with a different role, the role is 1172 updated. If the user is already an owner with the same role, nothing happens. 1173 1174 :param str|User username: Username to set as owner 1175 :param str|None role: Role to add user with. 1176 """ 1177 if type(username) is not str: 1178 if hasattr(username, "get_id"): 1179 username = username.get_id() 1180 else: 1181 raise TypeError("User must be a str or User object") 1182 1183 if username not in self.owners: 1184 self.owners[username] = {"name": username, "key": self.key, "role": role} 1185 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 1186 1187 elif username in self.owners and self.owners[username]["role"] != role: 1188 self.db.update( 1189 "datasets_owners", 1190 data={"role": role}, 1191 where={"name": username, "key": self.key}, 1192 ) 1193 self.owners[username]["role"] = role 1194 1195 if username.startswith("tag:"): 1196 # this is a bit more complicated than just adding to the list of 1197 # owners, so do a full refresh 1198 self.refresh_owners() 1199 1200 # make sure children's owners remain in sync 1201 for child in self.get_children(update=True): 1202 child.add_owner(username, role) 1203 # not recursive, since we're calling it from recursive code! 1204 child.copy_ownership_from(self, recursive=False)
Set dataset owner
If the user is already an owner, but with a different role, the role is updated. If the user is already an owner with the same role, nothing happens.
Parameters
- str|User username: Username to set as owner
- str|None role: Role to add user with.
1206 def remove_owner(self, username): 1207 """ 1208 Remove dataset owner 1209 1210 If no owner is set, the dataset is assigned to the anonymous user. 1211 If the user is not an owner, nothing happens. 1212 1213 :param str|User username: Username to set as owner 1214 """ 1215 if type(username) is not str: 1216 if hasattr(username, "get_id"): 1217 username = username.get_id() 1218 else: 1219 raise TypeError("User must be a str or User object") 1220 1221 if username in self.owners: 1222 del self.owners[username] 1223 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 1224 1225 if not self.owners: 1226 self.add_owner("anonymous") 1227 1228 if username in self.tagged_owners: 1229 del self.tagged_owners[username] 1230 1231 # make sure children's owners remain in sync 1232 for child in self.get_children(update=True): 1233 child.remove_owner(username) 1234 # not recursive, since we're calling it from recursive code! 1235 child.copy_ownership_from(self, recursive=False)
Remove dataset owner
If no owner is set, the dataset is assigned to the anonymous user. If the user is not an owner, nothing happens.
Parameters
- str|User username: Username to set as owner
1237 def refresh_owners(self): 1238 """ 1239 Update internal owner cache 1240 1241 This makes sure that the list of *users* and *tags* which can access the 1242 dataset is up to date. 1243 """ 1244 self.owners = { 1245 owner["name"]: owner 1246 for owner in self.db.fetchall( 1247 "SELECT * FROM datasets_owners WHERE key = %s", (self.key,) 1248 ) 1249 } 1250 1251 # determine which users (if any) are owners of the dataset by having a 1252 # tag that is listed as an owner 1253 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 1254 if owner_tags: 1255 tagged_owners = self.db.fetchall( 1256 "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,) 1257 ) 1258 self.tagged_owners = { 1259 owner_tag: [ 1260 user["name"] for user in tagged_owners if owner_tag in user["tags"] 1261 ] 1262 for owner_tag in owner_tags 1263 } 1264 else: 1265 self.tagged_owners = {}
Update internal owner cache
This makes sure that the list of users and tags which can access the dataset is up to date.
1267 def copy_ownership_from(self, dataset, recursive=True): 1268 """ 1269 Copy ownership 1270 1271 This is useful to e.g. make sure a dataset's ownership stays in sync 1272 with its parent 1273 1274 :param Dataset dataset: Parent to copy from 1275 :return: 1276 """ 1277 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 1278 1279 for role in ("owner", "viewer"): 1280 owners = dataset.get_owners(role=role) 1281 for owner in owners: 1282 self.db.insert( 1283 "datasets_owners", 1284 data={"key": self.key, "name": owner, "role": role}, 1285 commit=False, 1286 safe=True, 1287 ) 1288 1289 self.db.commit() 1290 if recursive: 1291 for child in self.get_children(update=True): 1292 child.copy_ownership_from(self, recursive=recursive)
Copy ownership
This is useful to e.g. make sure a dataset's ownership stays in sync with its parent
Parameters
- Dataset dataset: Parent to copy from
Returns
1294 def get_parameters(self): 1295 """ 1296 Get dataset parameters 1297 1298 The dataset parameters are stored as JSON in the database - parse them 1299 and return the resulting object 1300 1301 :return: Dataset parameters as originally stored 1302 """ 1303 try: 1304 return json.loads(self.data["parameters"]) 1305 except json.JSONDecodeError: 1306 return {}
Get dataset parameters
The dataset parameters are stored as JSON in the database - parse them and return the resulting object
Returns
Dataset parameters as originally stored
1308 def get_columns(self): 1309 """ 1310 Returns the dataset columns. 1311 1312 Useful for processor input forms. Can deal with both CSV and NDJSON 1313 files, the latter only if a `map_item` function is available in the 1314 processor that generated it. While in other cases one could use the 1315 keys of the JSON object, this is not always possible in follow-up code 1316 that uses the 'column' names, so for consistency this function acts as 1317 if no column can be parsed if no `map_item` function exists. 1318 1319 :return list: List of dataset columns; empty list if unable to parse 1320 """ 1321 if not self.get_results_path().exists(): 1322 # no file to get columns from 1323 return [] 1324 1325 if (self.get_results_path().suffix.lower() == ".csv") or ( 1326 self.get_results_path().suffix.lower() == ".ndjson" 1327 and self.get_own_processor() is not None 1328 and self.get_own_processor().map_item_method_available(dataset=self) 1329 ): 1330 items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100) 1331 try: 1332 keys = list(next(items).keys()) 1333 if self.annotation_fields: 1334 for annotation_field in self.annotation_fields.values(): 1335 annotation_column = annotation_field["label"] 1336 label_count = 1 1337 while annotation_column in keys: 1338 label_count += 1 1339 annotation_column = ( 1340 f"{annotation_field['label']}_{label_count}" 1341 ) 1342 keys.append(annotation_column) 1343 columns = keys 1344 except (StopIteration, NotImplementedError): 1345 # No items or otherwise unable to iterate 1346 columns = [] 1347 finally: 1348 del items 1349 else: 1350 # Filetype not CSV or an NDJSON with `map_item` 1351 columns = [] 1352 1353 return columns
Returns the dataset columns.
Useful for processor input forms. Can deal with both CSV and NDJSON
files, the latter only if a map_item function is available in the
processor that generated it. While in other cases one could use the
keys of the JSON object, this is not always possible in follow-up code
that uses the 'column' names, so for consistency this function acts as
if no column can be parsed if no map_item function exists.
Returns
List of dataset columns; empty list if unable to parse
1355 def update_label(self, label): 1356 """ 1357 Update label for this dataset 1358 1359 :param str label: New label 1360 :return str: The new label, as returned by get_label 1361 """ 1362 self.parameters["label"] = label 1363 1364 self.db.update( 1365 "datasets", 1366 data={"parameters": json.dumps(self.parameters)}, 1367 where={"key": self.key}, 1368 ) 1369 return self.get_label()
Update label for this dataset
Parameters
- str label: New label
Returns
The new label, as returned by get_label
1371 def get_label(self, parameters=None, default="Query"): 1372 """ 1373 Generate a readable label for the dataset 1374 1375 :param dict parameters: Parameters of the dataset 1376 :param str default: Label to use if it cannot be inferred from the 1377 parameters 1378 1379 :return str: Label 1380 """ 1381 if not parameters: 1382 parameters = self.parameters 1383 1384 if parameters.get("label"): 1385 return parameters["label"] 1386 elif parameters.get("body_query") and parameters["body_query"] != "empty": 1387 return parameters["body_query"] 1388 elif parameters.get("body_match") and parameters["body_match"] != "empty": 1389 return parameters["body_match"] 1390 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 1391 return parameters["subject_query"] 1392 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 1393 return parameters["subject_match"] 1394 elif parameters.get("query"): 1395 label = parameters["query"] 1396 # Some legacy datasets have lists as query data 1397 if isinstance(label, list): 1398 label = ", ".join(label) 1399 1400 label = label if len(label) < 30 else label[:25] + "..." 1401 label = label.strip().replace("\n", ", ") 1402 return label 1403 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 1404 return "Flag: %s" % parameters["country_flag"] 1405 elif parameters.get("country_name") and parameters["country_name"] != "all": 1406 return "Country: %s" % parameters["country_name"] 1407 elif parameters.get("filename"): 1408 return parameters["filename"] 1409 elif parameters.get("board") and "datasource" in parameters: 1410 return parameters["datasource"] + "/" + parameters["board"] 1411 elif ( 1412 "datasource" in parameters 1413 and parameters["datasource"] in self.modules.datasources 1414 ): 1415 return ( 1416 self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 1417 ) 1418 else: 1419 return default
Generate a readable label for the dataset
Parameters
- dict parameters: Parameters of the dataset
- str default: Label to use if it cannot be inferred from the parameters
Returns
Label
1421 def change_datasource(self, datasource): 1422 """ 1423 Change the datasource type for this dataset 1424 1425 :param str label: New datasource type 1426 :return str: The new datasource type 1427 """ 1428 1429 self.parameters["datasource"] = datasource 1430 1431 self.db.update( 1432 "datasets", 1433 data={"parameters": json.dumps(self.parameters)}, 1434 where={"key": self.key}, 1435 ) 1436 return datasource
Change the datasource type for this dataset
Parameters
- str label: New datasource type
Returns
The new datasource type
1438 def reserve_result_file(self, parameters=None, extension="csv"): 1439 """ 1440 Generate a unique path to the results file for this dataset 1441 1442 This generates a file name for the data file of this dataset, and makes sure 1443 no file exists or will exist at that location other than the file we 1444 expect (i.e. the data for this particular dataset). 1445 1446 :param str extension: File extension, "csv" by default 1447 :param parameters: Dataset parameters 1448 :return bool: Whether the file path was successfully reserved 1449 """ 1450 if self.data["is_finished"]: 1451 raise RuntimeError("Cannot reserve results file for a finished dataset") 1452 1453 # Use 'random' for random post queries 1454 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 1455 file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"] 1456 # Use country code for country flag queries 1457 elif "country_flag" in parameters and parameters["country_flag"] != "all": 1458 file = ( 1459 "countryflag-" 1460 + str(parameters["country_flag"]) 1461 + "-" 1462 + self.data["key"] 1463 ) 1464 # Use the query string for all other queries 1465 else: 1466 query_bit = self.data["query"].replace(" ", "-").lower() 1467 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 1468 query_bit = query_bit[:100] # Crop to avoid OSError 1469 file = query_bit + "-" + self.data["key"] 1470 file = re.sub(r"[-]+", "-", file) 1471 1472 self.data["result_file"] = file + "." + extension.lower() 1473 index = 1 1474 while self.get_results_path().is_file(): 1475 self.data["result_file"] = file + "-" + str(index) + "." + extension.lower() 1476 index += 1 1477 1478 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 1479 data={"result_file": self.data["result_file"]}) 1480 return updated > 0
Generate a unique path to the results file for this dataset
This generates a file name for the data file of this dataset, and makes sure no file exists or will exist at that location other than the file we expect (i.e. the data for this particular dataset).
Parameters
- str extension: File extension, "csv" by default
- parameters: Dataset parameters
Returns
Whether the file path was successfully reserved
1482 def get_key(self, query, parameters, parent="", time_offset=0): 1483 """ 1484 Generate a unique key for this dataset that can be used to identify it 1485 1486 The key is a hash of a combination of the query string and parameters. 1487 You never need to call this, really: it's used internally. 1488 1489 :param str query: Query string 1490 :param parameters: Dataset parameters 1491 :param parent: Parent dataset's key (if applicable) 1492 :param time_offset: Offset to add to the time component of the dataset 1493 key. This can be used to ensure a unique key even if the parameters and 1494 timing is otherwise identical to an existing dataset's 1495 1496 :return str: Dataset key 1497 """ 1498 # Return a hash based on parameters 1499 # we're going to use the hash of the parameters to uniquely identify 1500 # the dataset, so make sure it's always in the same order, or we might 1501 # end up creating multiple keys for the same dataset if python 1502 # decides to return the dict in a different order 1503 param_key = collections.OrderedDict() 1504 for key in sorted(parameters): 1505 param_key[key] = parameters[key] 1506 1507 # we additionally use the current time as a salt - this should usually 1508 # ensure a unique key for the dataset. if for some reason there is a 1509 # hash collision 1510 param_key["_salt"] = int(time.time()) + time_offset 1511 1512 parent_key = str(parent) if parent else "" 1513 plain_key = repr(param_key) + str(query) + parent_key 1514 hashed_key = hash_to_md5(plain_key) 1515 1516 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1517 # key exists, generate a new one 1518 return self.get_key( 1519 query, parameters, parent, time_offset=random.randint(1, 10) 1520 ) 1521 else: 1522 return hashed_key
Generate a unique key for this dataset that can be used to identify it
The key is a hash of a combination of the query string and parameters. You never need to call this, really: it's used internally.
Parameters
- str query: Query string
- parameters: Dataset parameters
- parent: Parent dataset's key (if applicable)
- time_offset: Offset to add to the time component of the dataset key. This can be used to ensure a unique key even if the parameters and timing is otherwise identical to an existing dataset's
Returns
Dataset key
1524 def set_key(self, key): 1525 """ 1526 Change dataset key 1527 1528 In principe, keys should never be changed. But there are rare cases 1529 where it is useful to do so, in particular when importing a dataset 1530 from another 4CAT instance; in that case it makes sense to try and 1531 ensure that the key is the same as it was before. This function sets 1532 the dataset key and updates any dataset references to it. 1533 1534 :param str key: Key to set 1535 :return str: Key that was set. If the desired key already exists, the 1536 original key is kept. 1537 """ 1538 key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,)) 1539 if key_exists or not key: 1540 return self.key 1541 1542 old_key = self.key 1543 self.db.update("datasets", data={"key": key}, where={"key": old_key}) 1544 1545 # update references 1546 self.db.update( 1547 "datasets", data={"key_parent": key}, where={"key_parent": old_key} 1548 ) 1549 self.db.update("datasets_owners", data={"key": key}, where={"key": old_key}) 1550 self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key}) 1551 self.db.update("users_favourites", data={"key": key}, where={"key": old_key}) 1552 1553 # for good measure 1554 self.db.commit() 1555 self.key = key 1556 1557 return self.key
Change dataset key
In principe, keys should never be changed. But there are rare cases where it is useful to do so, in particular when importing a dataset from another 4CAT instance; in that case it makes sense to try and ensure that the key is the same as it was before. This function sets the dataset key and updates any dataset references to it.
Parameters
- str key: Key to set
Returns
Key that was set. If the desired key already exists, the original key is kept.
1559 def get_status(self): 1560 """ 1561 Get Dataset status 1562 1563 :return string: Dataset status 1564 """ 1565 return self.data["status"]
Get Dataset status
Returns
Dataset status
1567 def update_status(self, status, is_final=False): 1568 """ 1569 Update dataset status 1570 1571 The status is a string that may be displayed to a user to keep them 1572 updated and informed about the progress of a dataset. No memory is kept 1573 of earlier dataset statuses; the current status is overwritten when 1574 updated. 1575 1576 Statuses are also written to the dataset log file. 1577 1578 :param string status: Dataset status 1579 :param bool is_final: If this is `True`, subsequent calls to this 1580 method while the object is instantiated will not update the dataset 1581 status. 1582 :return bool: Status update successful? 1583 """ 1584 if self.no_status_updates: 1585 return 1586 1587 # for presets, copy the updated status to the preset(s) this is part of 1588 if self.preset_parent is None: 1589 self.preset_parent = [ 1590 parent 1591 for parent in self.get_genealogy() 1592 if parent.type.find("preset-") == 0 and parent.key != self.key 1593 ][:1] 1594 1595 if self.preset_parent: 1596 for preset_parent in self.preset_parent: 1597 if not preset_parent.is_finished(): 1598 preset_parent.update_status(status) 1599 1600 self.data["status"] = status 1601 updated = self.db.update( 1602 "datasets", where={"key": self.data["key"]}, data={"status": status} 1603 ) 1604 1605 if is_final: 1606 self.no_status_updates = True 1607 1608 self.log(status) 1609 1610 return updated > 0
Update dataset status
The status is a string that may be displayed to a user to keep them updated and informed about the progress of a dataset. No memory is kept of earlier dataset statuses; the current status is overwritten when updated.
Statuses are also written to the dataset log file.
Parameters
- string status: Dataset status
- bool is_final: If this is
True, subsequent calls to this method while the object is instantiated will not update the dataset status.
Returns
Status update successful?
1612 def update_progress(self, progress): 1613 """ 1614 Update dataset progress 1615 1616 The progress can be used to indicate to a user how close the dataset 1617 is to completion. 1618 1619 :param float progress: Between 0 and 1. 1620 :return: 1621 """ 1622 progress = min(1, max(0, progress)) # clamp 1623 if type(progress) is int: 1624 progress = float(progress) 1625 1626 self.data["progress"] = progress 1627 updated = self.db.update( 1628 "datasets", where={"key": self.data["key"]}, data={"progress": progress} 1629 ) 1630 return updated > 0
Update dataset progress
The progress can be used to indicate to a user how close the dataset is to completion.
Parameters
- float progress: Between 0 and 1.
Returns
1632 def get_progress(self): 1633 """ 1634 Get dataset progress 1635 1636 :return float: Progress, between 0 and 1 1637 """ 1638 return self.data["progress"]
Get dataset progress
Returns
Progress, between 0 and 1
1640 def finish_with_error(self, error): 1641 """ 1642 Set error as final status, and finish with 0 results 1643 1644 This is a convenience function to avoid having to repeat 1645 "update_status" and "finish" a lot. 1646 1647 :param str error: Error message for final dataset status. 1648 :return: 1649 """ 1650 self.update_status(error, is_final=True) 1651 self.finish(0) 1652 1653 return None
Set error as final status, and finish with 0 results
This is a convenience function to avoid having to repeat "update_status" and "finish" a lot.
Parameters
- str error: Error message for final dataset status.
Returns
1655 def update_version(self, version): 1656 """ 1657 Update software version used for this dataset 1658 1659 This can be used to verify the code that was used to process this dataset. 1660 1661 :param string version: Version identifier 1662 :return bool: Update successul? 1663 """ 1664 try: 1665 # this fails if the processor type is unknown 1666 # edge case, but let's not crash... 1667 processor_path = self.modules.processors.get(self.data["type"]).filepath 1668 except AttributeError: 1669 processor_path = "" 1670 1671 updated = self.db.update( 1672 "datasets", 1673 where={"key": self.data["key"]}, 1674 data={ 1675 "software_version": version[0], 1676 "software_source": version[1], 1677 "software_file": processor_path, 1678 }, 1679 ) 1680 1681 return updated > 0
Update software version used for this dataset
This can be used to verify the code that was used to process this dataset.
Parameters
- string version: Version identifier
Returns
Update successul?
1683 def delete_parameter(self, parameter, instant=True): 1684 """ 1685 Delete a parameter from the dataset metadata 1686 1687 :param string parameter: Parameter to delete 1688 :param bool instant: Also delete parameters in this instance object? 1689 :return bool: Update successul? 1690 """ 1691 parameters = self.parameters.copy() 1692 if parameter in parameters: 1693 del parameters[parameter] 1694 else: 1695 return False 1696 1697 updated = self.db.update( 1698 "datasets", 1699 where={"key": self.data["key"]}, 1700 data={"parameters": json.dumps(parameters)}, 1701 ) 1702 1703 if instant: 1704 self.parameters = parameters 1705 1706 return updated > 0
Delete a parameter from the dataset metadata
Parameters
- string parameter: Parameter to delete
- bool instant: Also delete parameters in this instance object?
Returns
Update successul?
1708 def get_version_url(self, file): 1709 """ 1710 Get a versioned github URL for the version this dataset was processed with 1711 1712 :param file: File to link within the repository 1713 :return: URL, or an empty string 1714 """ 1715 if not self.data["software_source"]: 1716 return "" 1717 1718 filepath = self.data.get("software_file", "") 1719 if filepath.startswith("/config/extensions/"): 1720 # go to root of extension 1721 filepath = "/" + "/".join(filepath.split("/")[3:]) 1722 1723 return ( 1724 self.data["software_source"] 1725 + "/blob/" 1726 + self.data["software_version"] 1727 + filepath 1728 )
Get a versioned github URL for the version this dataset was processed with
Parameters
- file: File to link within the repository
Returns
URL, or an empty string
1730 def top_parent(self): 1731 """ 1732 Get root dataset 1733 1734 Traverses the tree of datasets this one is part of until it finds one 1735 with no source_dataset dataset, then returns that dataset. 1736 1737 :return Dataset: Parent dataset 1738 """ 1739 genealogy = self.get_genealogy() 1740 return genealogy[0]
Get root dataset
Traverses the tree of datasets this one is part of until it finds one with no source_dataset dataset, then returns that dataset.
Returns
Parent dataset
1742 def get_genealogy(self, update_cache=False): 1743 """ 1744 Get genealogy of this dataset 1745 1746 Creates a list of DataSet objects, with the first one being the 1747 'top' dataset, and each subsequent one being a child of the previous 1748 one, ending with the current dataset. 1749 1750 :param bool update_cache: Update the cached genealogy if True, else return cached value 1751 :return list: Dataset genealogy, oldest dataset first 1752 """ 1753 if not self._genealogy or update_cache: 1754 key_parent = self.key_parent 1755 genealogy = [] 1756 1757 while key_parent: 1758 try: 1759 parent = DataSet(key=key_parent, db=self.db, modules=self.modules) 1760 except DataSetException: 1761 break 1762 1763 genealogy.append(parent) 1764 if parent.key_parent: 1765 key_parent = parent.key_parent 1766 else: 1767 break 1768 1769 genealogy.reverse() 1770 1771 # add self to the end 1772 genealogy.append(self) 1773 # cache the result 1774 self._genealogy = genealogy 1775 1776 # return a copy to prevent external modification 1777 return list(self._genealogy)
Get genealogy of this dataset
Creates a list of DataSet objects, with the first one being the 'top' dataset, and each subsequent one being a child of the previous one, ending with the current dataset.
Parameters
- bool update_cache: Update the cached genealogy if True, else return cached value
Returns
Dataset genealogy, oldest dataset first
1779 def get_children(self, update=False): 1780 """ 1781 Get children of this dataset 1782 1783 :param bool update: Update the list of children from database if True, else return cached value 1784 :return list: List of child datasets 1785 """ 1786 if self._children is not None and not update: 1787 return self._children 1788 1789 analyses = self.db.fetchall( 1790 "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", 1791 (self.key,), 1792 ) 1793 self._children = [ 1794 DataSet(data=analysis, db=self.db, modules=self.modules) 1795 for analysis in analyses 1796 ] 1797 return self._children
Get children of this dataset
Parameters
- bool update: Update the list of children from database if True, else return cached value
Returns
List of child datasets
1799 def get_all_children(self, recursive=True, update=True): 1800 """ 1801 Get all children of this dataset 1802 1803 Results are returned as a non-hierarchical list, i.e. the result does 1804 not reflect the actual dataset hierarchy (but all datasets in the 1805 result will have the original dataset as an ancestor somewhere) 1806 1807 :return list: List of DataSets 1808 """ 1809 children = self.get_children(update=update) 1810 results = children.copy() 1811 if recursive: 1812 for child in children: 1813 results += child.get_all_children(recursive=recursive, update=update) 1814 1815 return results
Get all children of this dataset
Results are returned as a non-hierarchical list, i.e. the result does not reflect the actual dataset hierarchy (but all datasets in the result will have the original dataset as an ancestor somewhere)
Returns
List of DataSets
1817 def nearest(self, type_filter): 1818 """ 1819 Return nearest dataset that matches the given type 1820 1821 Starting with this dataset, traverse the hierarchy upwards and return 1822 whichever dataset matches the given type. 1823 1824 :param str type_filter: Type filter. Can contain wildcards and is matched 1825 using `fnmatch.fnmatch`. 1826 :return: Earliest matching dataset, or `None` if none match. 1827 """ 1828 genealogy = self.get_genealogy() 1829 for dataset in reversed(genealogy): 1830 if fnmatch.fnmatch(dataset.type, type_filter): 1831 return dataset 1832 1833 return None
Return nearest dataset that matches the given type
Starting with this dataset, traverse the hierarchy upwards and return whichever dataset matches the given type.
Parameters
- str type_filter: Type filter. Can contain wildcards and is matched
using
fnmatch.fnmatch.
Returns
Earliest matching dataset, or
Noneif none match.
1850 def get_compatible_processors(self, config=None): 1851 """ 1852 Get list of processors compatible with this dataset 1853 1854 Checks whether this dataset type is one that is listed as being accepted 1855 by the processor, for each known type: if the processor does not 1856 specify accepted types (via the `is_compatible_with` method), it is 1857 assumed it accepts any top-level datasets 1858 1859 :param ConfigManager|None config: Configuration reader to determine 1860 compatibility through. This may not be the same reader the dataset was 1861 instantiated with, e.g. when checking whether some other user should 1862 be able to run processors on this dataset. 1863 :return dict: Compatible processors, `name => class` mapping 1864 """ 1865 processors = self.modules.processors 1866 1867 available = {} 1868 for processor_type, processor in processors.items(): 1869 if processor.is_from_collector(): 1870 continue 1871 1872 own_processor = self.get_own_processor() 1873 if own_processor and own_processor.exclude_followup_processors( 1874 processor_type 1875 ): 1876 continue 1877 1878 # consider a processor compatible if its is_compatible_with 1879 # method returns True *or* if it has no explicit compatibility 1880 # check and this dataset is top-level (i.e. has no parent) 1881 if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \ 1882 or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)): 1883 available[processor_type] = processor 1884 1885 return available
Get list of processors compatible with this dataset
Checks whether this dataset type is one that is listed as being accepted
by the processor, for each known type: if the processor does not
specify accepted types (via the is_compatible_with method), it is
assumed it accepts any top-level datasets
Parameters
- ConfigManager|None config: Configuration reader to determine compatibility through. This may not be the same reader the dataset was instantiated with, e.g. when checking whether some other user should be able to run processors on this dataset.
Returns
Compatible processors,
name => classmapping
1887 def get_place_in_queue(self, update=False): 1888 """ 1889 Determine dataset's position in queue 1890 1891 If the dataset is already finished, the position is -1. Else, the 1892 position is the number of datasets to be completed before this one will 1893 be processed. A position of 0 would mean that the dataset is currently 1894 being executed, or that the backend is not running. 1895 1896 :param bool update: Update the queue position from database if True, else return cached value 1897 :return int: Queue position 1898 """ 1899 if self.is_finished() or not self.data.get("job"): 1900 self._queue_position = -1 1901 return self._queue_position 1902 elif not update and self._queue_position is not None: 1903 # Use cached value 1904 return self._queue_position 1905 else: 1906 # Collect queue position from database via the job 1907 try: 1908 job = Job.get_by_ID(self.data["job"], self.db) 1909 self._queue_position = job.get_place_in_queue() 1910 except JobNotFoundException: 1911 self._queue_position = -1 1912 1913 return self._queue_position
Determine dataset's position in queue
If the dataset is already finished, the position is -1. Else, the position is the number of datasets to be completed before this one will be processed. A position of 0 would mean that the dataset is currently being executed, or that the backend is not running.
Parameters
- bool update: Update the queue position from database if True, else return cached value
Returns
Queue position
1915 def get_own_processor(self): 1916 """ 1917 Get the processor class that produced this dataset 1918 1919 :return: Processor class, or `None` if not available. 1920 """ 1921 processor_type = self.parameters.get("type", self.data.get("type")) 1922 1923 return self.modules.processors.get(processor_type)
Get the processor class that produced this dataset
Returns
Processor class, or
Noneif not available.
1925 def get_available_processors(self, config=None, exclude_hidden=False): 1926 """ 1927 Get list of processors that may be run for this dataset 1928 1929 Returns all compatible processors except for those that are already 1930 queued or finished and have no options. Processors that have been 1931 run but have options are included so they may be run again with a 1932 different configuration 1933 1934 :param ConfigManager|None config: Configuration reader to determine 1935 compatibility through. This may not be the same reader the dataset was 1936 instantiated with, e.g. when checking whether some other user should 1937 be able to run processors on this dataset. 1938 :param bool exclude_hidden: Exclude processors that should be displayed 1939 in the UI? If `False`, all processors are returned. 1940 1941 :return dict: Available processors, `name => properties` mapping 1942 """ 1943 if self.available_processors: 1944 # Update to reflect exclude_hidden parameter which may be different from last call 1945 # TODO: could children also have been created? Possible bug, but I have not seen anything effected by this 1946 return { 1947 processor_type: processor 1948 for processor_type, processor in self.available_processors.items() 1949 if not exclude_hidden or not processor.is_hidden 1950 } 1951 1952 processors = self.get_compatible_processors(config=config) 1953 1954 for analysis in self.get_children(update=True): 1955 if analysis.type not in processors: 1956 continue 1957 1958 if not processors[analysis.type].get_options(config=config): 1959 # No variable options; this processor has been run so remove 1960 del processors[analysis.type] 1961 continue 1962 1963 if exclude_hidden and processors[analysis.type].is_hidden: 1964 del processors[analysis.type] 1965 1966 self.available_processors = processors 1967 return processors
Get list of processors that may be run for this dataset
Returns all compatible processors except for those that are already queued or finished and have no options. Processors that have been run but have options are included so they may be run again with a different configuration
Parameters
- ConfigManager|None config: Configuration reader to determine compatibility through. This may not be the same reader the dataset was instantiated with, e.g. when checking whether some other user should be able to run processors on this dataset.
- bool exclude_hidden: Exclude processors that should be displayed
in the UI? If
False, all processors are returned.
Returns
Available processors,
name => propertiesmapping
1969 def link_job(self, job): 1970 """ 1971 Link this dataset to a job ID 1972 1973 Updates the dataset data to include a reference to the job that will be 1974 executing (or has already executed) this job. 1975 1976 Note that if no job can be found for this dataset, this method silently 1977 fails. 1978 1979 :param Job job: The job that will run this dataset 1980 1981 :todo: If the job column ever gets used, make sure it always contains 1982 a valid value, rather than silently failing this method. 1983 """ 1984 if type(job) is not Job: 1985 raise TypeError("link_job requires a Job object as its argument") 1986 1987 if "id" not in job.data: 1988 try: 1989 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"]) 1990 except JobNotFoundException: 1991 return 1992 1993 self.db.update( 1994 "datasets", where={"key": self.key}, data={"job": job.data["id"]} 1995 )
Link this dataset to a job ID
Updates the dataset data to include a reference to the job that will be executing (or has already executed) this job.
Note that if no job can be found for this dataset, this method silently fails.
Parameters
- Job job: The job that will run this dataset
:todo: If the job column ever gets used, make sure it always contains a valid value, rather than silently failing this method.
1997 def link_parent(self, key_parent): 1998 """ 1999 Set source_dataset key for this dataset 2000 2001 :param key_parent: Parent key. Not checked for validity 2002 """ 2003 self.db.update( 2004 "datasets", where={"key": self.key}, data={"key_parent": key_parent} 2005 ) 2006 # reset caches 2007 self.data["key_parent"] = key_parent 2008 self._genealogy = None
Set source_dataset key for this dataset
Parameters
- key_parent: Parent key. Not checked for validity
2010 def get_parent(self): 2011 """ 2012 Get parent dataset 2013 2014 :return DataSet: Parent dataset, or `None` if not applicable 2015 """ 2016 return ( 2017 DataSet(key=self.key_parent, db=self.db, modules=self.modules) 2018 if self.key_parent 2019 else None 2020 )
Get parent dataset
Returns
Parent dataset, or
Noneif not applicable
2022 def detach(self): 2023 """ 2024 Makes the datasets standalone, i.e. not having any source_dataset dataset 2025 """ 2026 self.link_parent("")
Makes the datasets standalone, i.e. not having any source_dataset dataset
2028 def is_dataset(self): 2029 """ 2030 Easy way to confirm this is a dataset. 2031 Used for checking processor and dataset compatibility, 2032 which needs to handle both processors and datasets. 2033 """ 2034 return True
Easy way to confirm this is a dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.
2036 def is_top_dataset(self): 2037 """ 2038 Easy way to confirm this is a top dataset. 2039 Used for checking processor and dataset compatibility, 2040 which needs to handle both processors and datasets. 2041 """ 2042 if self.key_parent: 2043 return False 2044 return True
Easy way to confirm this is a top dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.
2046 def is_expiring(self, config): 2047 """ 2048 Determine if dataset is set to expire 2049 2050 Similar to `is_expired`, but checks if the dataset will be deleted in 2051 the future, not if it should be deleted right now. 2052 2053 :param ConfigManager config: Configuration reader (context-aware) 2054 :return bool|int: `False`, or the expiration date as a Unix timestamp. 2055 """ 2056 # has someone opted out of deleting this? 2057 if self.parameters.get("keep"): 2058 return False 2059 2060 # is this dataset explicitly marked as expiring after a certain time? 2061 if self.parameters.get("expires-after"): 2062 return self.parameters.get("expires-after") 2063 2064 # is the data source configured to have its datasets expire? 2065 expiration = config.get("datasources.expiration", {}) 2066 if not expiration.get(self.parameters.get("datasource")): 2067 return False 2068 2069 # is there a timeout for this data source? 2070 if expiration.get(self.parameters.get("datasource")).get("timeout"): 2071 return self.timestamp + expiration.get( 2072 self.parameters.get("datasource") 2073 ).get("timeout") 2074 2075 return False
Determine if dataset is set to expire
Similar to is_expired, but checks if the dataset will be deleted in
the future, not if it should be deleted right now.
Parameters
- ConfigManager config: Configuration reader (context-aware)
Returns
False, or the expiration date as a Unix timestamp.
2077 def is_expired(self, config): 2078 """ 2079 Determine if dataset should be deleted 2080 2081 Datasets can be set to expire, but when they should be deleted depends 2082 on a number of factor. This checks them all. 2083 2084 :param ConfigManager config: Configuration reader (context-aware) 2085 :return bool: 2086 """ 2087 # has someone opted out of deleting this? 2088 if not self.is_expiring(config): 2089 return False 2090 2091 # is this dataset explicitly marked as expiring after a certain time? 2092 future = ( 2093 time.time() + 3600 2094 ) # ensure we don't delete datasets with invalid expiration times 2095 if ( 2096 self.parameters.get("expires-after") 2097 and convert_to_int(self.parameters["expires-after"], future) < time.time() 2098 ): 2099 return True 2100 2101 # is the data source configured to have its datasets expire? 2102 expiration = config.get("datasources.expiration", {}) 2103 if not expiration.get(self.parameters.get("datasource")): 2104 return False 2105 2106 # is the dataset older than the set timeout? 2107 if expiration.get(self.parameters.get("datasource")).get("timeout"): 2108 return ( 2109 self.timestamp 2110 + expiration[self.parameters.get("datasource")]["timeout"] 2111 < time.time() 2112 ) 2113 2114 return False
Determine if dataset should be deleted
Datasets can be set to expire, but when they should be deleted depends on a number of factor. This checks them all.
Parameters
- ConfigManager config: Configuration reader (context-aware)
Returns
2116 def is_from_collector(self): 2117 """ 2118 Check if this dataset was made by a processor that collects data, i.e. 2119 a search or import worker. 2120 2121 :return bool: 2122 """ 2123 return self.type.endswith("-search") or self.type.endswith("-import")
Check if this dataset was made by a processor that collects data, i.e. a search or import worker.
Returns
2125 def get_extension(self): 2126 """ 2127 Gets the file extension this dataset produces. 2128 Also checks whether the results file exists. 2129 Used for checking processor and dataset compatibility. 2130 2131 :return str extension: Extension, e.g. `csv` 2132 """ 2133 if self.get_results_path().exists(): 2134 return self.get_results_path().suffix[1:] 2135 2136 return False
Gets the file extension this dataset produces. Also checks whether the results file exists. Used for checking processor and dataset compatibility.
Returns
Extension, e.g.
csv
2138 def is_filter(self): 2139 """ 2140 Check whether a dataset is a filter dataset. 2141 2142 :return bool: True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown). 2143 """ 2144 own_processor = self.get_own_processor() 2145 if own_processor is None: 2146 # Deprecated datasets do not have a processor 2147 return None 2148 return own_processor.is_filter()
Check whether a dataset is a filter dataset.
Returns
True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown).
2150 def get_media_type(self): 2151 """ 2152 Gets the media type of the dataset file. 2153 2154 :return str: media type, e.g., "text" 2155 """ 2156 own_processor = self.get_own_processor() 2157 if hasattr(self, "media_type"): 2158 # media type can be defined explicitly in the dataset; this is the priority 2159 return self.media_type 2160 elif own_processor is not None: 2161 # or media type can be defined in the processor 2162 # some processors can set different media types for different datasets (e.g., import_media) 2163 if hasattr(own_processor, "media_type"): 2164 return own_processor.media_type 2165 2166 # Default to text 2167 return self.parameters.get("media_type", "text")
Gets the media type of the dataset file.
Returns
media type, e.g., "text"
2169 def get_metadata(self): 2170 """ 2171 Get dataset metadata 2172 2173 This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended 2174 as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to 2175 update its database (and ensure compatibility with the exporting version of 4CAT). 2176 """ 2177 metadata = self.db.fetchone( 2178 "SELECT * FROM datasets WHERE key = %s", (self.key,) 2179 ) 2180 2181 # get 4CAT version (presumably to ensure export is compatible with import) 2182 metadata["current_4CAT_version"] = get_software_version() 2183 return metadata
Get dataset metadata
This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to update its database (and ensure compatibility with the exporting version of 4CAT).
2185 def get_result_url(self): 2186 """ 2187 Gets the 4CAT frontend URL of a dataset file. 2188 2189 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 2190 SERVER_HTTPS) plus hardcoded '/result/'. 2191 TODO: create more dynamic method of obtaining url. 2192 """ 2193 filename = self.get_results_path().name 2194 2195 # we cheat a little here by using the modules' config reader, but these 2196 # will never be context-dependent values anyway 2197 url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \ 2198 self.modules.config.get("flask.server_name") + '/result/' + filename 2199 return url_to_file
Gets the 4CAT frontend URL of a dataset file.
Uses the FlaskConfig attributes (i.e., SERVER_NAME and SERVER_HTTPS) plus hardcoded '/result/'. TODO: create more dynamic method of obtaining url.
2201 def warn_unmappable_item( 2202 self, item_count, processor=None, error_message=None, warn_admins=True 2203 ): 2204 """ 2205 Log an item that is unable to be mapped and warn administrators. 2206 2207 :param int item_count: Item index 2208 :param Processor processor: Processor calling function8 2209 """ 2210 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 2211 2212 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 2213 closest_dataset = ( 2214 processor.dataset 2215 if processor is not None and processor.dataset is not None 2216 else self 2217 ) 2218 # Log error to dataset log 2219 closest_dataset.log(dataset_error_message) 2220 2221 if warn_admins: 2222 if processor is not None: 2223 processor.log.warning( 2224 f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}." 2225 ) 2226 elif hasattr(self.db, "log"): 2227 # borrow the database's log handler 2228 self.db.log.warning( 2229 f"Unable to map item all items for dataset {closest_dataset.key}." 2230 ) 2231 else: 2232 # No other log available 2233 raise DataSetException( 2234 f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn" 2235 )
Log an item that is unable to be mapped and warn administrators.
Parameters
- int item_count: Item index
- Processor processor: Processor calling function8
2238 def has_annotations(self) -> bool: 2239 """ 2240 Whether this dataset has annotations 2241 """ 2242 2243 annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,)) 2244 2245 return True if annotation else False
Whether this dataset has annotations
2247 def num_annotations(self) -> int: 2248 """ 2249 Get the amount of annotations 2250 """ 2251 return self.db.fetchone( 2252 "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,) 2253 )["count"]
Get the amount of annotations
2255 def get_annotation(self, data: dict): 2256 """ 2257 Retrieves a specific annotation if it exists. 2258 2259 :param data: A dictionary with which to get the annotations from. 2260 To get specific annotations, include either an `id` field or 2261 `field_id` and `item_id` fields. 2262 2263 return Annotation: Annotation object. 2264 """ 2265 2266 if "id" not in data or ("field_id" not in data and "item_id" not in data): 2267 return None 2268 2269 if "dataset" not in data: 2270 data["dataset"] = self.key 2271 2272 return Annotation(data=data, db=self.db)
Retrieves a specific annotation if it exists.
Parameters
- data: A dictionary with which to get the annotations from.
To get specific annotations, include either an
idfield orfield_idanditem_idfields.
return Annotation: Annotation object.
2274 def get_annotations(self) -> list: 2275 """ 2276 Retrieves all annotations for this dataset. 2277 2278 return list: List of Annotation objects. 2279 """ 2280 2281 return Annotation.get_annotations_for_dataset(self.db, self.key)
Retrieves all annotations for this dataset.
return list: List of Annotation objects.
2283 def get_annotations_for_item(self, item_id: str | list, before=0) -> list: 2284 """ 2285 Retrieves all annotations from this dataset for a specific item (e.g. social media post). 2286 :param str item_id: The ID of the annotation item 2287 :param int before: The upper timestamp range for annotations. 2288 """ 2289 return Annotation.get_annotations_for_dataset( 2290 self.db, self.key, item_id=item_id, before=before 2291 )
Retrieves all annotations from this dataset for a specific item (e.g. social media post).
Parameters
- str item_id: The ID of the annotation item
- int before: The upper timestamp range for annotations.
2293 def has_annotation_fields(self) -> bool: 2294 """ 2295 Returns True if there's annotation fields saved tot the dataset table 2296 Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.). 2297 """ 2298 2299 return True if self.annotation_fields else False
Returns True if there's annotation fields saved tot the dataset table
Annotation fields are metadata that describe a type of annotation (with info on id, type, etc.).
2301 def get_annotation_field_labels(self) -> list: 2302 """ 2303 Retrieves the saved annotation field labels for this dataset. 2304 These are stored in the annotations table. 2305 2306 :return list: List of annotation field labels. 2307 """ 2308 2309 annotation_fields = self.annotation_fields 2310 2311 if not annotation_fields: 2312 return [] 2313 2314 labels = [v["label"] for v in annotation_fields.values()] 2315 2316 return labels
Retrieves the saved annotation field labels for this dataset. These are stored in the annotations table.
Returns
List of annotation field labels.
2318 def save_annotations(self, annotations: list) -> int: 2319 """ 2320 Takes a list of annotations and saves them to the annotations table. 2321 If a field is not yet present in the `annotation_fields` column in 2322 the datasets table, it also adds it there. 2323 2324 :param list annotations: List of dictionaries with annotation items. Must have `item_id`, `field_id`, 2325 and `label`. 2326 `item_id` is for the specific item being annotated (e.g. a social media post) 2327 `field_id` refers to the annotation field. 2328 `label` is a human-readable description of this annotation. 2329 E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", 2330 "value": "Yes"}] 2331 2332 :returns int: How many annotations were saved. 2333 2334 """ 2335 2336 if not annotations: 2337 return 0 2338 2339 count = 0 2340 annotation_fields = self.annotation_fields 2341 2342 # Add some dataset data to annotations, if not present 2343 for annotation_data in annotations: 2344 # Check if the required fields are present 2345 if not annotation_data.get("item_id"): 2346 raise AnnotationException( 2347 "Can't save annotations; annotation must have an `item_id` referencing " 2348 "the item it annotated, got %s" % annotation_data 2349 ) 2350 if not annotation_data.get("field_id"): 2351 raise AnnotationException( 2352 "Can't save annotations; annotation must have a `field_id` field, " 2353 "got %s" % annotation_data 2354 ) 2355 if not annotation_data.get("label") or not isinstance( 2356 annotation_data["label"], str 2357 ): 2358 raise AnnotationException( 2359 "Can't save annotations; annotation must have a `label` field, " 2360 "got %s" % annotation_data 2361 ) 2362 2363 # Set dataset key 2364 if not annotation_data.get("dataset"): 2365 annotation_data["dataset"] = self.key 2366 2367 # Set default author to this dataset owner 2368 # If this annotation is made by a processor, it will have the processor name 2369 if not annotation_data.get("author"): 2370 annotation_data["author"] = self.get_owners()[0] 2371 2372 # Create Annotation object, which also saves it to the database 2373 # If this dataset/item_id/field_id combination already exists, this retrieves the 2374 # existing data and updates it with new values. 2375 Annotation(data=annotation_data, db=self.db) 2376 count += 1 2377 2378 # Save annotation fields if things changed 2379 if annotation_fields != self.annotation_fields: 2380 self.save_annotation_fields(annotation_fields) 2381 2382 return count
Takes a list of annotations and saves them to the annotations table.
If a field is not yet present in the annotation_fields column in
the datasets table, it also adds it there.
Parameters
- list annotations: List of dictionaries with annotation items. Must have
item_id,field_id, andlabel.item_idis for the specific item being annotated (e.g. a social media post)field_idrefers to the annotation field.labelis a human-readable description of this annotation. E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", "value": "Yes"}]
:returns int: How many annotations were saved.
2384 def save_annotation_fields(self, new_fields: dict, add=False) -> int: 2385 """ 2386 Save annotation field data to the datasets table (in the `annotation_fields` column). 2387 If changes to the annotation fields affect existing annotations, 2388 this function will also call `update_annotations_via_fields()` to change them. 2389 2390 :param dict new_fields: New annotation fields, with a field ID as key. 2391 2392 :param bool add: Whether we're merely adding new fields 2393 or replacing the whole batch. If add is False, 2394 `new_fields` should contain all fields. 2395 2396 :return int: The number of annotation fields saved. 2397 2398 """ 2399 2400 # Get existing annotation fields to see if stuff changed. 2401 old_fields = self.annotation_fields 2402 changes = False 2403 2404 # Annotation field must be valid JSON. 2405 try: 2406 json.dumps(new_fields) 2407 except ValueError: 2408 raise AnnotationException( 2409 "Can't save annotation fields: not valid JSON (%s)" % new_fields 2410 ) 2411 2412 # No duplicate IDs 2413 if len(new_fields) != len(set(new_fields)): 2414 raise AnnotationException( 2415 "Can't save annotation fields: field IDs must be unique" 2416 ) 2417 2418 # Annotation fields must at minimum have `type` and `label` keys. 2419 for field_id, annotation_field in new_fields.items(): 2420 if not isinstance(field_id, str): 2421 raise AnnotationException( 2422 "Can't save annotation fields: field ID %s is not a valid string" 2423 % field_id 2424 ) 2425 if "label" not in annotation_field: 2426 raise AnnotationException( 2427 "Can't save annotation fields: all fields must have a label" 2428 % field_id 2429 ) 2430 if "type" not in annotation_field: 2431 raise AnnotationException( 2432 "Can't save annotation fields: all fields must have a type" 2433 % field_id 2434 ) 2435 2436 # Check if fields are removed 2437 if not add and old_fields: 2438 for field_id in old_fields.keys(): 2439 if field_id not in new_fields: 2440 changes = True 2441 2442 # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin 2443 # dataset 2444 for field_id in new_fields.keys(): 2445 if field_id in old_fields and old_fields[field_id].get("from_dataset"): 2446 old_fields[field_id]["label"] = new_fields[field_id][ 2447 "label" 2448 ] # Only labels could've been changed 2449 new_fields[field_id] = old_fields[field_id] 2450 2451 # If we're just adding fields, add them to the old fields. 2452 # If the field already exists, overwrite the old field. 2453 if add and old_fields: 2454 all_fields = old_fields 2455 for field_id, annotation_field in new_fields.items(): 2456 all_fields[field_id] = annotation_field 2457 new_fields = all_fields 2458 2459 # We're saving the new annotation fields as-is. 2460 # Ordering of fields is preserved this way. 2461 self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)}) 2462 self.annotation_fields = new_fields 2463 2464 # If anything changed with the annotation fields, possibly update 2465 # existing annotations (e.g. to delete them or change their labels). 2466 if changes: 2467 Annotation.update_annotations_via_fields( 2468 self.key, old_fields, new_fields, self.db 2469 ) 2470 2471 return len(new_fields)
Save annotation field data to the datasets table (in the annotation_fields column).
If changes to the annotation fields affect existing annotations,
this function will also call update_annotations_via_fields() to change them.
Parameters
dict new_fields: New annotation fields, with a field ID as key.
bool add: Whether we're merely adding new fields or replacing the whole batch. If add is False,
new_fieldsshould contain all fields.
Returns
The number of annotation fields saved.
2473 def get_annotation_metadata(self) -> dict: 2474 """ 2475 Retrieves all the data for this dataset from the annotations table. 2476 """ 2477 2478 annotation_data = self.db.fetchall( 2479 "SELECT * FROM annotations WHERE dataset = '%s';" % self.key 2480 ) 2481 return annotation_data
Retrieves all the data for this dataset from the annotations table.