common.lib.dataset
import collections
import itertools
import datetime
import fnmatch
import heapq
import random
import shutil
import json
import time
import csv
import re

from common.lib.annotation import Annotation
from common.lib.job import Job, JobNotFoundException

from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api, hash_to_md5, convert_to_float
from common.lib.item_mapping import MappedItem, DatasetItem
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
                                   MapItemException, MappedItemIncompleteException, AnnotationException)


class DataSet(FourcatModule):
    """
    Provide interface to safely register and run operations on a dataset

    A dataset is a collection of:
    - A unique identifier
    - A set of parameters that demarcate the data contained within
    - The data

    The data is usually stored in a file on the disk; the parameters are stored
    in a database. The handling of the data, et cetera, is done by other
    workers; this class defines methods to create and manipulate the dataset's
    properties.
    """

    # Attributes must be created here to ensure getattr and setattr work properly
    data = None
    key = ""

    _children = None
    available_processors = None
    _genealogy = None
    preset_parent = None
    parameters = None
    modules = None
    annotation_fields = None

    owners = None
    tagged_owners = None

    db = None
    folder = None
    is_new = True

    no_status_updates = False
    staging_areas = None
    _queue_position = None

    def __init__(
        self,
        parameters=None,
        key=None,
        job=None,
        data=None,
        db=None,
        parent="",
        extension=None,
        type=None,
        is_private=True,
        owner="anonymous",
        modules=None
    ):
        """
        Create new dataset object

        If the dataset is not in the database yet, it is added.

        :param dict parameters: Only when creating a new dataset. Dataset
        parameters, free-form dictionary.
        :param str key: Dataset key. If given, dataset with this key is loaded.
        :param int job: Job ID. If given, dataset corresponding to job is
        loaded.
        :param dict data: Dataset data, corresponding to a row in the datasets
        database table. If not given, retrieved from database depending on key.
        :param db: Database connection
        :param str parent: Only when creating a new dataset. Parent dataset
        key to which the one being created is a child.
        :param str extension: Only when creating a new dataset. Extension of
        dataset result file.
        :param str type: Only when creating a new dataset. Type of the dataset,
        corresponding to the type property of a processor class.
        :param bool is_private: Only when creating a new dataset. Whether the
        dataset is private or public.
        :param str owner: Only when creating a new dataset. The user name of
        the dataset's creator.
        :param modules: Module cache. If not given, will be loaded when needed
        (expensive). Used to figure out what processors are compatible with
        this dataset.
        """
        self.db = db

        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
        self.data = {}
        self.parameters = {}
        self.available_processors = {}
        self._genealogy = []
        self.staging_areas = []
        self.modules = modules

        if key is not None:
            self.key = key
            current = self.db.fetchone(
                "SELECT * FROM datasets WHERE key = %s", (self.key,)
            )
            if not current:
                raise DataSetNotFoundException(
                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
                    % key
                )

        elif job is not None:
            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
            if not current:
                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")

            self.key = current["key"]
        elif data is not None:
            current = data
            if (
                "query" not in data
                or "key" not in data
                or "parameters" not in data
                or "key_parent" not in data
            ):
                raise DataSetException(
                    "DataSet() requires a complete dataset record for its 'data' argument"
                )

            self.key = current["key"]
        else:
            if parameters is None:
                raise DataSetException(
                    "DataSet() requires either 'key', or 'parameters' to be given"
                )

            if not type:
                raise DataSetException("Datasets must have their type set explicitly")

            query = self.get_label(parameters, default=type)
            self.key = self.get_key(query, parameters, parent)
            current = self.db.fetchone(
                "SELECT * FROM datasets WHERE key = %s AND query = %s",
                (self.key, query),
            )

        if current:
            self.data = current
            self.parameters = json.loads(self.data["parameters"])
            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
                if self.data.get("annotation_fields") else {}
            self.is_new = False
        else:
            self.data = {"type": type}  # get_own_processor needs this
            own_processor = self.get_own_processor()
            version = get_software_commit(own_processor)
            self.data = {
                "key": self.key,
                "query": self.get_label(parameters, default=type),
                "parameters": json.dumps(parameters),
                "result_file": "",
                "creator": owner,
                "status": "",
                "type": type,
                "timestamp": int(time.time()),
                "is_finished": False,
                "is_private": is_private,
                "software_version": version[0],
                "software_source": version[1],
                "software_file": "",
                "num_rows": 0,
                "progress": 0.0,
                "key_parent": parent,
                "annotation_fields": "{}"
            }
            self.parameters = parameters
            self.annotation_fields = {}

            self.db.insert("datasets", data=self.data)
            self.refresh_owners()
            self.add_owner(owner)

            # Find desired extension from processor if not explicitly set
            if extension is None:
                if own_processor:
                    extension = own_processor.get_extension(
                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
                        if parent
                        else None
                    )
                # Still no extension, default to 'csv'
                if not extension:
                    extension = "csv"

            # Reserve filename and update data['result_file']
            self.reserve_result_file(parameters, extension)

        self.refresh_owners()
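
    # Usage sketch (illustrative only, not executed; parameter values are
    # hypothetical). A DataSet is either created from scratch by passing
    # `parameters` and `type`, or loaded by `key`. `db` and `modules` are the
    # database handle and module cache that 4CAT workers pass around.
    #
    #   new_dataset = DataSet(
    #       parameters={"datasource": "upload", "label": "My dataset"},
    #       type="some-processor-type",
    #       db=db,
    #       owner="some-user",
    #       modules=modules,
    #   )
    #   same_dataset = DataSet(key=new_dataset.key, db=db, modules=modules)
    #
    # When neither `key`, `job` nor `data` is given, a new database record is
    # created and a result file name is reserved via reserve_result_file().
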
    def check_dataset_finished(self):
        """
        Checks if the dataset is finished. Returns the path to the results
        file if it is not empty, or 'empty' when there were no matches.

        Only returns a path if the dataset is complete. In other words, if this
        method returns a path, a file with the complete results for this dataset
        will exist at that location.

        :return: A path to the results file, 'empty', or `None`
        """
        if self.data["is_finished"] and self.data["num_rows"] > 0:
            return self.get_results_path()
        elif self.data["is_finished"] and self.data["num_rows"] == 0:
            return 'empty'
        else:
            return None

    def get_results_path(self):
        """
        Get path to results file

        Always returns a path, which will at some point contain the dataset
        data, but may not do so yet. Use this to get the location to write
        generated results to.

        :return Path: A path to the results file
        """
        # alas we need to instantiate a config reader here - no way around it
        if not self.folder:
            self.folder = self.modules.config.get('PATH_ROOT').joinpath(self.modules.config.get('PATH_DATA'))
        return self.folder.joinpath(self.data["result_file"])

    def get_results_folder_path(self):
        """
        Get path to folder containing accompanying results

        Returns a path that may not yet be created

        :return Path: A path to the results folder
        """
        return self.get_results_path().parent.joinpath("folder_" + self.key)

    def get_log_path(self):
        """
        Get path to dataset log file

        Each dataset has a single log file that documents its creation. This
        method returns the path to that file. It is identical to the path of
        the dataset result file, with 'log' as its extension instead.

        :return Path: A path to the log file
        """
        return self.get_results_path().with_suffix(".log")

    def clear_log(self):
        """
        Clears the dataset log file

        If the log file does not exist, it is created empty. The log file will
        have the same file name as the dataset result file, with the 'log'
        extension.
        """
        log_path = self.get_log_path()
        with log_path.open("w"):
            pass

    def log(self, log):
        """
        Write log message to file

        Writes the log message to the log file on a new line, including a
        timestamp at the start of the line. Note that this assumes the log file
        already exists - it should have been created/cleared with clear_log()
        prior to calling this.

        :param str log: Log message to write
        """
        log_path = self.get_log_path()
        with log_path.open("a", encoding="utf-8") as outfile:
            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))

    def _iterate_items(self, processor=None):
        """
        A generator that iterates through a CSV or NDJSON file

        This is an internal method and should not be called directly. Rather,
        call iterate_items() and use the generated dictionary and its properties.

        If a reference to a processor is provided, with every iteration,
        the processor's 'interrupted' flag is checked, and if set a
        ProcessorInterruptedException is raised, which by default is caught
        in the worker and subsequently stops execution gracefully.

        There are two file types that can be iterated (currently): CSV files
        and NDJSON (newline-delimited JSON) files. In the future, one could
        envision adding a pathway to retrieve items from e.g. a MongoDB
        collection directly instead of from a static file

        :param BasicProcessor processor: A reference to the processor
        iterating the dataset.
        :return generator: A generator that yields each item as a dictionary
        """
        path = self.get_results_path()

        # Yield through items one by one
        if path.suffix.lower() == ".csv":
            with path.open("rb") as infile:
                wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
                reader = csv.DictReader(wrapped_infile)

                if not self.get_own_processor():
                    # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
                    first_item = next(reader)
                    if first_item is None or any(
                        [True for key in first_item if type(key) is not str]
                    ):
                        raise NotImplementedError(
                            f"Cannot iterate through CSV file (deprecated processor {self.type})"
                        )
                    yield first_item

                for item in reader:
                    if hasattr(processor, "interrupted") and processor.interrupted:
                        raise ProcessorInterruptedException(
                            "Processor interrupted while iterating through CSV file"
                        )

                    yield item

        elif path.suffix.lower() == ".ndjson":
            # In NDJSON format each line in the file is a self-contained JSON object
            with path.open(encoding="utf-8") as infile:
                for line in infile:
                    if hasattr(processor, "interrupted") and processor.interrupted:
                        raise ProcessorInterruptedException(
                            "Processor interrupted while iterating through NDJSON file"
                        )

                    yield json.loads(line)

        else:
            raise NotImplementedError("Cannot iterate through %s file" % path.suffix)

    def iterate_items(
        self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None
    ):
        """
        Generate mapped dataset items

        Wrapper for _iterate_items that returns a DatasetItem, which can be
        accessed as a dict returning the original item or (if a mapper is
        available) the mapped item. Mapped or original versions of the item can
        also be accessed via the `original` and `mapped_object` properties of
        the DatasetItem.

        Processors can define a method called `map_item` that can be used to map
        an item from the dataset file before it is processed any further. This is
        slower than storing the data file in the right format to begin with, but
        not all data sources allow for easy 'flat' mapping of items; e.g. tweets
        are nested objects when retrieved from the Twitter API, which are easier
        to store as a JSON file than as a flat CSV file, and it would be a shame
        to throw away that data.

        Note the two parameters warn_unmappable and map_missing. Items can be
        unmappable in that their structure is too different to coerce into a
        neat dictionary of the structure the data source expects. This makes it
        'unmappable' and warn_unmappable determines what happens in this case.
        It can also be of the right structure, but with some fields missing or
        incomplete. map_missing determines what happens in that case. The
        latter is for example possible when importing data via Zeeschuimer,
        which produces unstably-structured data captured from social media
        sites.

        :param BasicProcessor processor: A reference to the processor
        iterating the dataset.
        :param bool warn_unmappable: If an item is not mappable, skip the item
        and log a warning
        :param max_unmappable: Skip at most this many unmappable items; if
        more are encountered, stop iterating. `None` to never stop.
        :param map_missing: Indicates what to do with mapped items for which
        some fields could not be mapped. Defaults to 'default'.
        Must be one of:
        - 'default': fill missing fields with the default passed by map_item
        - 'abort': raise a MappedItemIncompleteException if a field is missing
        - a callback: replace missing field with the return value of the
        callback. The MappedItem object is passed to the callback as the
        first argument and the name of the missing field as the second.
        - a dictionary with a key for each possible missing field: replace missing
        field with a strategy for that field ('default', 'abort', or a callback)
        :param get_annotations: Whether to also fetch annotations from the database.
        This can be disabled to help speed up iteration.
        :return generator: A generator that yields DatasetItems
        """
        unmapped_items = 0
        # Collect item_mapper for use with filter
        item_mapper = False
        own_processor = self.get_own_processor()
        if own_processor and own_processor.map_item_method_available(dataset=self):
            item_mapper = True

        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
        if get_annotations:
            annotation_fields = self.annotation_fields
            num_annotations = 0 if not annotation_fields else self.num_annotations()
            all_annotations = None
            # If this dataset has less than n annotations, just retrieve them all before iterating
            if 0 < num_annotations <= 500:
                # Convert to dict for faster lookup when iterating over items
                all_annotations = collections.defaultdict(list)
                for annotation in self.get_annotations():
                    all_annotations[annotation.item_id].append(annotation)

        # missing field strategy can be for all fields at once, or per field
        # if it is per field, it is a dictionary with field names and their strategy
        # if it is for all fields, it may be a callback, 'abort', or 'default'
        default_strategy = "default"
        if type(map_missing) is not dict:
            default_strategy = map_missing
            map_missing = {}

        # Loop through items
        for i, item in enumerate(self._iterate_items(processor)):
            # Save original to yield
            original_item = item.copy()

            # Map item
            if item_mapper:
                try:
                    mapped_item = own_processor.get_mapped_item(item)
                except MapItemException as e:
                    if warn_unmappable:
                        self.warn_unmappable_item(
                            i, processor, e, warn_admins=unmapped_items is False
                        )

                    unmapped_items += 1
                    if max_unmappable and unmapped_items > max_unmappable:
                        break
                    else:
                        continue

                # check if fields have been marked as 'missing' in the
                # underlying data, and treat according to the chosen strategy
                if mapped_item.get_missing_fields():
                    for missing_field in mapped_item.get_missing_fields():
                        strategy = map_missing.get(missing_field, default_strategy)

                        if callable(strategy):
                            # delegate handling to a callback
                            mapped_item.data[missing_field] = strategy(
                                mapped_item.data, missing_field
                            )
                        elif strategy == "abort":
                            # raise an exception to be handled at the processor level
                            raise MappedItemIncompleteException(
                                f"Cannot process item, field {missing_field} missing in source data."
                            )
                        elif strategy == "default":
                            # use whatever was passed to the object constructor
                            mapped_item.data[missing_field] = mapped_item.data[
                                missing_field
                            ].value
                        else:
                            raise ValueError(
                                "map_missing must be 'abort', 'default', or a callback."
                            )
            else:
                mapped_item = original_item

            # Add possible annotation values
            if get_annotations and annotation_fields:
                # We're always handling annotated data as a MappedItem object,
                # even if no map_item() function is available for the data source.
                if not isinstance(mapped_item, MappedItem):
                    mapped_item = MappedItem(mapped_item)

                # Retrieve from all annotations
                if all_annotations:
                    item_annotations = all_annotations.get(mapped_item.data["id"])
                # Or get annotations per item for large datasets (more queries but less memory usage)
                else:
                    item_annotations = self.get_annotations_for_item(
                        mapped_item.data["id"]
                    )

                # Loop through annotation fields
                for (
                    annotation_field_id,
                    annotation_field_items,
                ) in annotation_fields.items():
                    # Set default value to an empty string
                    value = ""
                    # Set value if this item has annotations
                    if item_annotations:
                        # Items can have multiple annotations
                        for annotation in item_annotations:
                            if annotation.field_id == annotation_field_id:
                                value = annotation.value
                            if isinstance(value, list):
                                value = ",".join(value)

                    # Make sure labels are unique when iterating through items
                    column_name = annotation_field_items["label"]
                    label_count = 1
                    while column_name in mapped_item.data:
                        label_count += 1
                        column_name = f"{annotation_field_items['label']}_{label_count}"

                    # We're always adding an annotation value
                    # as an empty string, even if it's absent.
                    mapped_item.data[column_name] = value

            # yield a DatasetItem, which is a dict with some special properties
            yield DatasetItem(
                mapper=item_mapper,
                original=original_item,
                mapped_object=mapped_item,
                **(
                    mapped_item.get_item_data()
                    if type(mapped_item) is MappedItem
                    else mapped_item
                ),
            )
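
    # Usage sketch (illustrative, not executed): iterating mapped items from a
    # running processor, with a per-field strategy for missing values. The
    # field names are hypothetical.
    #
    #   for item in dataset.iterate_items(
    #       processor=self,
    #       map_missing={"author": "default", "body": lambda data, field: ""},
    #   ):
    #       print(item["id"], item.get("body", ""))
    #
    # Each yielded DatasetItem behaves like a dict of the mapped item; the raw
    # item stays available through its `original` property.
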
    def sort_and_iterate_items(
        self, sort="", reverse=False, chunk_size=50000, **kwargs
    ) -> dict:
        """
        Loop through items in a dataset, sorted by a given key.

        This is a wrapper function for `iterate_items()` with the
        added functionality of sorting a dataset.

        :param sort: The item key that determines the sort order.
        :param reverse: Whether to sort by largest values first.

        :returns dict: Yields iterated items, sorted as requested
        """

        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
            """
            Sort items based on the given key and order.

            :param items_to_sort: The items to sort
            :param sort_key: The key to sort by
            :param reverse: Whether to sort in reverse order
            :return: Sorted items
            """
            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
                # Sort by dataset order
                yield from items_to_sort
            elif sort_key == "dataset-order" and reverse:
                # Sort by dataset order in reverse
                yield from reversed(list(items_to_sort))
            else:
                # Sort on the basis of a column value
                if not convert_sort_to_float:
                    yield from sorted(
                        items_to_sort,
                        key=lambda x: x.get(sort_key, ""),
                        reverse=reverse,
                    )
                else:
                    # Dataset fields can contain integers and empty strings.
                    # Since these cannot be compared, we will convert every
                    # empty string to 0.
                    yield from sorted(
                        items_to_sort,
                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
                        reverse=reverse,
                    )

        if self.num_rows < chunk_size:
            try:
                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
            except (TypeError, ValueError):
                yield from sort_items(
                    self.iterate_items(**kwargs),
                    sort,
                    reverse,
                    convert_sort_to_float=False,
                )

        else:
            # For large datasets, we will use chunk sorting
            staging_area = self.get_staging_area()
            buffer = []
            chunk_files = []
            convert_sort_to_float = True
            fieldnames = self.get_columns()

            def write_chunk(buffer, chunk_index):
                """
                Write a chunk of data to a temporary file

                :param buffer: The buffer containing the chunk of data
                :param chunk_index: The index of the chunk
                :return: The path to the temporary file
                """
                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
                with temp_file.open("w", encoding="utf-8") as chunk_file:
                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(buffer)
                return temp_file

            # Divide the dataset into sorted chunks
            for item in self.iterate_items(**kwargs):
                buffer.append(item)
                if len(buffer) >= chunk_size:
                    try:
                        buffer = list(
                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
                        )
                    except (TypeError, ValueError):
                        convert_sort_to_float = False
                        buffer = list(
                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
                        )

                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
                    buffer.clear()

            # Sort and write any remaining items in the buffer
            if buffer:
                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
                chunk_files.append(write_chunk(buffer, len(chunk_files)))
                buffer.clear()

            # Merge sorted chunks into the final sorted file
            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
            with sorted_file.open("w", encoding="utf-8") as outfile:
                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
                writer.writeheader()

                # Open all chunk files for reading
                chunk_readers = [
                    csv.DictReader(chunk.open("r", encoding="utf-8"))
                    for chunk in chunk_files
                ]
                heap = []

                # Initialize the heap with the first row from each chunk
                for i, reader in enumerate(chunk_readers):
                    try:
                        row = next(reader)
                        if sort == "dataset-order" and reverse:
                            # Use a reverse index for "dataset-order" and reverse=True
                            sort_key = -i
                        elif convert_sort_to_float:
                            # Negate numeric keys for reverse sorting
                            sort_key = (
                                -convert_to_float(row.get(sort, ""))
                                if reverse
                                else convert_to_float(row.get(sort, ""))
                            )
                        else:
                            if reverse:
                                # For reverse string sorting, invert string comparison by creating a tuple
                                # with an inverted string - this makes Python's tuple comparison work in reverse
                                sort_key = (
                                    tuple(-ord(c) for c in row.get(sort, "")),
                                    -i,
                                )
                            else:
                                sort_key = (row.get(sort, ""), i)
                        heap.append((sort_key, i, row))
                    except StopIteration:
                        pass

                # Use a heap to merge sorted chunks
                heapq.heapify(heap)
                while heap:
                    _, chunk_index, smallest_row = heapq.heappop(heap)
                    writer.writerow(smallest_row)
                    try:
                        next_row = next(chunk_readers[chunk_index])
                        if sort == "dataset-order" and reverse:
                            # Use a reverse index for "dataset-order" and reverse=True
                            sort_key = -chunk_index
                        elif convert_sort_to_float:
                            sort_key = (
                                -convert_to_float(next_row.get(sort, ""))
                                if reverse
                                else convert_to_float(next_row.get(sort, ""))
                            )
                        else:
                            # Use the same inverted comparison for string values
                            if reverse:
                                sort_key = (
                                    tuple(-ord(c) for c in next_row.get(sort, "")),
                                    -chunk_index,
                                )
                            else:
                                sort_key = (next_row.get(sort, ""), chunk_index)
                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
                    except StopIteration:
                        pass

            # Read the sorted file and yield each item
            with sorted_file.open("r", encoding="utf-8") as infile:
                reader = csv.DictReader(infile)
                for item in reader:
                    yield item

            # Remove the temporary files
            if staging_area.is_dir():
                shutil.rmtree(staging_area)
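
    # Usage sketch (illustrative, not executed): sorting by a numeric column,
    # largest values first. The column name is hypothetical.
    #
    #   for item in dataset.sort_and_iterate_items(sort="score", reverse=True):
    #       ...
    #
    # Small datasets are sorted in memory; datasets with more than `chunk_size`
    # items are sorted in chunks on disk and merged with a heap, which keeps
    # memory use bounded to roughly one chunk at a time.
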
    def get_staging_area(self):
        """
        Get path to a temporary folder in which files can be stored before
        finishing

        This folder must be created before use, but is guaranteed to not exist
        yet. The folder may be used as a staging area for the dataset data
        while it is being processed.

        :return Path: Path to folder
        """
        results_file = self.get_results_path()

        results_dir_base = results_file.parent
        results_dir = results_file.name.replace(".", "") + "-staging"
        results_path = results_dir_base.joinpath(results_dir)
        index = 1
        while results_path.exists():
            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
            index += 1

        # create temporary folder
        results_path.mkdir()

        # Storing the staging area with the dataset so that it can be removed later
        self.staging_areas.append(results_path)

        return results_path

    def remove_staging_areas(self):
        """
        Remove any staging areas that were created and all files contained in them.
        """
        # Remove DataSet staging areas
        if self.staging_areas:
            for staging_area in self.staging_areas:
                if staging_area.is_dir():
                    shutil.rmtree(staging_area)

    def finish(self, num_rows=0):
        """
        Declare the dataset finished
        """
        if self.data["is_finished"]:
            raise RuntimeError("Cannot finish a finished dataset again")

        self.db.update(
            "datasets",
            where={"key": self.data["key"]},
            data={
                "is_finished": True,
                "num_rows": num_rows,
                "progress": 1.0,
                "timestamp_finished": int(time.time()),
            },
        )
        self.data["is_finished"] = True
        self.data["num_rows"] = num_rows
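
    # Usage sketch (illustrative, not executed): a processor writing interim
    # files to a staging area before declaring its dataset finished.
    #
    #   staging_area = dataset.get_staging_area()
    #   staging_area.joinpath("interim.csv").write_text("id,value\n1,2\n")
    #   ...  # write the final results to dataset.get_results_path()
    #   dataset.remove_staging_areas()
    #   dataset.finish(num_rows=1)
    #
    # finish() may only be called once; calling it again on an already
    # finished dataset raises a RuntimeError.
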
    def copy(self, shallow=True):
        """
        Copies the dataset, making a new version with a unique key

        :param bool shallow: Shallow copy: does not copy the result file, but
        instead refers to the same file as the original dataset did
        :return Dataset: Copied dataset
        """
        parameters = self.parameters.copy()

        # a key is partially based on the parameters. so by setting these extra
        # attributes, we also ensure a unique key will be generated for the
        # copy
        # possibly todo: don't use time for uniqueness (but one shouldn't be
        # copying a dataset multiple times per microsecond, that's not what
        # this is for)
        parameters["copied_from"] = self.key
        parameters["copied_at"] = time.time()

        copy = DataSet(
            parameters=parameters,
            db=self.db,
            extension=self.result_file.split(".")[-1],
            type=self.type,
            modules=self.modules,
        )
        for field in self.data:
            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
                continue

            copy.__setattr__(field, self.data[field])

        if shallow:
            # use the same result file
            copy.result_file = self.result_file
        else:
            # copy to new file with new key
            shutil.copy(self.get_results_path(), copy.get_results_path())

        if self.is_finished():
            copy.finish(self.num_rows)

        # make sure ownership is also copied
        copy.copy_ownership_from(self)

        return copy

    def delete(self, commit=True, queue=None):
        """
        Delete the dataset, and all its children

        Deletes both database records and result files. Note that manipulating
        a dataset object after it has been deleted is undefined behaviour.

        :param bool commit: Commit SQL DELETE query?
        """
        # first, recursively delete children
        children = self.db.fetchall(
            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
        )
        for child in children:
            try:
                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
                child.delete(commit=commit)
            except DataSetException:
                # dataset already deleted - race condition?
                pass

        # delete any queued jobs for this dataset
        try:
            job = Job.get_by_remote_ID(self.key, self.db, self.type)
            if job.is_claimed:
                # tell API to stop any jobs running for this dataset
                # level 2 = cancel job
                # we're not interested in the result - if the API is available,
                # it will do its thing, if it's not the backend is probably not
                # running so the job also doesn't need to be interrupted
                call_api(
                    "cancel-job",
                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
                    False,
                )

            # this deletes the job from the database
            job.finish(True)

        except JobNotFoundException:
            pass

        # delete this dataset's own annotations
        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
        # delete annotations that have been generated as part of this dataset
        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
        # delete annotation fields from other datasets that were created as part of this dataset
        for parent_dataset in self.get_genealogy():
            field_deleted = False
            annotation_fields = parent_dataset.annotation_fields
            if annotation_fields:
                for field_id in list(annotation_fields.keys()):
                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
                        del annotation_fields[field_id]
                        field_deleted = True
            if field_deleted:
                parent_dataset.save_annotation_fields(annotation_fields)

        # delete dataset from database
        self.db.delete("datasets", where={"key": self.key}, commit=commit)
        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)

        # delete from drive
        try:
            if self.get_results_path().exists():
                self.get_results_path().unlink()
            if self.get_results_path().with_suffix(".log").exists():
                self.get_results_path().with_suffix(".log").unlink()
            if self.get_results_folder_path().exists():
                shutil.rmtree(self.get_results_folder_path())

        except FileNotFoundError:
            # already deleted, apparently
            pass
        except PermissionError as e:
            self.db.log.error(
                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
            )

    def update_children(self, **kwargs):
        """
        Update an attribute for all child datasets

        Can be used to e.g. change the owner, version, or finished status for
        all datasets in a tree

        :param kwargs: Parameters corresponding to known dataset attributes
        """
        for child in self.get_children(update=True):
            for attr, value in kwargs.items():
                child.__setattr__(attr, value)

            child.update_children(**kwargs)

    def is_finished(self):
        """
        Check if dataset is finished
        :return bool:
        """
        return bool(self.data["is_finished"])

    def is_rankable(self, multiple_items=True):
        """
        Determine if a dataset is rankable

        Rankable means that it is a CSV file with 'date' and 'value' columns
        as well as one or more item label columns

        :param bool multiple_items: Consider datasets with multiple items per
        item (e.g. word_1, word_2, etc)?

        :return bool: Whether the dataset is rankable or not
        """
        if (
            self.get_results_path().suffix != ".csv"
            or not self.get_results_path().exists()
        ):
            return False

        column_options = {"date", "value", "item"}
        if multiple_items:
            column_options.add("word_1")

        with self.get_results_path().open(encoding="utf-8") as infile:
            reader = csv.DictReader(infile)
            try:
                return len(set(reader.fieldnames) & column_options) >= 3
            except (TypeError, ValueError):
                return False

    def is_accessible_by(self, username, role="owner"):
        """
        Check if dataset has given user as owner

        :param str|User username: Username to check for
        :param str|None role: Role to check for; if `None`, any role counts
        :return bool:
        """
        if type(username) is not str:
            if hasattr(username, "get_id"):
                username = username.get_id()
            else:
                raise TypeError("User must be a str or User object")

        # 'normal' owners
        if username in [
            owner
            for owner, meta in self.owners.items()
            if (role is None or meta["role"] == role)
        ]:
            return True

        # owners that are owners by being part of a tag
        if username in itertools.chain(
            *[
                tagged_owners
                for tag, tagged_owners in self.tagged_owners.items()
                if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
            ]
        ):
            return True

        return False

    def get_owners_users(self, role="owner"):
        """
        Get list of dataset owners

        This returns a list of *users* that are considered owners. Tags are
        transparently replaced with the users with that tag.

        :param str|None role: Role to check for. If `None`, all owners are
        returned regardless of role.

        :return set: De-duplicated owner list
        """
        # 'normal' owners
        owners = [
            owner
            for owner, meta in self.owners.items()
            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
        ]

        # owners that are owners by being part of a tag
        owners.extend(
            itertools.chain(
                *[
                    tagged_owners
                    for tag, tagged_owners in self.tagged_owners.items()
                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
                ]
            )
        )

        # de-duplicate before returning
        return set(owners)

    def get_owners(self, role="owner"):
        """
        Get list of dataset owners

        This returns a list of all owners, and does not transparently resolve
        tags (like `get_owners_users` does).

        :param str|None role: Role to check for. If `None`, all owners are
        returned regardless of role.

        :return set: De-duplicated owner list
        """
        return [
            owner
            for owner, meta in self.owners.items()
            if (role is None or meta["role"] == role)
        ]

    def add_owner(self, username, role="owner"):
        """
        Set dataset owner

        If the user is already an owner, but with a different role, the role is
        updated. If the user is already an owner with the same role, nothing happens.

        :param str|User username: Username to set as owner
        :param str|None role: Role to add user with.
        """
        if type(username) is not str:
            if hasattr(username, "get_id"):
                username = username.get_id()
            else:
                raise TypeError("User must be a str or User object")

        if username not in self.owners:
            self.owners[username] = {"name": username, "key": self.key, "role": role}
            self.db.insert("datasets_owners", data=self.owners[username], safe=True)

        elif username in self.owners and self.owners[username]["role"] != role:
            self.db.update(
                "datasets_owners",
                data={"role": role},
                where={"name": username, "key": self.key},
            )
            self.owners[username]["role"] = role

        if username.startswith("tag:"):
            # this is a bit more complicated than just adding to the list of
            # owners, so do a full refresh
            self.refresh_owners()

        # make sure children's owners remain in sync
        for child in self.get_children(update=True):
            child.add_owner(username, role)
            # not recursive, since we're calling it from recursive code!
            child.copy_ownership_from(self, recursive=False)
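
    # Usage sketch (illustrative, not executed; names are hypothetical):
    # granting access to a single user and to everyone carrying a tag, then
    # checking access.
    #
    #   dataset.add_owner("alice", role="owner")
    #   dataset.add_owner("tag:students", role="viewer")
    #   dataset.is_accessible_by("alice")        # True
    #   dataset.get_owners_users(role="viewer")  # tag resolved to user names
    #
    # Ownership changes propagate to child datasets so a whole tree stays in
    # sync.
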
    def remove_owner(self, username):
        """
        Remove dataset owner

        If no owner is set, the dataset is assigned to the anonymous user.
        If the user is not an owner, nothing happens.

        :param str|User username: Username to remove as owner
        """
        if type(username) is not str:
            if hasattr(username, "get_id"):
                username = username.get_id()
            else:
                raise TypeError("User must be a str or User object")

        if username in self.owners:
            del self.owners[username]
            self.db.delete("datasets_owners", where={"name": username, "key": self.key})

            if not self.owners:
                self.add_owner("anonymous")

        if username in self.tagged_owners:
            del self.tagged_owners[username]

        # make sure children's owners remain in sync
        for child in self.get_children(update=True):
            child.remove_owner(username)
            # not recursive, since we're calling it from recursive code!
            child.copy_ownership_from(self, recursive=False)

    def refresh_owners(self):
        """
        Update internal owner cache

        This makes sure that the list of *users* and *tags* which can access the
        dataset is up to date.
        """
        self.owners = {
            owner["name"]: owner
            for owner in self.db.fetchall(
                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
            )
        }

        # determine which users (if any) are owners of the dataset by having a
        # tag that is listed as an owner
        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
        if owner_tags:
            tagged_owners = self.db.fetchall(
                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
            )
            self.tagged_owners = {
                owner_tag: [
                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
                ]
                for owner_tag in owner_tags
            }
        else:
            self.tagged_owners = {}

    def copy_ownership_from(self, dataset, recursive=True):
        """
        Copy ownership

        This is useful to e.g. make sure a dataset's ownership stays in sync
        with its parent

        :param Dataset dataset: Dataset to copy ownership from
        :return:
        """
        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)

        for role in ("owner", "viewer"):
            owners = dataset.get_owners(role=role)
            for owner in owners:
                self.db.insert(
                    "datasets_owners",
                    data={"key": self.key, "name": owner, "role": role},
                    commit=False,
                    safe=True,
                )

        self.db.commit()
        if recursive:
            for child in self.get_children(update=True):
                child.copy_ownership_from(self, recursive=recursive)

    def get_parameters(self):
        """
        Get dataset parameters

        The dataset parameters are stored as JSON in the database - parse them
        and return the resulting object

        :return: Dataset parameters as originally stored
        """
        try:
            return json.loads(self.data["parameters"])
        except json.JSONDecodeError:
            return {}

    def get_columns(self):
        """
        Returns the dataset columns.

        Useful for processor input forms. Can deal with both CSV and NDJSON
        files, the latter only if a `map_item` function is available in the
        processor that generated it. While in other cases one could use the
        keys of the JSON object, this is not always possible in follow-up code
        that uses the 'column' names, so for consistency this function acts as
        if no columns can be parsed if no `map_item` function exists.

        :return list: List of dataset columns; empty list if unable to parse
        """
        if not self.get_results_path().exists():
            # no file to get columns from
            return []

        if (self.get_results_path().suffix.lower() == ".csv") or (
            self.get_results_path().suffix.lower() == ".ndjson"
            and self.get_own_processor() is not None
            and self.get_own_processor().map_item_method_available(dataset=self)
        ):
            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
            try:
                keys = list(next(items).keys())
                if self.annotation_fields:
                    for annotation_field in self.annotation_fields.values():
                        annotation_column = annotation_field["label"]
                        label_count = 1
                        while annotation_column in keys:
                            label_count += 1
                            annotation_column = (
                                f"{annotation_field['label']}_{label_count}"
                            )
                        keys.append(annotation_column)
                columns = keys
            except (StopIteration, NotImplementedError):
                # No items or otherwise unable to iterate
                columns = []
            finally:
                del items
        else:
            # Filetype not CSV or an NDJSON with `map_item`
            columns = []

        return columns

    def update_label(self, label):
        """
        Update label for this dataset

        :param str label: New label
        :return str: The new label, as returned by get_label
        """
        self.parameters["label"] = label

        self.db.update(
            "datasets",
            data={"parameters": json.dumps(self.parameters)},
            where={"key": self.key},
        )
        return self.get_label()

    def get_label(self, parameters=None, default="Query"):
        """
        Generate a readable label for the dataset

        :param dict parameters: Parameters of the dataset
        :param str default: Label to use if it cannot be inferred from the
        parameters

        :return str: Label
        """
        if not parameters:
            parameters = self.parameters

        if parameters.get("label"):
            return parameters["label"]
        elif parameters.get("body_query") and parameters["body_query"] != "empty":
            return parameters["body_query"]
        elif parameters.get("body_match") and parameters["body_match"] != "empty":
            return parameters["body_match"]
        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
            return parameters["subject_query"]
        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
            return parameters["subject_match"]
        elif parameters.get("query"):
            label = parameters["query"]
            # Some legacy datasets have lists as query data
            if isinstance(label, list):
                label = ", ".join(label)

            label = label if len(label) < 30 else label[:25] + "..."
            label = label.strip().replace("\n", ", ")
            return label
        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
            return "Flag: %s" % parameters["country_flag"]
        elif parameters.get("country_name") and parameters["country_name"] != "all":
            return "Country: %s" % parameters["country_name"]
        elif parameters.get("filename"):
            return parameters["filename"]
        elif parameters.get("board") and "datasource" in parameters:
            return parameters["datasource"] + "/" + parameters["board"]
        elif (
            "datasource" in parameters
            and parameters["datasource"] in self.modules.datasources
        ):
            return (
                self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
            )
        else:
            return default

    def change_datasource(self, datasource):
        """
        Change the datasource type for this dataset

        :param str datasource: New datasource type
        :return str: The new datasource type
        """

        self.parameters["datasource"] = datasource

        self.db.update(
            "datasets",
            data={"parameters": json.dumps(self.parameters)},
            where={"key": self.key},
        )
        return datasource

    def reserve_result_file(self, parameters=None, extension="csv"):
        """
        Generate a unique path to the results file for this dataset

        This generates a file name for the data file of this dataset, and makes sure
        no file exists or will exist at that location other than the file we
        expect (i.e. the data for this particular dataset).

        :param str extension: File extension, "csv" by default
        :param parameters: Dataset parameters
        :return bool: Whether the file path was successfully reserved
        """
        if self.data["is_finished"]:
            raise RuntimeError("Cannot reserve results file for a finished dataset")

        # Use 'random' for random post queries
        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
        # Use country code for country flag queries
        elif "country_flag" in parameters and parameters["country_flag"] != "all":
            file = (
                "countryflag-"
                + str(parameters["country_flag"])
                + "-"
                + self.data["key"]
            )
        # Use the query string for all other queries
        else:
            query_bit = self.data["query"].replace(" ", "-").lower()
            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
            query_bit = query_bit[:100]  # Crop to avoid OSError
            file = query_bit + "-" + self.data["key"]
            file = re.sub(r"[-]+", "-", file)

        self.data["result_file"] = file + "." + extension.lower()
        index = 1
        while self.get_results_path().is_file():
            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
            index += 1

        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
                                 data={"result_file": self.data["result_file"]})
        return updated > 0

    def get_key(self, query, parameters, parent="", time_offset=0):
        """
        Generate a unique key for this dataset that can be used to identify it

        The key is a hash of a combination of the query string and parameters.
        You never need to call this, really: it's used internally.

        :param str query: Query string
        :param parameters: Dataset parameters
        :param parent: Parent dataset's key (if applicable)
        :param time_offset: Offset to add to the time component of the dataset
        key. This can be used to ensure a unique key even if the parameters and
        timing is otherwise identical to an existing dataset's

        :return str: Dataset key
        """
        # Return a hash based on parameters
        # we're going to use the hash of the parameters to uniquely identify
        # the dataset, so make sure it's always in the same order, or we might
        # end up creating multiple keys for the same dataset if python
        # decides to return the dict in a different order
        param_key = collections.OrderedDict()
        for key in sorted(parameters):
            param_key[key] = parameters[key]

        # we additionally use the current time as a salt - this should usually
        # ensure a unique key for the dataset. if for some reason there is a
        # hash collision anyway, a new key is generated below
        param_key["_salt"] = int(time.time()) + time_offset

        parent_key = str(parent) if parent else ""
        plain_key = repr(param_key) + str(query) + parent_key
        hashed_key = hash_to_md5(plain_key)

        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
            # key exists, generate a new one
            return self.get_key(
                query, parameters, parent, time_offset=random.randint(1, 10)
            )
        else:
            return hashed_key

    def set_key(self, key):
        """
        Change dataset key

        In principle, keys should never be changed. But there are rare cases
        where it is useful to do so, in particular when importing a dataset
        from another 4CAT instance; in that case it makes sense to try and
        ensure that the key is the same as it was before. This function sets
        the dataset key and updates any dataset references to it.

        :param str key: Key to set
        :return str: Key that was set. If the desired key already exists, the
        original key is kept.
        """
        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
        if key_exists or not key:
            return self.key

        old_key = self.key
        self.db.update("datasets", data={"key": key}, where={"key": old_key})

        # update references
        self.db.update(
            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
        )
        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})

        # for good measure
        self.db.commit()
        self.key = key

        return self.key

    def get_status(self):
        """
        Get Dataset status

        :return string: Dataset status
        """
        return self.data["status"]

    def update_status(self, status, is_final=False):
        """
        Update dataset status

        The status is a string that may be displayed to a user to keep them
        updated and informed about the progress of a dataset. No memory is kept
        of earlier dataset statuses; the current status is overwritten when
        updated.

        Statuses are also written to the dataset log file.

        :param string status: Dataset status
        :param bool is_final: If this is `True`, subsequent calls to this
        method while the object is instantiated will not update the dataset
        status.
        :return bool: Status update successful?
        """
        if self.no_status_updates:
            return

        # for presets, copy the updated status to the preset(s) this is part of
        if self.preset_parent is None:
            self.preset_parent = [
                parent
                for parent in self.get_genealogy()
                if parent.type.find("preset-") == 0 and parent.key != self.key
            ][:1]

        if self.preset_parent:
            for preset_parent in self.preset_parent:
                if not preset_parent.is_finished():
                    preset_parent.update_status(status)

        self.data["status"] = status
        updated = self.db.update(
            "datasets", where={"key": self.data["key"]}, data={"status": status}
        )

        if is_final:
            self.no_status_updates = True

        self.log(status)

        return updated > 0
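
    # Usage sketch (illustrative, not executed): the typical status lifecycle
    # as seen from a processor.
    #
    #   dataset.update_status("Collecting items...")
    #   dataset.update_progress(0.5)
    #   dataset.update_status("Done", is_final=True)
    #   dataset.finish(num_rows=1000)
    #
    # After a status has been set with `is_final=True`, later update_status()
    # calls on this object are ignored.
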
    def update_progress(self, progress):
        """
        Update dataset progress

        The progress can be used to indicate to a user how close the dataset
        is to completion.

        :param float progress: Between 0 and 1.
        :return:
        """
        progress = min(1, max(0, progress))  # clamp
        if type(progress) is int:
            progress = float(progress)

        self.data["progress"] = progress
        updated = self.db.update(
            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
        )
        return updated > 0

    def get_progress(self):
        """
        Get dataset progress

        :return float: Progress, between 0 and 1
        """
        return self.data["progress"]

    def finish_with_error(self, error):
        """
        Set error as final status, and finish with 0 results

        This is a convenience function to avoid having to repeat
        "update_status" and "finish" a lot.

        :param str error: Error message for final dataset status.
        :return:
        """
        self.update_status(error, is_final=True)
        self.finish(0)

        return None

    def update_version(self, version):
        """
        Update software version used for this dataset

        This can be used to verify the code that was used to process this dataset.

        :param string version: Version identifier
        :return bool: Update successful?
        """
        try:
            # this fails if the processor type is unknown
            # edge case, but let's not crash...
            processor_path = self.modules.processors.get(self.data["type"]).filepath
        except AttributeError:
            processor_path = ""

        updated = self.db.update(
            "datasets",
            where={"key": self.data["key"]},
            data={
                "software_version": version[0],
                "software_source": version[1],
                "software_file": processor_path,
            },
        )

        return updated > 0

    def delete_parameter(self, parameter, instant=True):
        """
        Delete a parameter from the dataset metadata

        :param string parameter: Parameter to delete
        :param bool instant: Also delete parameters in this instance object?
        :return bool: Update successful?
        """
        parameters = self.parameters.copy()
        if parameter in parameters:
            del parameters[parameter]
        else:
            return False

        updated = self.db.update(
            "datasets",
            where={"key": self.data["key"]},
            data={"parameters": json.dumps(parameters)},
        )

        if instant:
            self.parameters = parameters

        return updated > 0

    def get_version_url(self, file):
        """
        Get a versioned github URL for the version this dataset was processed with

        :param file: File to link within the repository
        :return: URL, or an empty string
        """
        if not self.data["software_source"]:
            return ""

        filepath = self.data.get("software_file", "")
        if filepath.startswith("/extensions/"):
            # go to root of extension
            filepath = "/" + "/".join(filepath.split("/")[3:])

        return (
            self.data["software_source"]
            + "/blob/"
            + self.data["software_version"]
            + filepath
        )

    def top_parent(self):
        """
        Get root dataset

        Traverses the tree of datasets this one is part of until it finds one
        with no source_dataset dataset, then returns that dataset.

        :return Dataset: Root dataset
        """
        genealogy = self.get_genealogy()
        return genealogy[0]

    def get_genealogy(self):
        """
        Get genealogy of this dataset

        Creates a list of DataSet objects, with the first one being the
        'top' dataset, and each subsequent one being a child of the previous
        one, ending with the current dataset.

        :return list: Dataset genealogy, oldest dataset first
        """
        if not self._genealogy:
            key_parent = self.key_parent
            genealogy = []

            while key_parent:
                try:
                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
                except DataSetException:
                    break

                genealogy.append(parent)
                if parent.key_parent:
                    key_parent = parent.key_parent
                else:
                    break

            genealogy.reverse()
            self._genealogy = genealogy

        # copy the cached list so appending the dataset itself does not
        # repeatedly grow the cache on subsequent calls
        genealogy = self._genealogy.copy()
        genealogy.append(self)

        return genealogy

    def get_children(self, update=False):
        """
        Get children of this dataset

        :param bool update: Update the list of children from database if True, else return cached value
        :return list: List of child datasets
        """
        if self._children is not None and not update:
            return self._children

        analyses = self.db.fetchall(
            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
            (self.key,),
        )
        self._children = [
            DataSet(data=analysis, db=self.db, modules=self.modules)
            for analysis in analyses
        ]
        return self._children

    def get_all_children(self, recursive=True, update=True):
        """
        Get all children of this dataset

        Results are returned as a non-hierarchical list, i.e. the result does
        not reflect the actual dataset hierarchy (but all datasets in the
        result will have the original dataset as an ancestor somewhere)

        :return list: List of DataSets
        """
        children = self.get_children(update=update)
        results = children.copy()
        if recursive:
            for child in children:
                results += child.get_all_children(recursive=recursive, update=update)

        return results

    def nearest(self, type_filter):
        """
        Return nearest dataset that matches the given type

        Starting with this dataset, traverse the hierarchy upwards and return
        whichever dataset first matches the given type.

        :param str type_filter: Type filter. Can contain wildcards and is matched
        using `fnmatch.fnmatch`.
        :return: Nearest matching dataset, or `None` if none match.
        """
        genealogy = self.get_genealogy()
        for dataset in reversed(genealogy):
            if fnmatch.fnmatch(dataset.type, type_filter):
                return dataset

        return None

    def get_breadcrumbs(self):
        """
        Get breadcrumbs navlink for use in permalinks

        Returns a string representing this dataset's genealogy that may be used
        to uniquely identify it.

        :return str: Nav link
        """
        if not self.key_parent:
            return self.key

        genealogy = self.get_genealogy()
        return ",".join([d.key for d in genealogy])

    def get_compatible_processors(self, config=None):
        """
        Get list of processors compatible with this dataset

        Checks whether this dataset type is one that is listed as being accepted
        by the processor, for each known type: if the processor does not
        specify accepted types (via the `is_compatible_with` method), it is
        assumed it accepts any top-level datasets

        :param ConfigManager|None config: Configuration reader to determine
        compatibility through. This may not be the same reader the dataset was
        instantiated with, e.g. when checking whether some other user should
        be able to run processors on this dataset.
        :return dict: Compatible processors, `name => class` mapping
        """
        processors = self.modules.processors

        available = {}
        for processor_type, processor in processors.items():
            if processor.is_from_collector():
                continue

            own_processor = self.get_own_processor()
            if own_processor and own_processor.exclude_followup_processors(
                processor_type
            ):
                continue

            # consider a processor compatible if its is_compatible_with
            # method returns True *or* if it has no explicit compatibility
            # check and this dataset is top-level (i.e. has no parent)
            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
                available[processor_type] = processor

        return available
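
    # Usage sketch (illustrative, not executed): listing processors a given
    # user may run on this dataset. `config` is assumed to be a context-aware
    # ConfigManager for that user.
    #
    #   compatible = dataset.get_compatible_processors(config=config)
    #   for processor_type, processor in compatible.items():
    #       print(processor_type)
    #
    # get_available_processors() applies the same compatibility check but also
    # filters out processors that have already run and offer no options.
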
1770 1771 :param bool update: Update the queue position from database if True, else return cached value 1772 :return int: Queue position 1773 """ 1774 if self.is_finished() or not self.data.get("job"): 1775 self._queue_position = -1 1776 return self._queue_position 1777 elif not update and self._queue_position is not None: 1778 # Use cached value 1779 return self._queue_position 1780 else: 1781 # Collect queue position from database via the job 1782 try: 1783 job = Job.get_by_ID(self.data["job"], self.db) 1784 self._queue_position = job.get_place_in_queue() 1785 except JobNotFoundException: 1786 self._queue_position = -1 1787 1788 return self._queue_position 1789 1790 def get_own_processor(self): 1791 """ 1792 Get the processor class that produced this dataset 1793 1794 :return: Processor class, or `None` if not available. 1795 """ 1796 processor_type = self.parameters.get("type", self.data.get("type")) 1797 1798 return self.modules.processors.get(processor_type) 1799 1800 def get_available_processors(self, config=None, exclude_hidden=False): 1801 """ 1802 Get list of processors that may be run for this dataset 1803 1804 Returns all compatible processors except for those that are already 1805 queued or finished and have no options. Processors that have been 1806 run but have options are included so they may be run again with a 1807 different configuration 1808 1809 :param ConfigManager|None config: Configuration reader to determine 1810 compatibility through. This may not be the same reader the dataset was 1811 instantiated with, e.g. when checking whether some other user should 1812 be able to run processors on this dataset. 1813 :param bool exclude_hidden: Exclude processors that should be displayed 1814 in the UI? If `False`, all processors are returned. 1815 1816 :return dict: Available processors, `name => properties` mapping 1817 """ 1818 if self.available_processors: 1819 # Update to reflect exclude_hidden parameter which may be different from last call 1820 # TODO: could children also have been created? Possible bug, but I have not seen anything effected by this 1821 return { 1822 processor_type: processor 1823 for processor_type, processor in self.available_processors.items() 1824 if not exclude_hidden or not processor.is_hidden 1825 } 1826 1827 processors = self.get_compatible_processors(config=config) 1828 1829 for analysis in self.get_children(update=True): 1830 if analysis.type not in processors: 1831 continue 1832 1833 if not processors[analysis.type].get_options(config=config): 1834 # No variable options; this processor has been run so remove 1835 del processors[analysis.type] 1836 continue 1837 1838 if exclude_hidden and processors[analysis.type].is_hidden: 1839 del processors[analysis.type] 1840 1841 self.available_processors = processors 1842 return processors 1843 1844 def link_job(self, job): 1845 """ 1846 Link this dataset to a job ID 1847 1848 Updates the dataset data to include a reference to the job that will be 1849 executing (or has already executed) this job. 1850 1851 Note that if no job can be found for this dataset, this method silently 1852 fails. 1853 1854 :param Job job: The job that will run this dataset 1855 1856 :todo: If the job column ever gets used, make sure it always contains 1857 a valid value, rather than silently failing this method. 
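# --- Example (editor's sketch, not part of the original class) ---
# Polling `get_place_in_queue()` until a dataset is up next. Assumes
# `dataset` is an unfinished DataSet with a linked job; -1 means finished or
# no job, 0 means currently running (or the backend is down).
import time

def wait_for_turn(dataset, poll_interval=10):
    while True:
        position = dataset.get_place_in_queue(update=True)
        if position <= 0:
            return position
        time.sleep(poll_interval)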
1858 """ 1859 if type(job) is not Job: 1860 raise TypeError("link_job requires a Job object as its argument") 1861 1862 if "id" not in job.data: 1863 try: 1864 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"]) 1865 except JobNotFoundException: 1866 return 1867 1868 self.db.update( 1869 "datasets", where={"key": self.key}, data={"job": job.data["id"]} 1870 ) 1871 1872 def link_parent(self, key_parent): 1873 """ 1874 Set source_dataset key for this dataset 1875 1876 :param key_parent: Parent key. Not checked for validity 1877 """ 1878 self.db.update( 1879 "datasets", where={"key": self.key}, data={"key_parent": key_parent} 1880 ) 1881 1882 def get_parent(self): 1883 """ 1884 Get parent dataset 1885 1886 :return DataSet: Parent dataset, or `None` if not applicable 1887 """ 1888 return ( 1889 DataSet(key=self.key_parent, db=self.db, modules=self.modules) 1890 if self.key_parent 1891 else None 1892 ) 1893 1894 def detach(self): 1895 """ 1896 Makes the datasets standalone, i.e. not having any source_dataset dataset 1897 """ 1898 self.link_parent("") 1899 1900 def is_dataset(self): 1901 """ 1902 Easy way to confirm this is a dataset. 1903 Used for checking processor and dataset compatibility, 1904 which needs to handle both processors and datasets. 1905 """ 1906 return True 1907 1908 def is_top_dataset(self): 1909 """ 1910 Easy way to confirm this is a top dataset. 1911 Used for checking processor and dataset compatibility, 1912 which needs to handle both processors and datasets. 1913 """ 1914 if self.key_parent: 1915 return False 1916 return True 1917 1918 def is_expiring(self, config): 1919 """ 1920 Determine if dataset is set to expire 1921 1922 Similar to `is_expired`, but checks if the dataset will be deleted in 1923 the future, not if it should be deleted right now. 1924 1925 :param ConfigManager config: Configuration reader (context-aware) 1926 :return bool|int: `False`, or the expiration date as a Unix timestamp. 1927 """ 1928 # has someone opted out of deleting this? 1929 if self.parameters.get("keep"): 1930 return False 1931 1932 # is this dataset explicitly marked as expiring after a certain time? 1933 if self.parameters.get("expires-after"): 1934 return self.parameters.get("expires-after") 1935 1936 # is the data source configured to have its datasets expire? 1937 expiration = config.get("datasources.expiration", {}) 1938 if not expiration.get(self.parameters.get("datasource")): 1939 return False 1940 1941 # is there a timeout for this data source? 1942 if expiration.get(self.parameters.get("datasource")).get("timeout"): 1943 return self.timestamp + expiration.get( 1944 self.parameters.get("datasource") 1945 ).get("timeout") 1946 1947 return False 1948 1949 def is_expired(self, config): 1950 """ 1951 Determine if dataset should be deleted 1952 1953 Datasets can be set to expire, but when they should be deleted depends 1954 on a number of factor. This checks them all. 1955 1956 :param ConfigManager config: Configuration reader (context-aware) 1957 :return bool: 1958 """ 1959 # has someone opted out of deleting this? 1960 if not self.is_expiring(config): 1961 return False 1962 1963 # is this dataset explicitly marked as expiring after a certain time? 
1964 future = ( 1965 time.time() + 3600 1966 ) # ensure we don't delete datasets with invalid expiration times 1967 if ( 1968 self.parameters.get("expires-after") 1969 and convert_to_int(self.parameters["expires-after"], future) < time.time() 1970 ): 1971 return True 1972 1973 # is the data source configured to have its datasets expire? 1974 expiration = config.get("datasources.expiration", {}) 1975 if not expiration.get(self.parameters.get("datasource")): 1976 return False 1977 1978 # is the dataset older than the set timeout? 1979 if expiration.get(self.parameters.get("datasource")).get("timeout"): 1980 return ( 1981 self.timestamp 1982 + expiration[self.parameters.get("datasource")]["timeout"] 1983 < time.time() 1984 ) 1985 1986 return False 1987 1988 def is_from_collector(self): 1989 """ 1990 Check if this dataset was made by a processor that collects data, i.e. 1991 a search or import worker. 1992 1993 :return bool: 1994 """ 1995 return self.type.endswith("-search") or self.type.endswith("-import") 1996 1997 def get_extension(self): 1998 """ 1999 Gets the file extention this dataset produces. 2000 Also checks whether the results file exists. 2001 Used for checking processor and dataset compatibility. 2002 2003 :return str extension: Extension, e.g. `csv` 2004 """ 2005 if self.get_results_path().exists(): 2006 return self.get_results_path().suffix[1:] 2007 2008 return False 2009 2010 def get_media_type(self): 2011 """ 2012 Gets the media type of the dataset file. 2013 2014 :return str: media type, e.g., "text" 2015 """ 2016 own_processor = self.get_own_processor() 2017 if hasattr(self, "media_type"): 2018 # media type can be defined explicitly in the dataset; this is the priority 2019 return self.media_type 2020 elif own_processor is not None: 2021 # or media type can be defined in the processor 2022 # some processors can set different media types for different datasets (e.g., import_media) 2023 if hasattr(own_processor, "media_type"): 2024 return own_processor.media_type 2025 2026 # Default to text 2027 return self.parameters.get("media_type", "text") 2028 2029 def get_metadata(self): 2030 """ 2031 Get dataset metadata 2032 2033 This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended 2034 as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to 2035 update its database (and ensure compatibility with the exporting version of 4CAT). 2036 """ 2037 metadata = self.db.fetchone( 2038 "SELECT * FROM datasets WHERE key = %s", (self.key,) 2039 ) 2040 2041 # get 4CAT version (presumably to ensure export is compatible with import) 2042 metadata["current_4CAT_version"] = get_software_version() 2043 return metadata 2044 2045 def get_result_url(self): 2046 """ 2047 Gets the 4CAT frontend URL of a dataset file. 2048 2049 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 2050 SERVER_HTTPS) plus hardcoded '/result/'. 2051 TODO: create more dynamic method of obtaining url. 
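# --- Example (editor's sketch, not part of the original class) ---
# Reporting on expiration with `is_expiring()` (False or a Unix timestamp)
# and `is_expired()` (should it be deleted now?). `datasets` and `config`
# are assumed to be an iterable of DataSets and a context-aware ConfigManager.
import datetime

def report_expirations(datasets, config):
    for dataset in datasets:
        expires = dataset.is_expiring(config)
        if dataset.is_expired(config):
            print(f"{dataset.key}: expired, eligible for deletion")
        elif expires:
            when = datetime.datetime.fromtimestamp(expires)
            print(f"{dataset.key}: expires around {when:%Y-%m-%d %H:%M}")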
2052 """ 2053 filename = self.get_results_path().name 2054 2055 # we cheat a little here by using the modules' config reader, but these 2056 # will never be context-dependent values anyway 2057 url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \ 2058 self.modules.config.get("flask.server_name") + '/result/' + filename 2059 return url_to_file 2060 2061 def warn_unmappable_item( 2062 self, item_count, processor=None, error_message=None, warn_admins=True 2063 ): 2064 """ 2065 Log an item that is unable to be mapped and warn administrators. 2066 2067 :param int item_count: Item index 2068 :param Processor processor: Processor calling function8 2069 """ 2070 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 2071 2072 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 2073 closest_dataset = ( 2074 processor.dataset 2075 if processor is not None and processor.dataset is not None 2076 else self 2077 ) 2078 # Log error to dataset log 2079 closest_dataset.log(dataset_error_message) 2080 2081 if warn_admins: 2082 if processor is not None: 2083 processor.log.warning( 2084 f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}." 2085 ) 2086 elif hasattr(self.db, "log"): 2087 # borrow the database's log handler 2088 self.db.log.warning( 2089 f"Unable to map item all items for dataset {closest_dataset.key}." 2090 ) 2091 else: 2092 # No other log available 2093 raise DataSetException( 2094 f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn" 2095 ) 2096 2097 # Annotation functions (most of it is handled in Annotations) 2098 def has_annotations(self) -> bool: 2099 """ 2100 Whether this dataset has annotations 2101 """ 2102 2103 annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,)) 2104 2105 return True if annotation else False 2106 2107 def num_annotations(self) -> int: 2108 """ 2109 Get the amount of annotations 2110 """ 2111 return self.db.fetchone( 2112 "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,) 2113 )["count"] 2114 2115 def get_annotation(self, data: dict): 2116 """ 2117 Retrieves a specific annotation if it exists. 2118 2119 :param data: A dictionary with which to get the annotations from. 2120 To get specific annotations, include either an `id` field or 2121 `field_id` and `item_id` fields. 2122 2123 return Annotation: Annotation object. 2124 """ 2125 2126 if "id" not in data or ("field_id" not in data and "item_id" not in data): 2127 return None 2128 2129 if "dataset" not in data: 2130 data["dataset"] = self.key 2131 2132 return Annotation(data=data, db=self.db) 2133 2134 def get_annotations(self) -> list: 2135 """ 2136 Retrieves all annotations for this dataset. 2137 2138 return list: List of Annotation objects. 2139 """ 2140 2141 return Annotation.get_annotations_for_dataset(self.db, self.key) 2142 2143 def get_annotations_for_item(self, item_id: str) -> list: 2144 """ 2145 Retrieves all annotations from this dataset for a specific item (e.g. social media post). 
2146 """ 2147 return Annotation.get_annotations_for_dataset( 2148 self.db, self.key, item_id=item_id 2149 ) 2150 2151 def has_annotation_fields(self) -> bool: 2152 """ 2153 Returns True if there's annotation fields saved tot the dataset table 2154 Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.). 2155 """ 2156 2157 return True if self.annotation_fields else False 2158 2159 def get_annotation_field_labels(self) -> list: 2160 """ 2161 Retrieves the saved annotation field labels for this dataset. 2162 These are stored in the annotations table. 2163 2164 :return list: List of annotation field labels. 2165 """ 2166 2167 annotation_fields = self.annotation_fields 2168 2169 if not annotation_fields: 2170 return [] 2171 2172 labels = [v["label"] for v in annotation_fields.values()] 2173 2174 return labels 2175 2176 def save_annotations(self, annotations: list) -> int: 2177 """ 2178 Takes a list of annotations and saves them to the annotations table. 2179 If a field is not yet present in the `annotation_fields` column in 2180 the datasets table, it also adds it there. 2181 2182 :param list annotations: List of dictionaries with annotation items. Must have `item_id`, `field_id`, 2183 and `label`. 2184 `item_id` is for the specific item being annotated (e.g. a social media post) 2185 `field_id` refers to the annotation field. 2186 `label` is a human-readable description of this annotation. 2187 E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", 2188 "value": "Yes"}] 2189 2190 :returns int: How many annotations were saved. 2191 2192 """ 2193 2194 if not annotations: 2195 return 0 2196 2197 count = 0 2198 annotation_fields = self.annotation_fields 2199 2200 # Add some dataset data to annotations, if not present 2201 for annotation_data in annotations: 2202 # Check if the required fields are present 2203 if "item_id" not in annotation_data: 2204 raise AnnotationException( 2205 "Can't save annotations; annotation must have an `item_id` referencing " 2206 "the item it annotated, got %s" % annotation_data 2207 ) 2208 if "field_id" not in annotation_data: 2209 raise AnnotationException( 2210 "Can't save annotations; annotation must have a `field_id` field, " 2211 "got %s" % annotation_data 2212 ) 2213 if "label" not in annotation_data or not isinstance( 2214 annotation_data["label"], str 2215 ): 2216 raise AnnotationException( 2217 "Can't save annotations; annotation must have a `label` field, " 2218 "got %s" % annotation_data 2219 ) 2220 2221 # Set dataset key 2222 if not annotation_data.get("dataset"): 2223 annotation_data["dataset"] = self.key 2224 2225 # Set default author to this dataset owner 2226 # If this annotation is made by a processor, it will have the processor name 2227 if not annotation_data.get("author"): 2228 annotation_data["author"] = self.get_owners()[0] 2229 2230 # Create Annotation object, which also saves it to the database 2231 # If this dataset/item ID/label combination already exists, this retrieves the 2232 # existing data and updates it with new values. 2233 Annotation(data=annotation_data, db=self.db) 2234 count += 1 2235 2236 # Save annotation fields if things changed 2237 if annotation_fields != self.annotation_fields: 2238 self.save_annotation_fields(annotation_fields) 2239 2240 return count 2241 2242 def save_annotation_fields(self, new_fields: dict, add=False) -> int: 2243 """ 2244 Save annotation field data to the datasets table (in the `annotation_fields` column). 
2245 If changes to the annotation fields affect existing annotations, 2246 this function will also call `update_annotations_via_fields()` to change them. 2247 2248 :param dict new_fields: New annotation fields, with a field ID as key. 2249 2250 :param bool add: Whether we're merely adding new fields 2251 or replacing the whole batch. If add is False, 2252 `new_fields` should contain all fields. 2253 2254 :return int: The number of annotation fields saved. 2255 2256 """ 2257 2258 # Get existing annotation fields to see if stuff changed. 2259 old_fields = self.annotation_fields 2260 changes = False 2261 2262 # Do some validation 2263 # Annotation field must be valid JSON. 2264 try: 2265 json.dumps(new_fields) 2266 except ValueError: 2267 raise AnnotationException( 2268 "Can't save annotation fields: not valid JSON (%s)" % new_fields 2269 ) 2270 2271 # No duplicate IDs 2272 if len(new_fields) != len(set(new_fields)): 2273 raise AnnotationException( 2274 "Can't save annotation fields: field IDs must be unique" 2275 ) 2276 2277 # Annotation fields must at minimum have `type` and `label` keys. 2278 for field_id, annotation_field in new_fields.items(): 2279 if not isinstance(field_id, str): 2280 raise AnnotationException( 2281 "Can't save annotation fields: field ID %s is not a valid string" 2282 % field_id 2283 ) 2284 if "label" not in annotation_field: 2285 raise AnnotationException( 2286 "Can't save annotation fields: all fields must have a label" 2287 % field_id 2288 ) 2289 if "type" not in annotation_field: 2290 raise AnnotationException( 2291 "Can't save annotation fields: all fields must have a type" 2292 % field_id 2293 ) 2294 2295 # Check if fields are removed 2296 if not add: 2297 for field_id in old_fields.keys(): 2298 if field_id not in new_fields: 2299 changes = True 2300 2301 # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin 2302 # dataset 2303 for field_id in new_fields.keys(): 2304 if field_id in old_fields and old_fields[field_id].get("from_dataset"): 2305 old_fields[field_id]["label"] = new_fields[field_id][ 2306 "label" 2307 ] # Only labels could've been changed 2308 new_fields[field_id] = old_fields[field_id] 2309 2310 # If we're just adding fields, add them to the old fields. 2311 # If the field already exists, overwrite the old field. 2312 if add and old_fields: 2313 all_fields = old_fields 2314 for field_id, annotation_field in new_fields.items(): 2315 all_fields[field_id] = annotation_field 2316 new_fields = all_fields 2317 2318 # We're saving the new annotation fields as-is. 2319 # Ordering of fields is preserved this way. 2320 self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)}) 2321 self.annotation_fields = new_fields 2322 2323 # If anything changed with the annotation fields, possibly update 2324 # existing annotations (e.g. to delete them or change their labels). 2325 if changes: 2326 Annotation.update_annotations_via_fields( 2327 self.key, old_fields, new_fields, self.db 2328 ) 2329 2330 return len(new_fields) 2331 2332 def get_annotation_metadata(self) -> dict: 2333 """ 2334 Retrieves all the data for this dataset from the annotations table. 
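# --- Example (editor's sketch, not part of the original class) ---
# Registering an annotation field with `save_annotation_fields()`; each field
# needs at least a `label` and a `type`. The field ID, type and options below
# are illustrative assumptions, not fixed 4CAT values.
def add_sentiment_field(dataset):
    return dataset.save_annotation_fields(
        {
            "sentiment": {
                "label": "Sentiment",
                "type": "dropdown",  # assumed field type
                "options": ["positive", "neutral", "negative"],
            }
        },
        add=True,  # merge with existing fields rather than replacing them
    )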
2335 """ 2336 2337 annotation_data = self.db.fetchall( 2338 "SELECT * FROM annotations WHERE dataset = '%s';" % self.key 2339 ) 2340 return annotation_data 2341 2342 def __getattr__(self, attr): 2343 """ 2344 Getter so we don't have to use .data all the time 2345 2346 :param attr: Data key to get 2347 :return: Value 2348 """ 2349 2350 if attr in dir(self): 2351 # an explicitly defined attribute should always be called in favour 2352 # of this passthrough 2353 attribute = getattr(self, attr) 2354 return attribute 2355 elif attr in self.data: 2356 return self.data[attr] 2357 else: 2358 raise AttributeError("DataSet instance has no attribute %s" % attr) 2359 2360 def __setattr__(self, attr, value): 2361 """ 2362 Setter so we can flexibly update the database 2363 2364 Also updates internal data stores (.data etc). If the attribute is 2365 unknown, it is stored within the 'parameters' attribute. 2366 2367 :param str attr: Attribute to update 2368 :param value: New value 2369 """ 2370 2371 # don't override behaviour for *actual* class attributes 2372 if attr in dir(self): 2373 super().__setattr__(attr, value) 2374 return 2375 2376 if attr not in self.data: 2377 self.parameters[attr] = value 2378 attr = "parameters" 2379 value = self.parameters 2380 2381 if attr == "parameters": 2382 value = json.dumps(value) 2383 2384 self.db.update("datasets", where={"key": self.key}, data={attr: value}) 2385 2386 self.data[attr] = value 2387 2388 if attr == "parameters": 2389 self.parameters = json.loads(value)
In other words, if this 217 method returns a path, a file with the complete results for this dataset 218 will exist at that location. 219 220 :return: A path to the results file, 'empty_file', or `None` 221 """ 222 if self.data["is_finished"] and self.data["num_rows"] > 0: 223 return self.get_results_path() 224 elif self.data["is_finished"] and self.data["num_rows"] == 0: 225 return 'empty' 226 else: 227 return None 228 229 def get_results_path(self): 230 """ 231 Get path to results file 232 233 Always returns a path, that will at some point contain the dataset 234 data, but may not do so yet. Use this to get the location to write 235 generated results to. 236 237 :return Path: A path to the results file 238 """ 239 # alas we need to instantiate a config reader here - no way around it 240 if not self.folder: 241 self.folder = self.modules.config.get('PATH_ROOT').joinpath(self.modules.config.get('PATH_DATA')) 242 return self.folder.joinpath(self.data["result_file"]) 243 244 def get_results_folder_path(self): 245 """ 246 Get path to folder containing accompanying results 247 248 Returns a path that may not yet be created 249 250 :return Path: A path to the results file 251 """ 252 return self.get_results_path().parent.joinpath("folder_" + self.key) 253 254 def get_log_path(self): 255 """ 256 Get path to dataset log file 257 258 Each dataset has a single log file that documents its creation. This 259 method returns the path to that file. It is identical to the path of 260 the dataset result file, with 'log' as its extension instead. 261 262 :return Path: A path to the log file 263 """ 264 return self.get_results_path().with_suffix(".log") 265 266 def clear_log(self): 267 """ 268 Clears the dataset log file 269 270 If the log file does not exist, it is created empty. The log file will 271 have the same file name as the dataset result file, with the 'log' 272 extension. 273 """ 274 log_path = self.get_log_path() 275 with log_path.open("w"): 276 pass 277 278 def log(self, log): 279 """ 280 Write log message to file 281 282 Writes the log message to the log file on a new line, including a 283 timestamp at the start of the line. Note that this assumes the log file 284 already exists - it should have been created/cleared with clear_log() 285 prior to calling this. 286 287 :param str log: Log message to write 288 """ 289 log_path = self.get_log_path() 290 with log_path.open("a", encoding="utf-8") as outfile: 291 outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log)) 292 293 def _iterate_items(self, processor=None): 294 """ 295 A generator that iterates through a CSV or NDJSON file 296 297 This is an internal method and should not be called directly. Rather, 298 call iterate_items() and use the generated dictionary and its properties. 299 300 If a reference to a processor is provided, with every iteration, 301 the processor's 'interrupted' flag is checked, and if set a 302 ProcessorInterruptedException is raised, which by default is caught 303 in the worker and subsequently stops execution gracefully. 304 305 There are two file types that can be iterated (currently): CSV files 306 and NDJSON (newline-delimited JSON) files. In the future, one could 307 envision adding a pathway to retrieve items from e.g. a MongoDB 308 collection directly instead of from a static file 309 310 :param BasicProcessor processor: A reference to the processor 311 iterating the dataset. 
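# --- Example (editor's sketch, not part of the original class) ---
# The logging contract described above: clear_log() (re)creates the log file
# next to the result file, log() appends timestamped lines to it.
def start_run(dataset):
    dataset.clear_log()  # <result file stem>.log, emptied
    dataset.log("Processing started")
    print("log file:", dataset.get_log_path())
    print("results will be written to:", dataset.get_results_path())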
312 :return generator: A generator that yields each item as a dictionary 313 """ 314 path = self.get_results_path() 315 316 317 # Yield through items one by one 318 if path.suffix.lower() == ".csv": 319 with path.open("rb") as infile: 320 wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8") 321 reader = csv.DictReader(wrapped_infile) 322 323 if not self.get_own_processor(): 324 # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not 325 first_item = next(reader) 326 if first_item is None or any( 327 [True for key in first_item if type(key) is not str] 328 ): 329 raise NotImplementedError( 330 f"Cannot iterate through CSV file (deprecated processor {self.type})" 331 ) 332 yield first_item 333 334 for item in reader: 335 if hasattr(processor, "interrupted") and processor.interrupted: 336 raise ProcessorInterruptedException( 337 "Processor interrupted while iterating through CSV file" 338 ) 339 340 yield item 341 342 elif path.suffix.lower() == ".ndjson": 343 # In NDJSON format each line in the file is a self-contained JSON 344 with path.open(encoding="utf-8") as infile: 345 for line in infile: 346 if hasattr(processor, "interrupted") and processor.interrupted: 347 raise ProcessorInterruptedException( 348 "Processor interrupted while iterating through NDJSON file" 349 ) 350 351 yield json.loads(line) 352 353 else: 354 raise NotImplementedError("Cannot iterate through %s file" % path.suffix) 355 356 def iterate_items( 357 self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None 358 ): 359 """ 360 Generate mapped dataset items 361 362 Wrapper for _iterate_items that returns a DatasetItem, which can be 363 accessed as a dict returning the original item or (if a mapper is 364 available) the mapped item. Mapped or original versions of the item can 365 also be accessed via the `original` and `mapped_object` properties of 366 the DatasetItem. 367 368 Processors can define a method called `map_item` that can be used to map 369 an item from the dataset file before it is processed any further. This is 370 slower than storing the data file in the right format to begin with but 371 not all data sources allow for easy 'flat' mapping of items, e.g. tweets 372 are nested objects when retrieved from the twitter API that are easier 373 to store as a JSON file than as a flat CSV file, and it would be a shame 374 to throw away that data. 375 376 Note the two parameters warn_unmappable and map_missing. Items can be 377 unmappable in that their structure is too different to coerce into a 378 neat dictionary of the structure the data source expects. This makes it 379 'unmappable' and warn_unmappable determines what happens in this case. 380 It can also be of the right structure, but with some fields missing or 381 incomplete. map_missing determines what happens in that case. The 382 latter is for example possible when importing data via Zeeschuimer, 383 which produces unstably-structured data captured from social media 384 sites. 385 386 :param BasicProcessor processor: A reference to the processor 387 iterating the dataset. 388 :param bool warn_unmappable: If an item is not mappable, skip the item 389 and log a warning 390 :param max_unmappable: Skip at most this many unmappable items; if 391 more are encountered, stop iterating. `None` to never stop. 392 :param map_missing: Indicates what to do with mapped items for which 393 some fields could not be mapped. Defaults to 'empty_str'. 
Must be one of: 394 - 'default': fill missing fields with the default passed by map_item 395 - 'abort': raise a MappedItemIncompleteException if a field is missing 396 - a callback: replace missing field with the return value of the 397 callback. The MappedItem object is passed to the callback as the 398 first argument and the name of the missing field as the second. 399 - a dictionary with a key for each possible missing field: replace missing 400 field with a strategy for that field ('default', 'abort', or a callback) 401 :param get_annotations: Whether to also fetch annotations from the database. 402 This can be disabled to help speed up iteration. 403 :return generator: A generator that yields DatasetItems 404 """ 405 unmapped_items = 0 406 # Collect item_mapper for use with filter 407 item_mapper = False 408 own_processor = self.get_own_processor() 409 if own_processor and own_processor.map_item_method_available(dataset=self): 410 item_mapper = True 411 412 # Annotations are dynamically added, and we're handling them as 'extra' map_item fields. 413 if get_annotations: 414 annotation_fields = self.annotation_fields 415 num_annotations = 0 if not annotation_fields else self.num_annotations() 416 all_annotations = None 417 # If this dataset has less than n annotations, just retrieve them all before iterating 418 if 0 < num_annotations <= 500: 419 # Convert to dict for faster lookup when iterating over items 420 all_annotations = collections.defaultdict(list) 421 for annotation in self.get_annotations(): 422 all_annotations[annotation.item_id].append(annotation) 423 424 # missing field strategy can be for all fields at once, or per field 425 # if it is per field, it is a dictionary with field names and their strategy 426 # if it is for all fields, it may be a callback, 'abort', or 'default' 427 default_strategy = "default" 428 if type(map_missing) is not dict: 429 default_strategy = map_missing 430 map_missing = {} 431 432 # Loop through items 433 for i, item in enumerate(self._iterate_items(processor)): 434 # Save original to yield 435 original_item = item.copy() 436 437 # Map item 438 if item_mapper: 439 try: 440 mapped_item = own_processor.get_mapped_item(item) 441 except MapItemException as e: 442 if warn_unmappable: 443 self.warn_unmappable_item( 444 i, processor, e, warn_admins=unmapped_items is False 445 ) 446 447 unmapped_items += 1 448 if max_unmappable and unmapped_items > max_unmappable: 449 break 450 else: 451 continue 452 453 # check if fields have been marked as 'missing' in the 454 # underlying data, and treat according to the chosen strategy 455 if mapped_item.get_missing_fields(): 456 for missing_field in mapped_item.get_missing_fields(): 457 strategy = map_missing.get(missing_field, default_strategy) 458 459 if callable(strategy): 460 # delegate handling to a callback 461 mapped_item.data[missing_field] = strategy( 462 mapped_item.data, missing_field 463 ) 464 elif strategy == "abort": 465 # raise an exception to be handled at the processor level 466 raise MappedItemIncompleteException( 467 f"Cannot process item, field {missing_field} missing in source data." 468 ) 469 elif strategy == "default": 470 # use whatever was passed to the object constructor 471 mapped_item.data[missing_field] = mapped_item.data[ 472 missing_field 473 ].value 474 else: 475 raise ValueError( 476 "map_missing must be 'abort', 'default', or a callback." 
477 ) 478 else: 479 mapped_item = original_item 480 481 # Add possible annotation values 482 if get_annotations and annotation_fields: 483 # We're always handling annotated data as a MappedItem object, 484 # even if no map_item() function is available for the data source. 485 if not isinstance(mapped_item, MappedItem): 486 mapped_item = MappedItem(mapped_item) 487 488 # Retrieve from all annotations 489 if all_annotations: 490 item_annotations = all_annotations.get(mapped_item.data["id"]) 491 # Or get annotations per item for large datasets (more queries but less memory usage) 492 else: 493 item_annotations = self.get_annotations_for_item( 494 mapped_item.data["id"] 495 ) 496 497 # Loop through annotation fields 498 for ( 499 annotation_field_id, 500 annotation_field_items, 501 ) in annotation_fields.items(): 502 # Set default value to an empty string 503 value = "" 504 # Set value if this item has annotations 505 if item_annotations: 506 # Items can have multiple annotations 507 for annotation in item_annotations: 508 if annotation.field_id == annotation_field_id: 509 value = annotation.value 510 if isinstance(value, list): 511 value = ",".join(value) 512 513 # Make sure labels are unique when iterating through items 514 column_name = annotation_field_items["label"] 515 label_count = 1 516 while column_name in mapped_item.data: 517 label_count += 1 518 column_name = f"{annotation_field_items['label']}_{label_count}" 519 520 # We're always adding an annotation value 521 # as an empty string, even if it's absent. 522 mapped_item.data[column_name] = value 523 524 # yield a DatasetItem, which is a dict with some special properties 525 yield DatasetItem( 526 mapper=item_mapper, 527 original=original_item, 528 mapped_object=mapped_item, 529 **( 530 mapped_item.get_item_data() 531 if type(mapped_item) is MappedItem 532 else mapped_item 533 ), 534 ) 535 536 def sort_and_iterate_items( 537 self, sort="", reverse=False, chunk_size=50000, **kwargs 538 ) -> dict: 539 """ 540 Loop through items in a dataset, sorted by a given key. 541 542 This is a wrapper function for `iterate_items()` with the 543 added functionality of sorting a dataset. 544 545 :param sort: The item key that determines the sort order. 546 :param reverse: Whether to sort by largest values first. 547 548 :returns dict: Yields iterated post 549 """ 550 551 def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False): 552 """ 553 Sort items based on the given key and order. 554 555 :param items_to_sort: The items to sort 556 :param sort_key: The key to sort by 557 :param reverse: Whether to sort in reverse order 558 :return: Sorted items 559 """ 560 if reverse is False and (sort_key == "dataset-order" or sort_key == ""): 561 # Sort by dataset order 562 yield from items_to_sort 563 elif sort_key == "dataset-order" and reverse: 564 # Sort by dataset order in reverse 565 yield from reversed(list(items_to_sort)) 566 else: 567 # Sort on the basis of a column value 568 if not convert_sort_to_float: 569 yield from sorted( 570 items_to_sort, 571 key=lambda x: x.get(sort_key, ""), 572 reverse=reverse, 573 ) 574 else: 575 # Dataset fields can contain integers and empty strings. 576 # Since these cannot be compared, we will convert every 577 # empty string to 0. 578 yield from sorted( 579 items_to_sort, 580 key=lambda x: convert_to_float(x.get(sort_key, ""), force=True), 581 reverse=reverse, 582 ) 583 584 if self.num_rows < chunk_size: 585 try: 586 # First try to force-sort float values. 
If this doesn't work, it'll be alphabetical. 587 yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True) 588 except (TypeError, ValueError): 589 yield from sort_items( 590 self.iterate_items(**kwargs), 591 sort, 592 reverse, 593 convert_sort_to_float=False, 594 ) 595 596 else: 597 # For large datasets, we will use chunk sorting 598 staging_area = self.get_staging_area() 599 buffer = [] 600 chunk_files = [] 601 convert_sort_to_float = True 602 fieldnames = self.get_columns() 603 604 def write_chunk(buffer, chunk_index): 605 """ 606 Write a chunk of data to a temporary file 607 608 :param buffer: The buffer containing the chunk of data 609 :param chunk_index: The index of the chunk 610 :return: The path to the temporary file 611 """ 612 temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv") 613 with temp_file.open("w", encoding="utf-8") as chunk_file: 614 writer = csv.DictWriter(chunk_file, fieldnames=fieldnames) 615 writer.writeheader() 616 writer.writerows(buffer) 617 return temp_file 618 619 # Divide the dataset into sorted chunks 620 for item in self.iterate_items(**kwargs): 621 buffer.append(item) 622 if len(buffer) >= chunk_size: 623 try: 624 buffer = list( 625 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 626 ) 627 except (TypeError, ValueError): 628 convert_sort_to_float = False 629 buffer = list( 630 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 631 ) 632 633 chunk_files.append(write_chunk(buffer, len(chunk_files))) 634 buffer.clear() 635 636 # Sort and write any remaining items in the buffer 637 if buffer: 638 buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float)) 639 chunk_files.append(write_chunk(buffer, len(chunk_files))) 640 buffer.clear() 641 642 # Merge sorted chunks into the final sorted file 643 sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv") 644 with sorted_file.open("w", encoding="utf-8") as outfile: 645 writer = csv.DictWriter(outfile, fieldnames=self.get_columns()) 646 writer.writeheader() 647 648 # Open all chunk files for reading 649 chunk_readers = [ 650 csv.DictReader(chunk.open("r", encoding="utf-8")) 651 for chunk in chunk_files 652 ] 653 heap = [] 654 655 # Initialize the heap with the first row from each chunk 656 for i, reader in enumerate(chunk_readers): 657 try: 658 row = next(reader) 659 if sort == "dataset-order" and reverse: 660 # Use a reverse index for "dataset-order" and reverse=True 661 sort_key = -i 662 elif convert_sort_to_float: 663 # Negate numeric keys for reverse sorting 664 sort_key = ( 665 -convert_to_float(row.get(sort, "")) 666 if reverse 667 else convert_to_float(row.get(sort, "")) 668 ) 669 else: 670 if reverse: 671 # For reverse string sorting, invert string comparison by creating a tuple 672 # with an inverted string - this makes Python's tuple comparison work in reverse 673 sort_key = ( 674 tuple(-ord(c) for c in row.get(sort, "")), 675 -i, 676 ) 677 else: 678 sort_key = (row.get(sort, ""), i) 679 heap.append((sort_key, i, row)) 680 except StopIteration: 681 pass 682 683 # Use a heap to merge sorted chunks 684 import heapq 685 686 heapq.heapify(heap) 687 while heap: 688 _, chunk_index, smallest_row = heapq.heappop(heap) 689 writer.writerow(smallest_row) 690 try: 691 next_row = next(chunk_readers[chunk_index]) 692 if sort == "dataset-order" and reverse: 693 # Use a reverse index for "dataset-order" and reverse=True 694 sort_key = -chunk_index 695 elif convert_sort_to_float: 696 sort_key = ( 
697 -convert_to_float(next_row.get(sort, "")) 698 if reverse 699 else convert_to_float(next_row.get(sort, "")) 700 ) 701 else: 702 # Use the same inverted comparison for string values 703 if reverse: 704 sort_key = ( 705 tuple(-ord(c) for c in next_row.get(sort, "")), 706 -chunk_index, 707 ) 708 else: 709 sort_key = (next_row.get(sort, ""), chunk_index) 710 heapq.heappush(heap, (sort_key, chunk_index, next_row)) 711 except StopIteration: 712 pass 713 714 # Read the sorted file and yield each item 715 with sorted_file.open("r", encoding="utf-8") as infile: 716 reader = csv.DictReader(infile) 717 for item in reader: 718 yield item 719 720 # Remove the temporary files 721 if staging_area.is_dir(): 722 shutil.rmtree(staging_area) 723 724 def get_staging_area(self): 725 """ 726 Get path to a temporary folder in which files can be stored before 727 finishing 728 729 This folder must be created before use, but is guaranteed to not exist 730 yet. The folder may be used as a staging area for the dataset data 731 while it is being processed. 732 733 :return Path: Path to folder 734 """ 735 results_file = self.get_results_path() 736 737 results_dir_base = results_file.parent 738 results_dir = results_file.name.replace(".", "") + "-staging" 739 results_path = results_dir_base.joinpath(results_dir) 740 index = 1 741 while results_path.exists(): 742 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 743 index += 1 744 745 # create temporary folder 746 results_path.mkdir() 747 748 # Storing the staging area with the dataset so that it can be removed later 749 self.staging_areas.append(results_path) 750 751 return results_path 752 753 def remove_staging_areas(self): 754 """ 755 Remove any staging areas that were created and all files contained in them. 756 """ 757 # Remove DataSet staging areas 758 if self.staging_areas: 759 for staging_area in self.staging_areas: 760 if staging_area.is_dir(): 761 shutil.rmtree(staging_area) 762 763 def finish(self, num_rows=0): 764 """ 765 Declare the dataset finished 766 """ 767 if self.data["is_finished"]: 768 raise RuntimeError("Cannot finish a finished dataset again") 769 770 self.db.update( 771 "datasets", 772 where={"key": self.data["key"]}, 773 data={ 774 "is_finished": True, 775 "num_rows": num_rows, 776 "progress": 1.0, 777 "timestamp_finished": int(time.time()), 778 }, 779 ) 780 self.data["is_finished"] = True 781 self.data["num_rows"] = num_rows 782 783 def copy(self, shallow=True): 784 """ 785 Copies the dataset, making a new version with a unique key 786 787 788 :param bool shallow: Shallow copy: does not copy the result file, but 789 instead refers to the same file as the original dataset did 790 :return Dataset: Copied dataset 791 """ 792 parameters = self.parameters.copy() 793 794 # a key is partially based on the parameters. 
so by setting these extra 795 # attributes, we also ensure a unique key will be generated for the 796 # copy 797 # possibly todo: don't use time for uniqueness (but one shouldn't be 798 # copying a dataset multiple times per microsecond, that's not what 799 # this is for) 800 parameters["copied_from"] = self.key 801 parameters["copied_at"] = time.time() 802 803 copy = DataSet( 804 parameters=parameters, 805 db=self.db, 806 extension=self.result_file.split(".")[-1], 807 type=self.type, 808 modules=self.modules, 809 ) 810 for field in self.data: 811 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 812 continue 813 814 copy.__setattr__(field, self.data[field]) 815 816 if shallow: 817 # use the same result file 818 copy.result_file = self.result_file 819 else: 820 # copy to new file with new key 821 shutil.copy(self.get_results_path(), copy.get_results_path()) 822 823 if self.is_finished(): 824 copy.finish(self.num_rows) 825 826 # make sure ownership is also copied 827 copy.copy_ownership_from(self) 828 829 return copy 830 831 def delete(self, commit=True, queue=None): 832 """ 833 Delete the dataset, and all its children 834 835 Deletes both database records and result files. Note that manipulating 836 a dataset object after it has been deleted is undefined behaviour. 837 838 :param bool commit: Commit SQL DELETE query? 839 """ 840 # first, recursively delete children 841 children = self.db.fetchall( 842 "SELECT * FROM datasets WHERE key_parent = %s", (self.key,) 843 ) 844 for child in children: 845 try: 846 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 847 child.delete(commit=commit) 848 except DataSetException: 849 # dataset already deleted - race condition? 850 pass 851 852 # delete any queued jobs for this dataset 853 try: 854 job = Job.get_by_remote_ID(self.key, self.db, self.type) 855 if job.is_claimed: 856 # tell API to stop any jobs running for this dataset 857 # level 2 = cancel job 858 # we're not interested in the result - if the API is available, 859 # it will do its thing, if it's not the backend is probably not 860 # running so the job also doesn't need to be interrupted 861 call_api( 862 "cancel-job", 863 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 864 False, 865 ) 866 867 # this deletes the job from the database 868 job.finish(True) 869 870 except JobNotFoundException: 871 pass 872 873 # delete this dataset's own annotations 874 self.db.delete("annotations", where={"dataset": self.key}, commit=commit) 875 # delete annotations that have been generated as part of this dataset 876 self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit) 877 # delete annotation fields from other datasets that were created as part of this dataset 878 for parent_dataset in self.get_genealogy(): 879 field_deleted = False 880 annotation_fields = parent_dataset.annotation_fields 881 if annotation_fields: 882 for field_id in list(annotation_fields.keys()): 883 if annotation_fields[field_id].get("from_dataset", "") == self.key: 884 del annotation_fields[field_id] 885 field_deleted = True 886 if field_deleted: 887 parent_dataset.save_annotation_fields(annotation_fields) 888 889 # delete dataset from database 890 self.db.delete("datasets", where={"key": self.key}, commit=commit) 891 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 892 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 893 894 # delete from drive 895 try: 896 if self.get_results_path().exists(): 897 
self.get_results_path().unlink() 898 if self.get_results_path().with_suffix(".log").exists(): 899 self.get_results_path().with_suffix(".log").unlink() 900 if self.get_results_folder_path().exists(): 901 shutil.rmtree(self.get_results_folder_path()) 902 903 except FileNotFoundError: 904 # already deleted, apparently 905 pass 906 except PermissionError as e: 907 self.db.log.error( 908 f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}" 909 ) 910 911 def update_children(self, **kwargs): 912 """ 913 Update an attribute for all child datasets 914 915 Can be used to e.g. change the owner, version, finished status for all 916 datasets in a tree 917 918 :param kwargs: Parameters corresponding to known dataset attributes 919 """ 920 for child in self.get_children(update=True): 921 for attr, value in kwargs.items(): 922 child.__setattr__(attr, value) 923 924 child.update_children(**kwargs) 925 926 def is_finished(self): 927 """ 928 Check if dataset is finished 929 :return bool: 930 """ 931 return bool(self.data["is_finished"]) 932 933 def is_rankable(self, multiple_items=True): 934 """ 935 Determine if a dataset is rankable 936 937 Rankable means that it is a CSV file with 'date' and 'value' columns 938 as well as one or more item label columns 939 940 :param bool multiple_items: Consider datasets with multiple items per 941 item (e.g. word_1, word_2, etc)? 942 943 :return bool: Whether the dataset is rankable or not 944 """ 945 if ( 946 self.get_results_path().suffix != ".csv" 947 or not self.get_results_path().exists() 948 ): 949 return False 950 951 column_options = {"date", "value", "item"} 952 if multiple_items: 953 column_options.add("word_1") 954 955 with self.get_results_path().open(encoding="utf-8") as infile: 956 reader = csv.DictReader(infile) 957 try: 958 return len(set(reader.fieldnames) & column_options) >= 3 959 except (TypeError, ValueError): 960 return False 961 962 def is_accessible_by(self, username, role="owner"): 963 """ 964 Check if dataset has given user as owner 965 966 :param str|User username: Username to check for 967 :return bool: 968 """ 969 if type(username) is not str: 970 if hasattr(username, "get_id"): 971 username = username.get_id() 972 else: 973 raise TypeError("User must be a str or User object") 974 975 # 'normal' owners 976 if username in [ 977 owner 978 for owner, meta in self.owners.items() 979 if (role is None or meta["role"] == role) 980 ]: 981 return True 982 983 # owners that are owner by being part of a tag 984 if username in itertools.chain( 985 *[ 986 tagged_owners 987 for tag, tagged_owners in self.tagged_owners.items() 988 if (role is None or self.owners[f"tag:{tag}"]["role"] == role) 989 ] 990 ): 991 return True 992 993 return False 994 995 def get_owners_users(self, role="owner"): 996 """ 997 Get list of dataset owners 998 999 This returns a list of *users* that are considered owners. Tags are 1000 transparently replaced with the users with that tag. 1001 1002 :param str|None role: Role to check for. If `None`, all owners are 1003 returned regardless of role. 
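# --- Example (editor's sketch, not part of the original class) ---
# Access check with `is_accessible_by()`; `user` may be a username string or
# a User object exposing get_id(). Roles follow the "owner"/"viewer"
# distinction used elsewhere in this class.
def can_view(dataset, user):
    return dataset.is_accessible_by(user, role="owner") or dataset.is_accessible_by(
        user, role="viewer"
    )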
1004 1005 :return set: De-duplicated owner list 1006 """ 1007 # 'normal' owners 1008 owners = [ 1009 owner 1010 for owner, meta in self.owners.items() 1011 if (role is None or meta["role"] == role) and not owner.startswith("tag:") 1012 ] 1013 1014 # owners that are owner by being part of a tag 1015 owners.extend( 1016 itertools.chain( 1017 *[ 1018 tagged_owners 1019 for tag, tagged_owners in self.tagged_owners.items() 1020 if role is None or self.owners[f"tag:{tag}"]["role"] == role 1021 ] 1022 ) 1023 ) 1024 1025 # de-duplicate before returning 1026 return set(owners) 1027 1028 def get_owners(self, role="owner"): 1029 """ 1030 Get list of dataset owners 1031 1032 This returns a list of all owners, and does not transparently resolve 1033 tags (like `get_owners_users` does). 1034 1035 :param str|None role: Role to check for. If `None`, all owners are 1036 returned regardless of role. 1037 1038 :return set: De-duplicated owner list 1039 """ 1040 return [ 1041 owner 1042 for owner, meta in self.owners.items() 1043 if (role is None or meta["role"] == role) 1044 ] 1045 1046 def add_owner(self, username, role="owner"): 1047 """ 1048 Set dataset owner 1049 1050 If the user is already an owner, but with a different role, the role is 1051 updated. If the user is already an owner with the same role, nothing happens. 1052 1053 :param str|User username: Username to set as owner 1054 :param str|None role: Role to add user with. 1055 """ 1056 if type(username) is not str: 1057 if hasattr(username, "get_id"): 1058 username = username.get_id() 1059 else: 1060 raise TypeError("User must be a str or User object") 1061 1062 if username not in self.owners: 1063 self.owners[username] = {"name": username, "key": self.key, "role": role} 1064 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 1065 1066 elif username in self.owners and self.owners[username]["role"] != role: 1067 self.db.update( 1068 "datasets_owners", 1069 data={"role": role}, 1070 where={"name": username, "key": self.key}, 1071 ) 1072 self.owners[username]["role"] = role 1073 1074 if username.startswith("tag:"): 1075 # this is a bit more complicated than just adding to the list of 1076 # owners, so do a full refresh 1077 self.refresh_owners() 1078 1079 # make sure children's owners remain in sync 1080 for child in self.get_children(update=True): 1081 child.add_owner(username, role) 1082 # not recursive, since we're calling it from recursive code! 1083 child.copy_ownership_from(self, recursive=False) 1084 1085 def remove_owner(self, username): 1086 """ 1087 Remove dataset owner 1088 1089 If no owner is set, the dataset is assigned to the anonymous user. 1090 If the user is not an owner, nothing happens. 1091 1092 :param str|User username: Username to set as owner 1093 """ 1094 if type(username) is not str: 1095 if hasattr(username, "get_id"): 1096 username = username.get_id() 1097 else: 1098 raise TypeError("User must be a str or User object") 1099 1100 if username in self.owners: 1101 del self.owners[username] 1102 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 1103 1104 if not self.owners: 1105 self.add_owner("anonymous") 1106 1107 if username in self.tagged_owners: 1108 del self.tagged_owners[username] 1109 1110 # make sure children's owners remain in sync 1111 for child in self.get_children(update=True): 1112 child.remove_owner(username) 1113 # not recursive, since we're calling it from recursive code! 
1114 child.copy_ownership_from(self, recursive=False) 1115 1116 def refresh_owners(self): 1117 """ 1118 Update internal owner cache 1119 1120 This makes sure that the list of *users* and *tags* which can access the 1121 dataset is up to date. 1122 """ 1123 self.owners = { 1124 owner["name"]: owner 1125 for owner in self.db.fetchall( 1126 "SELECT * FROM datasets_owners WHERE key = %s", (self.key,) 1127 ) 1128 } 1129 1130 # determine which users (if any) are owners of the dataset by having a 1131 # tag that is listed as an owner 1132 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 1133 if owner_tags: 1134 tagged_owners = self.db.fetchall( 1135 "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,) 1136 ) 1137 self.tagged_owners = { 1138 owner_tag: [ 1139 user["name"] for user in tagged_owners if owner_tag in user["tags"] 1140 ] 1141 for owner_tag in owner_tags 1142 } 1143 else: 1144 self.tagged_owners = {} 1145 1146 def copy_ownership_from(self, dataset, recursive=True): 1147 """ 1148 Copy ownership 1149 1150 This is useful to e.g. make sure a dataset's ownership stays in sync 1151 with its parent 1152 1153 :param Dataset dataset: Parent to copy from 1154 :return: 1155 """ 1156 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 1157 1158 for role in ("owner", "viewer"): 1159 owners = dataset.get_owners(role=role) 1160 for owner in owners: 1161 self.db.insert( 1162 "datasets_owners", 1163 data={"key": self.key, "name": owner, "role": role}, 1164 commit=False, 1165 safe=True, 1166 ) 1167 1168 self.db.commit() 1169 if recursive: 1170 for child in self.get_children(update=True): 1171 child.copy_ownership_from(self, recursive=recursive) 1172 1173 def get_parameters(self): 1174 """ 1175 Get dataset parameters 1176 1177 The dataset parameters are stored as JSON in the database - parse them 1178 and return the resulting object 1179 1180 :return: Dataset parameters as originally stored 1181 """ 1182 try: 1183 return json.loads(self.data["parameters"]) 1184 except json.JSONDecodeError: 1185 return {} 1186 1187 def get_columns(self): 1188 """ 1189 Returns the dataset columns. 1190 1191 Useful for processor input forms. Can deal with both CSV and NDJSON 1192 files, the latter only if a `map_item` function is available in the 1193 processor that generated it. While in other cases one could use the 1194 keys of the JSON object, this is not always possible in follow-up code 1195 that uses the 'column' names, so for consistency this function acts as 1196 if no column can be parsed if no `map_item` function exists. 
1197 1198 :return list: List of dataset columns; empty list if unable to parse 1199 """ 1200 if not self.get_results_path().exists(): 1201 # no file to get columns from 1202 return [] 1203 1204 if (self.get_results_path().suffix.lower() == ".csv") or ( 1205 self.get_results_path().suffix.lower() == ".ndjson" 1206 and self.get_own_processor() is not None 1207 and self.get_own_processor().map_item_method_available(dataset=self) 1208 ): 1209 items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100) 1210 try: 1211 keys = list(next(items).keys()) 1212 if self.annotation_fields: 1213 for annotation_field in self.annotation_fields.values(): 1214 annotation_column = annotation_field["label"] 1215 label_count = 1 1216 while annotation_column in keys: 1217 label_count += 1 1218 annotation_column = ( 1219 f"{annotation_field['label']}_{label_count}" 1220 ) 1221 keys.append(annotation_column) 1222 columns = keys 1223 except (StopIteration, NotImplementedError): 1224 # No items or otherwise unable to iterate 1225 columns = [] 1226 finally: 1227 del items 1228 else: 1229 # Filetype not CSV or an NDJSON with `map_item` 1230 columns = [] 1231 1232 return columns 1233 1234 def update_label(self, label): 1235 """ 1236 Update label for this dataset 1237 1238 :param str label: New label 1239 :return str: The new label, as returned by get_label 1240 """ 1241 self.parameters["label"] = label 1242 1243 self.db.update( 1244 "datasets", 1245 data={"parameters": json.dumps(self.parameters)}, 1246 where={"key": self.key}, 1247 ) 1248 return self.get_label() 1249 1250 def get_label(self, parameters=None, default="Query"): 1251 """ 1252 Generate a readable label for the dataset 1253 1254 :param dict parameters: Parameters of the dataset 1255 :param str default: Label to use if it cannot be inferred from the 1256 parameters 1257 1258 :return str: Label 1259 """ 1260 if not parameters: 1261 parameters = self.parameters 1262 1263 if parameters.get("label"): 1264 return parameters["label"] 1265 elif parameters.get("body_query") and parameters["body_query"] != "empty": 1266 return parameters["body_query"] 1267 elif parameters.get("body_match") and parameters["body_match"] != "empty": 1268 return parameters["body_match"] 1269 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 1270 return parameters["subject_query"] 1271 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 1272 return parameters["subject_match"] 1273 elif parameters.get("query"): 1274 label = parameters["query"] 1275 # Some legacy datasets have lists as query data 1276 if isinstance(label, list): 1277 label = ", ".join(label) 1278 1279 label = label if len(label) < 30 else label[:25] + "..." 
1280 label = label.strip().replace("\n", ", ") 1281 return label 1282 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 1283 return "Flag: %s" % parameters["country_flag"] 1284 elif parameters.get("country_name") and parameters["country_name"] != "all": 1285 return "Country: %s" % parameters["country_name"] 1286 elif parameters.get("filename"): 1287 return parameters["filename"] 1288 elif parameters.get("board") and "datasource" in parameters: 1289 return parameters["datasource"] + "/" + parameters["board"] 1290 elif ( 1291 "datasource" in parameters 1292 and parameters["datasource"] in self.modules.datasources 1293 ): 1294 return ( 1295 self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 1296 ) 1297 else: 1298 return default 1299 1300 def change_datasource(self, datasource): 1301 """ 1302 Change the datasource type for this dataset 1303 1304 :param str label: New datasource type 1305 :return str: The new datasource type 1306 """ 1307 1308 self.parameters["datasource"] = datasource 1309 1310 self.db.update( 1311 "datasets", 1312 data={"parameters": json.dumps(self.parameters)}, 1313 where={"key": self.key}, 1314 ) 1315 return datasource 1316 1317 def reserve_result_file(self, parameters=None, extension="csv"): 1318 """ 1319 Generate a unique path to the results file for this dataset 1320 1321 This generates a file name for the data file of this dataset, and makes sure 1322 no file exists or will exist at that location other than the file we 1323 expect (i.e. the data for this particular dataset). 1324 1325 :param str extension: File extension, "csv" by default 1326 :param parameters: Dataset parameters 1327 :return bool: Whether the file path was successfully reserved 1328 """ 1329 if self.data["is_finished"]: 1330 raise RuntimeError("Cannot reserve results file for a finished dataset") 1331 1332 # Use 'random' for random post queries 1333 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 1334 file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"] 1335 # Use country code for country flag queries 1336 elif "country_flag" in parameters and parameters["country_flag"] != "all": 1337 file = ( 1338 "countryflag-" 1339 + str(parameters["country_flag"]) 1340 + "-" 1341 + self.data["key"] 1342 ) 1343 # Use the query string for all other queries 1344 else: 1345 query_bit = self.data["query"].replace(" ", "-").lower() 1346 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 1347 query_bit = query_bit[:100] # Crop to avoid OSError 1348 file = query_bit + "-" + self.data["key"] 1349 file = re.sub(r"[-]+", "-", file) 1350 1351 self.data["result_file"] = file + "." + extension.lower() 1352 index = 1 1353 while self.get_results_path().is_file(): 1354 self.data["result_file"] = file + "-" + str(index) + "." + extension.lower() 1355 index += 1 1356 1357 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 1358 data={"result_file": self.data["result_file"]}) 1359 return updated > 0 1360 1361 def get_key(self, query, parameters, parent="", time_offset=0): 1362 """ 1363 Generate a unique key for this dataset that can be used to identify it 1364 1365 The key is a hash of a combination of the query string and parameters. 1366 You never need to call this, really: it's used internally. 
1367 1368 :param str query: Query string 1369 :param parameters: Dataset parameters 1370 :param parent: Parent dataset's key (if applicable) 1371 :param time_offset: Offset to add to the time component of the dataset 1372 key. This can be used to ensure a unique key even if the parameters and 1373 timing is otherwise identical to an existing dataset's 1374 1375 :return str: Dataset key 1376 """ 1377 # Return a hash based on parameters 1378 # we're going to use the hash of the parameters to uniquely identify 1379 # the dataset, so make sure it's always in the same order, or we might 1380 # end up creating multiple keys for the same dataset if python 1381 # decides to return the dict in a different order 1382 param_key = collections.OrderedDict() 1383 for key in sorted(parameters): 1384 param_key[key] = parameters[key] 1385 1386 # we additionally use the current time as a salt - this should usually 1387 # ensure a unique key for the dataset. if for some reason there is a 1388 # hash collision 1389 param_key["_salt"] = int(time.time()) + time_offset 1390 1391 parent_key = str(parent) if parent else "" 1392 plain_key = repr(param_key) + str(query) + parent_key 1393 hashed_key = hash_to_md5(plain_key) 1394 1395 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1396 # key exists, generate a new one 1397 return self.get_key( 1398 query, parameters, parent, time_offset=random.randint(1, 10) 1399 ) 1400 else: 1401 return hashed_key 1402 1403 def set_key(self, key): 1404 """ 1405 Change dataset key 1406 1407 In principe, keys should never be changed. But there are rare cases 1408 where it is useful to do so, in particular when importing a dataset 1409 from another 4CAT instance; in that case it makes sense to try and 1410 ensure that the key is the same as it was before. This function sets 1411 the dataset key and updates any dataset references to it. 1412 1413 :param str key: Key to set 1414 :return str: Key that was set. If the desired key already exists, the 1415 original key is kept. 1416 """ 1417 key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,)) 1418 if key_exists or not key: 1419 return self.key 1420 1421 old_key = self.key 1422 self.db.update("datasets", data={"key": key}, where={"key": old_key}) 1423 1424 # update references 1425 self.db.update( 1426 "datasets", data={"key_parent": key}, where={"key_parent": old_key} 1427 ) 1428 self.db.update("datasets_owners", data={"key": key}, where={"key": old_key}) 1429 self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key}) 1430 self.db.update("users_favourites", data={"key": key}, where={"key": old_key}) 1431 1432 # for good measure 1433 self.db.commit() 1434 self.key = key 1435 1436 return self.key 1437 1438 def get_status(self): 1439 """ 1440 Get Dataset status 1441 1442 :return string: Dataset status 1443 """ 1444 return self.data["status"] 1445 1446 def update_status(self, status, is_final=False): 1447 """ 1448 Update dataset status 1449 1450 The status is a string that may be displayed to a user to keep them 1451 updated and informed about the progress of a dataset. No memory is kept 1452 of earlier dataset statuses; the current status is overwritten when 1453 updated. 1454 1455 Statuses are also written to the dataset log file. 1456 1457 :param string status: Dataset status 1458 :param bool is_final: If this is `True`, subsequent calls to this 1459 method while the object is instantiated will not update the dataset 1460 status. 
1461 :return bool: Status update successful? 1462 """ 1463 if self.no_status_updates: 1464 return 1465 1466 # for presets, copy the updated status to the preset(s) this is part of 1467 if self.preset_parent is None: 1468 self.preset_parent = [ 1469 parent 1470 for parent in self.get_genealogy() 1471 if parent.type.find("preset-") == 0 and parent.key != self.key 1472 ][:1] 1473 1474 if self.preset_parent: 1475 for preset_parent in self.preset_parent: 1476 if not preset_parent.is_finished(): 1477 preset_parent.update_status(status) 1478 1479 self.data["status"] = status 1480 updated = self.db.update( 1481 "datasets", where={"key": self.data["key"]}, data={"status": status} 1482 ) 1483 1484 if is_final: 1485 self.no_status_updates = True 1486 1487 self.log(status) 1488 1489 return updated > 0 1490 1491 def update_progress(self, progress): 1492 """ 1493 Update dataset progress 1494 1495 The progress can be used to indicate to a user how close the dataset 1496 is to completion. 1497 1498 :param float progress: Between 0 and 1. 1499 :return: 1500 """ 1501 progress = min(1, max(0, progress)) # clamp 1502 if type(progress) is int: 1503 progress = float(progress) 1504 1505 self.data["progress"] = progress 1506 updated = self.db.update( 1507 "datasets", where={"key": self.data["key"]}, data={"progress": progress} 1508 ) 1509 return updated > 0 1510 1511 def get_progress(self): 1512 """ 1513 Get dataset progress 1514 1515 :return float: Progress, between 0 and 1 1516 """ 1517 return self.data["progress"] 1518 1519 def finish_with_error(self, error): 1520 """ 1521 Set error as final status, and finish with 0 results 1522 1523 This is a convenience function to avoid having to repeat 1524 "update_status" and "finish" a lot. 1525 1526 :param str error: Error message for final dataset status. 1527 :return: 1528 """ 1529 self.update_status(error, is_final=True) 1530 self.finish(0) 1531 1532 return None 1533 1534 def update_version(self, version): 1535 """ 1536 Update software version used for this dataset 1537 1538 This can be used to verify the code that was used to process this dataset. 1539 1540 :param string version: Version identifier 1541 :return bool: Update successul? 1542 """ 1543 try: 1544 # this fails if the processor type is unknown 1545 # edge case, but let's not crash... 1546 processor_path = self.modules.processors.get(self.data["type"]).filepath 1547 except AttributeError: 1548 processor_path = "" 1549 1550 updated = self.db.update( 1551 "datasets", 1552 where={"key": self.data["key"]}, 1553 data={ 1554 "software_version": version[0], 1555 "software_source": version[1], 1556 "software_file": processor_path, 1557 }, 1558 ) 1559 1560 return updated > 0 1561 1562 def delete_parameter(self, parameter, instant=True): 1563 """ 1564 Delete a parameter from the dataset metadata 1565 1566 :param string parameter: Parameter to delete 1567 :param bool instant: Also delete parameters in this instance object? 1568 :return bool: Update successul? 
1569 """ 1570 parameters = self.parameters.copy() 1571 if parameter in parameters: 1572 del parameters[parameter] 1573 else: 1574 return False 1575 1576 updated = self.db.update( 1577 "datasets", 1578 where={"key": self.data["key"]}, 1579 data={"parameters": json.dumps(parameters)}, 1580 ) 1581 1582 if instant: 1583 self.parameters = parameters 1584 1585 return updated > 0 1586 1587 def get_version_url(self, file): 1588 """ 1589 Get a versioned github URL for the version this dataset was processed with 1590 1591 :param file: File to link within the repository 1592 :return: URL, or an empty string 1593 """ 1594 if not self.data["software_source"]: 1595 return "" 1596 1597 filepath = self.data.get("software_file", "") 1598 if filepath.startswith("/extensions/"): 1599 # go to root of extension 1600 filepath = "/" + "/".join(filepath.split("/")[3:]) 1601 1602 return ( 1603 self.data["software_source"] 1604 + "/blob/" 1605 + self.data["software_version"] 1606 + filepath 1607 ) 1608 1609 def top_parent(self): 1610 """ 1611 Get root dataset 1612 1613 Traverses the tree of datasets this one is part of until it finds one 1614 with no source_dataset dataset, then returns that dataset. 1615 1616 :return Dataset: Parent dataset 1617 """ 1618 genealogy = self.get_genealogy() 1619 return genealogy[0] 1620 1621 def get_genealogy(self): 1622 """ 1623 Get genealogy of this dataset 1624 1625 Creates a list of DataSet objects, with the first one being the 1626 'top' dataset, and each subsequent one being a child of the previous 1627 one, ending with the current dataset. 1628 1629 :return list: Dataset genealogy, oldest dataset first 1630 """ 1631 if not self._genealogy: 1632 key_parent = self.key_parent 1633 genealogy = [] 1634 1635 while key_parent: 1636 try: 1637 parent = DataSet(key=key_parent, db=self.db, modules=self.modules) 1638 except DataSetException: 1639 break 1640 1641 genealogy.append(parent) 1642 if parent.key_parent: 1643 key_parent = parent.key_parent 1644 else: 1645 break 1646 1647 genealogy.reverse() 1648 self._genealogy = genealogy 1649 1650 genealogy = self._genealogy 1651 genealogy.append(self) 1652 1653 return genealogy 1654 1655 def get_children(self, update=False): 1656 """ 1657 Get children of this dataset 1658 1659 :param bool update: Update the list of children from database if True, else return cached value 1660 :return list: List of child datasets 1661 """ 1662 if self._children is not None and not update: 1663 return self._children 1664 1665 analyses = self.db.fetchall( 1666 "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", 1667 (self.key,), 1668 ) 1669 self._children = [ 1670 DataSet(data=analysis, db=self.db, modules=self.modules) 1671 for analysis in analyses 1672 ] 1673 return self._children 1674 1675 def get_all_children(self, recursive=True, update=True): 1676 """ 1677 Get all children of this dataset 1678 1679 Results are returned as a non-hierarchical list, i.e. 
the result does 1680 not reflect the actual dataset hierarchy (but all datasets in the 1681 result will have the original dataset as an ancestor somewhere) 1682 1683 :return list: List of DataSets 1684 """ 1685 children = self.get_children(update=update) 1686 results = children.copy() 1687 if recursive: 1688 for child in children: 1689 results += child.get_all_children(recursive=recursive, update=update) 1690 1691 return results 1692 1693 def nearest(self, type_filter): 1694 """ 1695 Return nearest dataset that matches the given type 1696 1697 Starting with this dataset, traverse the hierarchy upwards and return 1698 whichever dataset matches the given type. 1699 1700 :param str type_filter: Type filter. Can contain wildcards and is matched 1701 using `fnmatch.fnmatch`. 1702 :return: Earliest matching dataset, or `None` if none match. 1703 """ 1704 genealogy = self.get_genealogy() 1705 for dataset in reversed(genealogy): 1706 if fnmatch.fnmatch(dataset.type, type_filter): 1707 return dataset 1708 1709 return None 1710 1711 def get_breadcrumbs(self): 1712 """ 1713 Get breadcrumbs navlink for use in permalinks 1714 1715 Returns a string representing this dataset's genealogy that may be used 1716 to uniquely identify it. 1717 1718 :return str: Nav link 1719 """ 1720 if not self.key_parent: 1721 return self.key 1722 1723 genealogy = self.get_genealogy() 1724 return ",".join([d.key for d in genealogy]) 1725 1726 def get_compatible_processors(self, config=None): 1727 """ 1728 Get list of processors compatible with this dataset 1729 1730 Checks whether this dataset type is one that is listed as being accepted 1731 by the processor, for each known type: if the processor does not 1732 specify accepted types (via the `is_compatible_with` method), it is 1733 assumed it accepts any top-level datasets 1734 1735 :param ConfigManager|None config: Configuration reader to determine 1736 compatibility through. This may not be the same reader the dataset was 1737 instantiated with, e.g. when checking whether some other user should 1738 be able to run processors on this dataset. 1739 :return dict: Compatible processors, `name => class` mapping 1740 """ 1741 processors = self.modules.processors 1742 1743 available = {} 1744 for processor_type, processor in processors.items(): 1745 if processor.is_from_collector(): 1746 continue 1747 1748 own_processor = self.get_own_processor() 1749 if own_processor and own_processor.exclude_followup_processors( 1750 processor_type 1751 ): 1752 continue 1753 1754 # consider a processor compatible if its is_compatible_with 1755 # method returns True *or* if it has no explicit compatibility 1756 # check and this dataset is top-level (i.e. has no parent) 1757 if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \ 1758 or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)): 1759 available[processor_type] = processor 1760 1761 return available 1762 1763 def get_place_in_queue(self, update=False): 1764 """ 1765 Determine dataset's position in queue 1766 1767 If the dataset is already finished, the position is -1. Else, the 1768 position is the number of datasets to be completed before this one will 1769 be processed. A position of 0 would mean that the dataset is currently 1770 being executed, or that the backend is not running. 
1771 1772 :param bool update: Update the queue position from database if True, else return cached value 1773 :return int: Queue position 1774 """ 1775 if self.is_finished() or not self.data.get("job"): 1776 self._queue_position = -1 1777 return self._queue_position 1778 elif not update and self._queue_position is not None: 1779 # Use cached value 1780 return self._queue_position 1781 else: 1782 # Collect queue position from database via the job 1783 try: 1784 job = Job.get_by_ID(self.data["job"], self.db) 1785 self._queue_position = job.get_place_in_queue() 1786 except JobNotFoundException: 1787 self._queue_position = -1 1788 1789 return self._queue_position 1790 1791 def get_own_processor(self): 1792 """ 1793 Get the processor class that produced this dataset 1794 1795 :return: Processor class, or `None` if not available. 1796 """ 1797 processor_type = self.parameters.get("type", self.data.get("type")) 1798 1799 return self.modules.processors.get(processor_type) 1800 1801 def get_available_processors(self, config=None, exclude_hidden=False): 1802 """ 1803 Get list of processors that may be run for this dataset 1804 1805 Returns all compatible processors except for those that are already 1806 queued or finished and have no options. Processors that have been 1807 run but have options are included so they may be run again with a 1808 different configuration 1809 1810 :param ConfigManager|None config: Configuration reader to determine 1811 compatibility through. This may not be the same reader the dataset was 1812 instantiated with, e.g. when checking whether some other user should 1813 be able to run processors on this dataset. 1814 :param bool exclude_hidden: Exclude processors that should be displayed 1815 in the UI? If `False`, all processors are returned. 1816 1817 :return dict: Available processors, `name => properties` mapping 1818 """ 1819 if self.available_processors: 1820 # Update to reflect exclude_hidden parameter which may be different from last call 1821 # TODO: could children also have been created? Possible bug, but I have not seen anything effected by this 1822 return { 1823 processor_type: processor 1824 for processor_type, processor in self.available_processors.items() 1825 if not exclude_hidden or not processor.is_hidden 1826 } 1827 1828 processors = self.get_compatible_processors(config=config) 1829 1830 for analysis in self.get_children(update=True): 1831 if analysis.type not in processors: 1832 continue 1833 1834 if not processors[analysis.type].get_options(config=config): 1835 # No variable options; this processor has been run so remove 1836 del processors[analysis.type] 1837 continue 1838 1839 if exclude_hidden and processors[analysis.type].is_hidden: 1840 del processors[analysis.type] 1841 1842 self.available_processors = processors 1843 return processors 1844 1845 def link_job(self, job): 1846 """ 1847 Link this dataset to a job ID 1848 1849 Updates the dataset data to include a reference to the job that will be 1850 executing (or has already executed) this job. 1851 1852 Note that if no job can be found for this dataset, this method silently 1853 fails. 1854 1855 :param Job job: The job that will run this dataset 1856 1857 :todo: If the job column ever gets used, make sure it always contains 1858 a valid value, rather than silently failing this method. 
1859 """ 1860 if type(job) is not Job: 1861 raise TypeError("link_job requires a Job object as its argument") 1862 1863 if "id" not in job.data: 1864 try: 1865 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"]) 1866 except JobNotFoundException: 1867 return 1868 1869 self.db.update( 1870 "datasets", where={"key": self.key}, data={"job": job.data["id"]} 1871 ) 1872 1873 def link_parent(self, key_parent): 1874 """ 1875 Set source_dataset key for this dataset 1876 1877 :param key_parent: Parent key. Not checked for validity 1878 """ 1879 self.db.update( 1880 "datasets", where={"key": self.key}, data={"key_parent": key_parent} 1881 ) 1882 1883 def get_parent(self): 1884 """ 1885 Get parent dataset 1886 1887 :return DataSet: Parent dataset, or `None` if not applicable 1888 """ 1889 return ( 1890 DataSet(key=self.key_parent, db=self.db, modules=self.modules) 1891 if self.key_parent 1892 else None 1893 ) 1894 1895 def detach(self): 1896 """ 1897 Makes the datasets standalone, i.e. not having any source_dataset dataset 1898 """ 1899 self.link_parent("") 1900 1901 def is_dataset(self): 1902 """ 1903 Easy way to confirm this is a dataset. 1904 Used for checking processor and dataset compatibility, 1905 which needs to handle both processors and datasets. 1906 """ 1907 return True 1908 1909 def is_top_dataset(self): 1910 """ 1911 Easy way to confirm this is a top dataset. 1912 Used for checking processor and dataset compatibility, 1913 which needs to handle both processors and datasets. 1914 """ 1915 if self.key_parent: 1916 return False 1917 return True 1918 1919 def is_expiring(self, config): 1920 """ 1921 Determine if dataset is set to expire 1922 1923 Similar to `is_expired`, but checks if the dataset will be deleted in 1924 the future, not if it should be deleted right now. 1925 1926 :param ConfigManager config: Configuration reader (context-aware) 1927 :return bool|int: `False`, or the expiration date as a Unix timestamp. 1928 """ 1929 # has someone opted out of deleting this? 1930 if self.parameters.get("keep"): 1931 return False 1932 1933 # is this dataset explicitly marked as expiring after a certain time? 1934 if self.parameters.get("expires-after"): 1935 return self.parameters.get("expires-after") 1936 1937 # is the data source configured to have its datasets expire? 1938 expiration = config.get("datasources.expiration", {}) 1939 if not expiration.get(self.parameters.get("datasource")): 1940 return False 1941 1942 # is there a timeout for this data source? 1943 if expiration.get(self.parameters.get("datasource")).get("timeout"): 1944 return self.timestamp + expiration.get( 1945 self.parameters.get("datasource") 1946 ).get("timeout") 1947 1948 return False 1949 1950 def is_expired(self, config): 1951 """ 1952 Determine if dataset should be deleted 1953 1954 Datasets can be set to expire, but when they should be deleted depends 1955 on a number of factor. This checks them all. 1956 1957 :param ConfigManager config: Configuration reader (context-aware) 1958 :return bool: 1959 """ 1960 # has someone opted out of deleting this? 1961 if not self.is_expiring(config): 1962 return False 1963 1964 # is this dataset explicitly marked as expiring after a certain time? 
1965 future = ( 1966 time.time() + 3600 1967 ) # ensure we don't delete datasets with invalid expiration times 1968 if ( 1969 self.parameters.get("expires-after") 1970 and convert_to_int(self.parameters["expires-after"], future) < time.time() 1971 ): 1972 return True 1973 1974 # is the data source configured to have its datasets expire? 1975 expiration = config.get("datasources.expiration", {}) 1976 if not expiration.get(self.parameters.get("datasource")): 1977 return False 1978 1979 # is the dataset older than the set timeout? 1980 if expiration.get(self.parameters.get("datasource")).get("timeout"): 1981 return ( 1982 self.timestamp 1983 + expiration[self.parameters.get("datasource")]["timeout"] 1984 < time.time() 1985 ) 1986 1987 return False 1988 1989 def is_from_collector(self): 1990 """ 1991 Check if this dataset was made by a processor that collects data, i.e. 1992 a search or import worker. 1993 1994 :return bool: 1995 """ 1996 return self.type.endswith("-search") or self.type.endswith("-import") 1997 1998 def get_extension(self): 1999 """ 2000 Gets the file extention this dataset produces. 2001 Also checks whether the results file exists. 2002 Used for checking processor and dataset compatibility. 2003 2004 :return str extension: Extension, e.g. `csv` 2005 """ 2006 if self.get_results_path().exists(): 2007 return self.get_results_path().suffix[1:] 2008 2009 return False 2010 2011 def get_media_type(self): 2012 """ 2013 Gets the media type of the dataset file. 2014 2015 :return str: media type, e.g., "text" 2016 """ 2017 own_processor = self.get_own_processor() 2018 if hasattr(self, "media_type"): 2019 # media type can be defined explicitly in the dataset; this is the priority 2020 return self.media_type 2021 elif own_processor is not None: 2022 # or media type can be defined in the processor 2023 # some processors can set different media types for different datasets (e.g., import_media) 2024 if hasattr(own_processor, "media_type"): 2025 return own_processor.media_type 2026 2027 # Default to text 2028 return self.parameters.get("media_type", "text") 2029 2030 def get_metadata(self): 2031 """ 2032 Get dataset metadata 2033 2034 This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended 2035 as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to 2036 update its database (and ensure compatibility with the exporting version of 4CAT). 2037 """ 2038 metadata = self.db.fetchone( 2039 "SELECT * FROM datasets WHERE key = %s", (self.key,) 2040 ) 2041 2042 # get 4CAT version (presumably to ensure export is compatible with import) 2043 metadata["current_4CAT_version"] = get_software_version() 2044 return metadata 2045 2046 def get_result_url(self): 2047 """ 2048 Gets the 4CAT frontend URL of a dataset file. 2049 2050 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 2051 SERVER_HTTPS) plus hardcoded '/result/'. 2052 TODO: create more dynamic method of obtaining url. 
2053 """ 2054 filename = self.get_results_path().name 2055 2056 # we cheat a little here by using the modules' config reader, but these 2057 # will never be context-dependent values anyway 2058 url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \ 2059 self.modules.config.get("flask.server_name") + '/result/' + filename 2060 return url_to_file 2061 2062 def warn_unmappable_item( 2063 self, item_count, processor=None, error_message=None, warn_admins=True 2064 ): 2065 """ 2066 Log an item that is unable to be mapped and warn administrators. 2067 2068 :param int item_count: Item index 2069 :param Processor processor: Processor calling function8 2070 """ 2071 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 2072 2073 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 2074 closest_dataset = ( 2075 processor.dataset 2076 if processor is not None and processor.dataset is not None 2077 else self 2078 ) 2079 # Log error to dataset log 2080 closest_dataset.log(dataset_error_message) 2081 2082 if warn_admins: 2083 if processor is not None: 2084 processor.log.warning( 2085 f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}." 2086 ) 2087 elif hasattr(self.db, "log"): 2088 # borrow the database's log handler 2089 self.db.log.warning( 2090 f"Unable to map item all items for dataset {closest_dataset.key}." 2091 ) 2092 else: 2093 # No other log available 2094 raise DataSetException( 2095 f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn" 2096 ) 2097 2098 # Annotation functions (most of it is handled in Annotations) 2099 def has_annotations(self) -> bool: 2100 """ 2101 Whether this dataset has annotations 2102 """ 2103 2104 annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,)) 2105 2106 return True if annotation else False 2107 2108 def num_annotations(self) -> int: 2109 """ 2110 Get the amount of annotations 2111 """ 2112 return self.db.fetchone( 2113 "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,) 2114 )["count"] 2115 2116 def get_annotation(self, data: dict): 2117 """ 2118 Retrieves a specific annotation if it exists. 2119 2120 :param data: A dictionary with which to get the annotations from. 2121 To get specific annotations, include either an `id` field or 2122 `field_id` and `item_id` fields. 2123 2124 return Annotation: Annotation object. 2125 """ 2126 2127 if "id" not in data or ("field_id" not in data and "item_id" not in data): 2128 return None 2129 2130 if "dataset" not in data: 2131 data["dataset"] = self.key 2132 2133 return Annotation(data=data, db=self.db) 2134 2135 def get_annotations(self) -> list: 2136 """ 2137 Retrieves all annotations for this dataset. 2138 2139 return list: List of Annotation objects. 2140 """ 2141 2142 return Annotation.get_annotations_for_dataset(self.db, self.key) 2143 2144 def get_annotations_for_item(self, item_id: str) -> list: 2145 """ 2146 Retrieves all annotations from this dataset for a specific item (e.g. social media post). 
2147 """ 2148 return Annotation.get_annotations_for_dataset( 2149 self.db, self.key, item_id=item_id 2150 ) 2151 2152 def has_annotation_fields(self) -> bool: 2153 """ 2154 Returns True if there's annotation fields saved tot the dataset table 2155 Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.). 2156 """ 2157 2158 return True if self.annotation_fields else False 2159 2160 def get_annotation_field_labels(self) -> list: 2161 """ 2162 Retrieves the saved annotation field labels for this dataset. 2163 These are stored in the annotations table. 2164 2165 :return list: List of annotation field labels. 2166 """ 2167 2168 annotation_fields = self.annotation_fields 2169 2170 if not annotation_fields: 2171 return [] 2172 2173 labels = [v["label"] for v in annotation_fields.values()] 2174 2175 return labels 2176 2177 def save_annotations(self, annotations: list) -> int: 2178 """ 2179 Takes a list of annotations and saves them to the annotations table. 2180 If a field is not yet present in the `annotation_fields` column in 2181 the datasets table, it also adds it there. 2182 2183 :param list annotations: List of dictionaries with annotation items. Must have `item_id`, `field_id`, 2184 and `label`. 2185 `item_id` is for the specific item being annotated (e.g. a social media post) 2186 `field_id` refers to the annotation field. 2187 `label` is a human-readable description of this annotation. 2188 E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", 2189 "value": "Yes"}] 2190 2191 :returns int: How many annotations were saved. 2192 2193 """ 2194 2195 if not annotations: 2196 return 0 2197 2198 count = 0 2199 annotation_fields = self.annotation_fields 2200 2201 # Add some dataset data to annotations, if not present 2202 for annotation_data in annotations: 2203 # Check if the required fields are present 2204 if "item_id" not in annotation_data: 2205 raise AnnotationException( 2206 "Can't save annotations; annotation must have an `item_id` referencing " 2207 "the item it annotated, got %s" % annotation_data 2208 ) 2209 if "field_id" not in annotation_data: 2210 raise AnnotationException( 2211 "Can't save annotations; annotation must have a `field_id` field, " 2212 "got %s" % annotation_data 2213 ) 2214 if "label" not in annotation_data or not isinstance( 2215 annotation_data["label"], str 2216 ): 2217 raise AnnotationException( 2218 "Can't save annotations; annotation must have a `label` field, " 2219 "got %s" % annotation_data 2220 ) 2221 2222 # Set dataset key 2223 if not annotation_data.get("dataset"): 2224 annotation_data["dataset"] = self.key 2225 2226 # Set default author to this dataset owner 2227 # If this annotation is made by a processor, it will have the processor name 2228 if not annotation_data.get("author"): 2229 annotation_data["author"] = self.get_owners()[0] 2230 2231 # Create Annotation object, which also saves it to the database 2232 # If this dataset/item ID/label combination already exists, this retrieves the 2233 # existing data and updates it with new values. 2234 Annotation(data=annotation_data, db=self.db) 2235 count += 1 2236 2237 # Save annotation fields if things changed 2238 if annotation_fields != self.annotation_fields: 2239 self.save_annotation_fields(annotation_fields) 2240 2241 return count 2242 2243 def save_annotation_fields(self, new_fields: dict, add=False) -> int: 2244 """ 2245 Save annotation field data to the datasets table (in the `annotation_fields` column). 
2246 If changes to the annotation fields affect existing annotations, 2247 this function will also call `update_annotations_via_fields()` to change them. 2248 2249 :param dict new_fields: New annotation fields, with a field ID as key. 2250 2251 :param bool add: Whether we're merely adding new fields 2252 or replacing the whole batch. If add is False, 2253 `new_fields` should contain all fields. 2254 2255 :return int: The number of annotation fields saved. 2256 2257 """ 2258 2259 # Get existing annotation fields to see if stuff changed. 2260 old_fields = self.annotation_fields 2261 changes = False 2262 2263 # Do some validation 2264 # Annotation field must be valid JSON. 2265 try: 2266 json.dumps(new_fields) 2267 except ValueError: 2268 raise AnnotationException( 2269 "Can't save annotation fields: not valid JSON (%s)" % new_fields 2270 ) 2271 2272 # No duplicate IDs 2273 if len(new_fields) != len(set(new_fields)): 2274 raise AnnotationException( 2275 "Can't save annotation fields: field IDs must be unique" 2276 ) 2277 2278 # Annotation fields must at minimum have `type` and `label` keys. 2279 for field_id, annotation_field in new_fields.items(): 2280 if not isinstance(field_id, str): 2281 raise AnnotationException( 2282 "Can't save annotation fields: field ID %s is not a valid string" 2283 % field_id 2284 ) 2285 if "label" not in annotation_field: 2286 raise AnnotationException( 2287 "Can't save annotation fields: all fields must have a label" 2288 % field_id 2289 ) 2290 if "type" not in annotation_field: 2291 raise AnnotationException( 2292 "Can't save annotation fields: all fields must have a type" 2293 % field_id 2294 ) 2295 2296 # Check if fields are removed 2297 if not add: 2298 for field_id in old_fields.keys(): 2299 if field_id not in new_fields: 2300 changes = True 2301 2302 # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin 2303 # dataset 2304 for field_id in new_fields.keys(): 2305 if field_id in old_fields and old_fields[field_id].get("from_dataset"): 2306 old_fields[field_id]["label"] = new_fields[field_id][ 2307 "label" 2308 ] # Only labels could've been changed 2309 new_fields[field_id] = old_fields[field_id] 2310 2311 # If we're just adding fields, add them to the old fields. 2312 # If the field already exists, overwrite the old field. 2313 if add and old_fields: 2314 all_fields = old_fields 2315 for field_id, annotation_field in new_fields.items(): 2316 all_fields[field_id] = annotation_field 2317 new_fields = all_fields 2318 2319 # We're saving the new annotation fields as-is. 2320 # Ordering of fields is preserved this way. 2321 self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)}) 2322 self.annotation_fields = new_fields 2323 2324 # If anything changed with the annotation fields, possibly update 2325 # existing annotations (e.g. to delete them or change their labels). 2326 if changes: 2327 Annotation.update_annotations_via_fields( 2328 self.key, old_fields, new_fields, self.db 2329 ) 2330 2331 return len(new_fields) 2332 2333 def get_annotation_metadata(self) -> dict: 2334 """ 2335 Retrieves all the data for this dataset from the annotations table. 
2336 """ 2337 2338 annotation_data = self.db.fetchall( 2339 "SELECT * FROM annotations WHERE dataset = '%s';" % self.key 2340 ) 2341 return annotation_data 2342 2343 def __getattr__(self, attr): 2344 """ 2345 Getter so we don't have to use .data all the time 2346 2347 :param attr: Data key to get 2348 :return: Value 2349 """ 2350 2351 if attr in dir(self): 2352 # an explicitly defined attribute should always be called in favour 2353 # of this passthrough 2354 attribute = getattr(self, attr) 2355 return attribute 2356 elif attr in self.data: 2357 return self.data[attr] 2358 else: 2359 raise AttributeError("DataSet instance has no attribute %s" % attr) 2360 2361 def __setattr__(self, attr, value): 2362 """ 2363 Setter so we can flexibly update the database 2364 2365 Also updates internal data stores (.data etc). If the attribute is 2366 unknown, it is stored within the 'parameters' attribute. 2367 2368 :param str attr: Attribute to update 2369 :param value: New value 2370 """ 2371 2372 # don't override behaviour for *actual* class attributes 2373 if attr in dir(self): 2374 super().__setattr__(attr, value) 2375 return 2376 2377 if attr not in self.data: 2378 self.parameters[attr] = value 2379 attr = "parameters" 2380 value = self.parameters 2381 2382 if attr == "parameters": 2383 value = json.dumps(value) 2384 2385 self.db.update("datasets", where={"key": self.key}, data={attr: value}) 2386 2387 self.data[attr] = value 2388 2389 if attr == "parameters": 2390 self.parameters = json.loads(value)
    def check_dataset_finished(self):
        """
        Checks if dataset is finished. Returns the path to the results file if
        it is not empty, or 'empty' when there were no matches.

        Only returns a path if the dataset is complete. In other words, if this
        method returns a path, a file with the complete results for this dataset
        will exist at that location.

        :return: A path to the results file, 'empty', or `None`
        """
        if self.data["is_finished"] and self.data["num_rows"] > 0:
            return self.get_results_path()
        elif self.data["is_finished"] and self.data["num_rows"] == 0:
            return 'empty'
        else:
            return None
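A minimal usage sketch of how calling code might interpret this return value; the database connection, module cache and dataset key shown here are assumptions, not part of this module:

# Hypothetical usage (db, modules and the key value are assumed to exist).
dataset = DataSet(key="abcdef1234567890", db=db, modules=modules)
finished = dataset.check_dataset_finished()
if finished == "empty":
    print("Dataset finished, but no items matched")
elif finished is None:
    print("Dataset is not finished yet")
else:
    # a Path object pointing at the complete results file
    print("Results written to %s" % finished)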
    def get_results_path(self):
        """
        Get path to results file

        Always returns a path that will at some point contain the dataset
        data, but may not do so yet. Use this to get the location to write
        generated results to.

        :return Path: A path to the results file
        """
        # alas we need to instantiate a config reader here - no way around it
        if not self.folder:
            self.folder = self.modules.config.get('PATH_ROOT').joinpath(self.modules.config.get('PATH_DATA'))
        return self.folder.joinpath(self.data["result_file"])
    def get_results_folder_path(self):
        """
        Get path to folder containing accompanying results

        Returns a path that may not yet be created

        :return Path: A path to the results folder
        """
        return self.get_results_path().parent.joinpath("folder_" + self.key)
    def get_log_path(self):
        """
        Get path to dataset log file

        Each dataset has a single log file that documents its creation. This
        method returns the path to that file. It is identical to the path of
        the dataset result file, with 'log' as its extension instead.

        :return Path: A path to the log file
        """
        return self.get_results_path().with_suffix(".log")
    def clear_log(self):
        """
        Clears the dataset log file

        If the log file does not exist, it is created empty. The log file will
        have the same file name as the dataset result file, with the 'log'
        extension.
        """
        log_path = self.get_log_path()
        with log_path.open("w"):
            pass
    def log(self, log):
        """
        Write log message to file

        Writes the log message to the log file on a new line, including a
        timestamp at the start of the line. Note that this assumes the log file
        already exists - it should have been created/cleared with clear_log()
        prior to calling this.

        :param str log: Log message to write
        """
        log_path = self.get_log_path()
        with log_path.open("a", encoding="utf-8") as outfile:
            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
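A short usage sketch, assuming `dataset` is an existing DataSet instance; clear_log() is called first because log() expects the log file to exist:

# Hypothetical logging flow (illustration only).
dataset.clear_log()                   # create or truncate the .log file
dataset.log("Processing started")     # appends a timestamped line
dataset.log("Wrote 1,500 items")
print(dataset.get_log_path().read_text())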
    def iterate_items(
        self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None
    ):
        """
        Generate mapped dataset items

        Wrapper for _iterate_items that returns a DatasetItem, which can be
        accessed as a dict returning the original item or (if a mapper is
        available) the mapped item. Mapped or original versions of the item can
        also be accessed via the `original` and `mapped_object` properties of
        the DatasetItem.

        Processors can define a method called `map_item` that can be used to map
        an item from the dataset file before it is processed any further. This is
        slower than storing the data file in the right format to begin with but
        not all data sources allow for easy 'flat' mapping of items, e.g. tweets
        are nested objects when retrieved from the Twitter API that are easier
        to store as a JSON file than as a flat CSV file, and it would be a shame
        to throw away that data.

        Note the two parameters warn_unmappable and map_missing. Items can be
        unmappable in that their structure is too different to coerce into a
        neat dictionary of the structure the data source expects. This makes it
        'unmappable' and warn_unmappable determines what happens in this case.
        It can also be of the right structure, but with some fields missing or
        incomplete. map_missing determines what happens in that case. The
        latter is for example possible when importing data via Zeeschuimer,
        which produces unstably-structured data captured from social media
        sites.

        :param BasicProcessor processor: A reference to the processor
        iterating the dataset.
        :param bool warn_unmappable: If an item is not mappable, skip the item
        and log a warning
        :param max_unmappable: Skip at most this many unmappable items; if
        more are encountered, stop iterating. `None` to never stop.
        :param map_missing: Indicates what to do with mapped items for which
        some fields could not be mapped. Defaults to 'default'. Must be one of:
        - 'default': fill missing fields with the default passed by map_item
        - 'abort': raise a MappedItemIncompleteException if a field is missing
        - a callback: replace missing field with the return value of the
        callback. The MappedItem object is passed to the callback as the
        first argument and the name of the missing field as the second.
        - a dictionary with a key for each possible missing field: replace missing
        field with a strategy for that field ('default', 'abort', or a callback)
        :param get_annotations: Whether to also fetch annotations from the database.
        This can be disabled to help speed up iteration.
        :return generator: A generator that yields DatasetItems
        """
        unmapped_items = 0
        # Collect item_mapper for use with filter
        item_mapper = False
        own_processor = self.get_own_processor()
        if own_processor and own_processor.map_item_method_available(dataset=self):
            item_mapper = True

        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
        if get_annotations:
            annotation_fields = self.annotation_fields
            num_annotations = 0 if not annotation_fields else self.num_annotations()
            all_annotations = None
            # If this dataset has less than n annotations, just retrieve them all before iterating
            if 0 < num_annotations <= 500:
                # Convert to dict for faster lookup when iterating over items
                all_annotations = collections.defaultdict(list)
                for annotation in self.get_annotations():
                    all_annotations[annotation.item_id].append(annotation)

        # missing field strategy can be for all fields at once, or per field
        # if it is per field, it is a dictionary with field names and their strategy
        # if it is for all fields, it may be a callback, 'abort', or 'default'
        default_strategy = "default"
        if type(map_missing) is not dict:
            default_strategy = map_missing
            map_missing = {}

        # Loop through items
        for i, item in enumerate(self._iterate_items(processor)):
            # Save original to yield
            original_item = item.copy()

            # Map item
            if item_mapper:
                try:
                    mapped_item = own_processor.get_mapped_item(item)
                except MapItemException as e:
                    if warn_unmappable:
                        self.warn_unmappable_item(
                            i, processor, e, warn_admins=unmapped_items is False
                        )

                    unmapped_items += 1
                    if max_unmappable and unmapped_items > max_unmappable:
                        break
                    else:
                        continue

                # check if fields have been marked as 'missing' in the
                # underlying data, and treat according to the chosen strategy
                if mapped_item.get_missing_fields():
                    for missing_field in mapped_item.get_missing_fields():
                        strategy = map_missing.get(missing_field, default_strategy)

                        if callable(strategy):
                            # delegate handling to a callback
                            mapped_item.data[missing_field] = strategy(
                                mapped_item.data, missing_field
                            )
                        elif strategy == "abort":
                            # raise an exception to be handled at the processor level
                            raise MappedItemIncompleteException(
                                f"Cannot process item, field {missing_field} missing in source data."
                            )
                        elif strategy == "default":
                            # use whatever was passed to the object constructor
                            mapped_item.data[missing_field] = mapped_item.data[
                                missing_field
                            ].value
                        else:
                            raise ValueError(
                                "map_missing must be 'abort', 'default', or a callback."
                            )
            else:
                mapped_item = original_item

            # Add possible annotation values
            if get_annotations and annotation_fields:
                # We're always handling annotated data as a MappedItem object,
                # even if no map_item() function is available for the data source.
                if not isinstance(mapped_item, MappedItem):
                    mapped_item = MappedItem(mapped_item)

                # Retrieve from all annotations
                if all_annotations:
                    item_annotations = all_annotations.get(mapped_item.data["id"])
                # Or get annotations per item for large datasets (more queries but less memory usage)
                else:
                    item_annotations = self.get_annotations_for_item(
                        mapped_item.data["id"]
                    )

                # Loop through annotation fields
                for (
                    annotation_field_id,
                    annotation_field_items,
                ) in annotation_fields.items():
                    # Set default value to an empty string
                    value = ""
                    # Set value if this item has annotations
                    if item_annotations:
                        # Items can have multiple annotations
                        for annotation in item_annotations:
                            if annotation.field_id == annotation_field_id:
                                value = annotation.value
                                if isinstance(value, list):
                                    value = ",".join(value)

                    # Make sure labels are unique when iterating through items
                    column_name = annotation_field_items["label"]
                    label_count = 1
                    while column_name in mapped_item.data:
                        label_count += 1
                        column_name = f"{annotation_field_items['label']}_{label_count}"

                    # We're always adding an annotation value
                    # as an empty string, even if it's absent.
                    mapped_item.data[column_name] = value

            # yield a DatasetItem, which is a dict with some special properties
            yield DatasetItem(
                mapper=item_mapper,
                original=original_item,
                mapped_object=mapped_item,
                **(
                    mapped_item.get_item_data()
                    if type(mapped_item) is MappedItem
                    else mapped_item
                ),
            )
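To make the `map_missing` strategies above concrete, here is a minimal standalone sketch of the per-field resolution, reduced to plain dictionaries. The helper name, the sample item, and the way defaults are supplied are illustrative only; in the actual method the default value comes from the `MappedItem` object itself.

def resolve_missing(data, missing_fields, map_missing="default", defaults=None):
    """Sketch of per-field strategy resolution: 'default', 'abort', or a callback."""
    defaults = defaults or {}
    # a single strategy may apply to all fields, or a dict may map field -> strategy
    default_strategy = map_missing if not isinstance(map_missing, dict) else "default"
    per_field = map_missing if isinstance(map_missing, dict) else {}

    for field in missing_fields:
        strategy = per_field.get(field, default_strategy)
        if callable(strategy):
            data[field] = strategy(data, field)        # delegate to the callback
        elif strategy == "abort":
            raise ValueError(f"field {field} missing in source data")
        elif strategy == "default":
            data[field] = defaults.get(field, "")      # fall back to a default value
        else:
            raise ValueError("strategy must be 'abort', 'default', or a callback")
    return data


item = {"id": "123", "body": "hello"}                  # "author" could not be mapped
print(resolve_missing(item, ["author"], {"author": lambda d, f: "[unknown]"}))
# {'id': '123', 'body': 'hello', 'author': '[unknown]'}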
536 def sort_and_iterate_items( 537 self, sort="", reverse=False, chunk_size=50000, **kwargs 538 ) -> dict: 539 """ 540 Loop through items in a dataset, sorted by a given key. 541 542 This is a wrapper function for `iterate_items()` with the 543 added functionality of sorting a dataset. 544 545 :param sort: The item key that determines the sort order. 546 :param reverse: Whether to sort by largest values first. 547 548 :returns dict: Yields iterated post 549 """ 550 551 def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False): 552 """ 553 Sort items based on the given key and order. 554 555 :param items_to_sort: The items to sort 556 :param sort_key: The key to sort by 557 :param reverse: Whether to sort in reverse order 558 :return: Sorted items 559 """ 560 if reverse is False and (sort_key == "dataset-order" or sort_key == ""): 561 # Sort by dataset order 562 yield from items_to_sort 563 elif sort_key == "dataset-order" and reverse: 564 # Sort by dataset order in reverse 565 yield from reversed(list(items_to_sort)) 566 else: 567 # Sort on the basis of a column value 568 if not convert_sort_to_float: 569 yield from sorted( 570 items_to_sort, 571 key=lambda x: x.get(sort_key, ""), 572 reverse=reverse, 573 ) 574 else: 575 # Dataset fields can contain integers and empty strings. 576 # Since these cannot be compared, we will convert every 577 # empty string to 0. 578 yield from sorted( 579 items_to_sort, 580 key=lambda x: convert_to_float(x.get(sort_key, ""), force=True), 581 reverse=reverse, 582 ) 583 584 if self.num_rows < chunk_size: 585 try: 586 # First try to force-sort float values. If this doesn't work, it'll be alphabetical. 587 yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True) 588 except (TypeError, ValueError): 589 yield from sort_items( 590 self.iterate_items(**kwargs), 591 sort, 592 reverse, 593 convert_sort_to_float=False, 594 ) 595 596 else: 597 # For large datasets, we will use chunk sorting 598 staging_area = self.get_staging_area() 599 buffer = [] 600 chunk_files = [] 601 convert_sort_to_float = True 602 fieldnames = self.get_columns() 603 604 def write_chunk(buffer, chunk_index): 605 """ 606 Write a chunk of data to a temporary file 607 608 :param buffer: The buffer containing the chunk of data 609 :param chunk_index: The index of the chunk 610 :return: The path to the temporary file 611 """ 612 temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv") 613 with temp_file.open("w", encoding="utf-8") as chunk_file: 614 writer = csv.DictWriter(chunk_file, fieldnames=fieldnames) 615 writer.writeheader() 616 writer.writerows(buffer) 617 return temp_file 618 619 # Divide the dataset into sorted chunks 620 for item in self.iterate_items(**kwargs): 621 buffer.append(item) 622 if len(buffer) >= chunk_size: 623 try: 624 buffer = list( 625 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 626 ) 627 except (TypeError, ValueError): 628 convert_sort_to_float = False 629 buffer = list( 630 sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float) 631 ) 632 633 chunk_files.append(write_chunk(buffer, len(chunk_files))) 634 buffer.clear() 635 636 # Sort and write any remaining items in the buffer 637 if buffer: 638 buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float)) 639 chunk_files.append(write_chunk(buffer, len(chunk_files))) 640 buffer.clear() 641 642 # Merge sorted chunks into the final sorted file 643 sorted_file = staging_area.joinpath("sorted_" + 
self.key + ".csv") 644 with sorted_file.open("w", encoding="utf-8") as outfile: 645 writer = csv.DictWriter(outfile, fieldnames=self.get_columns()) 646 writer.writeheader() 647 648 # Open all chunk files for reading 649 chunk_readers = [ 650 csv.DictReader(chunk.open("r", encoding="utf-8")) 651 for chunk in chunk_files 652 ] 653 heap = [] 654 655 # Initialize the heap with the first row from each chunk 656 for i, reader in enumerate(chunk_readers): 657 try: 658 row = next(reader) 659 if sort == "dataset-order" and reverse: 660 # Use a reverse index for "dataset-order" and reverse=True 661 sort_key = -i 662 elif convert_sort_to_float: 663 # Negate numeric keys for reverse sorting 664 sort_key = ( 665 -convert_to_float(row.get(sort, "")) 666 if reverse 667 else convert_to_float(row.get(sort, "")) 668 ) 669 else: 670 if reverse: 671 # For reverse string sorting, invert string comparison by creating a tuple 672 # with an inverted string - this makes Python's tuple comparison work in reverse 673 sort_key = ( 674 tuple(-ord(c) for c in row.get(sort, "")), 675 -i, 676 ) 677 else: 678 sort_key = (row.get(sort, ""), i) 679 heap.append((sort_key, i, row)) 680 except StopIteration: 681 pass 682 683 # Use a heap to merge sorted chunks 684 import heapq 685 686 heapq.heapify(heap) 687 while heap: 688 _, chunk_index, smallest_row = heapq.heappop(heap) 689 writer.writerow(smallest_row) 690 try: 691 next_row = next(chunk_readers[chunk_index]) 692 if sort == "dataset-order" and reverse: 693 # Use a reverse index for "dataset-order" and reverse=True 694 sort_key = -chunk_index 695 elif convert_sort_to_float: 696 sort_key = ( 697 -convert_to_float(next_row.get(sort, "")) 698 if reverse 699 else convert_to_float(next_row.get(sort, "")) 700 ) 701 else: 702 # Use the same inverted comparison for string values 703 if reverse: 704 sort_key = ( 705 tuple(-ord(c) for c in next_row.get(sort, "")), 706 -chunk_index, 707 ) 708 else: 709 sort_key = (next_row.get(sort, ""), chunk_index) 710 heapq.heappush(heap, (sort_key, chunk_index, next_row)) 711 except StopIteration: 712 pass 713 714 # Read the sorted file and yield each item 715 with sorted_file.open("r", encoding="utf-8") as infile: 716 reader = csv.DictReader(infile) 717 for item in reader: 718 yield item 719 720 # Remove the temporary files 721 if staging_area.is_dir(): 722 shutil.rmtree(staging_area)
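The chunked path of `sort_and_iterate_items()` is essentially an external merge sort: sort fixed-size chunks in memory, spill them to CSV files, then merge the sorted files while keeping only one row per chunk in memory. A compact standalone sketch of that pattern, using `heapq.merge` instead of the hand-built heap and made-up sample rows:

import csv
import heapq
import tempfile
from pathlib import Path

def external_sort(rows, sort_key, chunk_size=2):
    """Yield dict rows sorted by sort_key, spilling sorted chunks to disk first."""
    staging = Path(tempfile.mkdtemp())
    fieldnames = list(rows[0].keys())
    chunk_files = []

    # write sorted chunks to disk
    for start in range(0, len(rows), chunk_size):
        chunk = sorted(rows[start:start + chunk_size], key=lambda r: r[sort_key])
        path = staging.joinpath(f"chunk_{len(chunk_files)}.csv")
        with path.open("w", newline="", encoding="utf-8") as outfile:
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(chunk)
        chunk_files.append(path)

    # merge the sorted chunks (cleanup of the staging folder is omitted here)
    readers = [csv.DictReader(path.open("r", encoding="utf-8")) for path in chunk_files]
    yield from heapq.merge(*readers, key=lambda r: r[sort_key])

rows = [{"id": "a", "score": "3"}, {"id": "b", "score": "1"}, {"id": "c", "score": "2"}]
print([row["id"] for row in external_sort(rows, "score")])  # ['b', 'c', 'a']

Note that `heapq.merge` cannot merge in descending order, which is why the method above negates numeric keys (and inverts character ordinals for strings) when `reverse=True`.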
724 def get_staging_area(self): 725 """ 726 Get path to a temporary folder in which files can be stored before 727 finishing 728 729 This folder must be created before use, but is guaranteed to not exist 730 yet. The folder may be used as a staging area for the dataset data 731 while it is being processed. 732 733 :return Path: Path to folder 734 """ 735 results_file = self.get_results_path() 736 737 results_dir_base = results_file.parent 738 results_dir = results_file.name.replace(".", "") + "-staging" 739 results_path = results_dir_base.joinpath(results_dir) 740 index = 1 741 while results_path.exists(): 742 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 743 index += 1 744 745 # create temporary folder 746 results_path.mkdir() 747 748 # Storing the staging area with the dataset so that it can be removed later 749 self.staging_areas.append(results_path) 750 751 return results_path
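The "-staging" naming scheme used here can be shown in isolation with a small helper (the function name is hypothetical; the suffix-and-counter logic mirrors the method above):

import tempfile
from pathlib import Path

def unique_staging_dir(results_file: Path) -> Path:
    """Create a sibling '<name>-staging' folder, appending -1, -2, ... until the path is free."""
    name = results_file.name.replace(".", "") + "-staging"
    candidate, index = results_file.parent.joinpath(name), 1
    while candidate.exists():
        candidate = results_file.parent.joinpath(f"{name}-{index}")
        index += 1
    candidate.mkdir()
    return candidate

print(unique_staging_dir(Path(tempfile.mkdtemp()) / "results.csv").name)  # resultscsv-staging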
753 def remove_staging_areas(self): 754 """ 755 Remove any staging areas that were created and all files contained in them. 756 """ 757 # Remove DataSet staging areas 758 if self.staging_areas: 759 for staging_area in self.staging_areas: 760 if staging_area.is_dir(): 761 shutil.rmtree(staging_area)
763 def finish(self, num_rows=0): 764 """ 765 Declare the dataset finished 766 """ 767 if self.data["is_finished"]: 768 raise RuntimeError("Cannot finish a finished dataset again") 769 770 self.db.update( 771 "datasets", 772 where={"key": self.data["key"]}, 773 data={ 774 "is_finished": True, 775 "num_rows": num_rows, 776 "progress": 1.0, 777 "timestamp_finished": int(time.time()), 778 }, 779 ) 780 self.data["is_finished"] = True 781 self.data["num_rows"] = num_rows
783 def copy(self, shallow=True): 784 """ 785 Copies the dataset, making a new version with a unique key 786 787 788 :param bool shallow: Shallow copy: does not copy the result file, but 789 instead refers to the same file as the original dataset did 790 :return Dataset: Copied dataset 791 """ 792 parameters = self.parameters.copy() 793 794 # a key is partially based on the parameters. so by setting these extra 795 # attributes, we also ensure a unique key will be generated for the 796 # copy 797 # possibly todo: don't use time for uniqueness (but one shouldn't be 798 # copying a dataset multiple times per microsecond, that's not what 799 # this is for) 800 parameters["copied_from"] = self.key 801 parameters["copied_at"] = time.time() 802 803 copy = DataSet( 804 parameters=parameters, 805 db=self.db, 806 extension=self.result_file.split(".")[-1], 807 type=self.type, 808 modules=self.modules, 809 ) 810 for field in self.data: 811 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 812 continue 813 814 copy.__setattr__(field, self.data[field]) 815 816 if shallow: 817 # use the same result file 818 copy.result_file = self.result_file 819 else: 820 # copy to new file with new key 821 shutil.copy(self.get_results_path(), copy.get_results_path()) 822 823 if self.is_finished(): 824 copy.finish(self.num_rows) 825 826 # make sure ownership is also copied 827 copy.copy_ownership_from(self) 828 829 return copy
831 def delete(self, commit=True, queue=None): 832 """ 833 Delete the dataset, and all its children 834 835 Deletes both database records and result files. Note that manipulating 836 a dataset object after it has been deleted is undefined behaviour. 837 838 :param bool commit: Commit SQL DELETE query? 839 """ 840 # first, recursively delete children 841 children = self.db.fetchall( 842 "SELECT * FROM datasets WHERE key_parent = %s", (self.key,) 843 ) 844 for child in children: 845 try: 846 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 847 child.delete(commit=commit) 848 except DataSetException: 849 # dataset already deleted - race condition? 850 pass 851 852 # delete any queued jobs for this dataset 853 try: 854 job = Job.get_by_remote_ID(self.key, self.db, self.type) 855 if job.is_claimed: 856 # tell API to stop any jobs running for this dataset 857 # level 2 = cancel job 858 # we're not interested in the result - if the API is available, 859 # it will do its thing, if it's not the backend is probably not 860 # running so the job also doesn't need to be interrupted 861 call_api( 862 "cancel-job", 863 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 864 False, 865 ) 866 867 # this deletes the job from the database 868 job.finish(True) 869 870 except JobNotFoundException: 871 pass 872 873 # delete this dataset's own annotations 874 self.db.delete("annotations", where={"dataset": self.key}, commit=commit) 875 # delete annotations that have been generated as part of this dataset 876 self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit) 877 # delete annotation fields from other datasets that were created as part of this dataset 878 for parent_dataset in self.get_genealogy(): 879 field_deleted = False 880 annotation_fields = parent_dataset.annotation_fields 881 if annotation_fields: 882 for field_id in list(annotation_fields.keys()): 883 if annotation_fields[field_id].get("from_dataset", "") == self.key: 884 del annotation_fields[field_id] 885 field_deleted = True 886 if field_deleted: 887 parent_dataset.save_annotation_fields(annotation_fields) 888 889 # delete dataset from database 890 self.db.delete("datasets", where={"key": self.key}, commit=commit) 891 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 892 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 893 894 # delete from drive 895 try: 896 if self.get_results_path().exists(): 897 self.get_results_path().unlink() 898 if self.get_results_path().with_suffix(".log").exists(): 899 self.get_results_path().with_suffix(".log").unlink() 900 if self.get_results_folder_path().exists(): 901 shutil.rmtree(self.get_results_folder_path()) 902 903 except FileNotFoundError: 904 # already deleted, apparently 905 pass 906 except PermissionError as e: 907 self.db.log.error( 908 f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}" 909 )
911 def update_children(self, **kwargs): 912 """ 913 Update an attribute for all child datasets 914 915 Can be used to e.g. change the owner, version, finished status for all 916 datasets in a tree 917 918 :param kwargs: Parameters corresponding to known dataset attributes 919 """ 920 for child in self.get_children(update=True): 921 for attr, value in kwargs.items(): 922 child.__setattr__(attr, value) 923 924 child.update_children(**kwargs)
926 def is_finished(self): 927 """ 928 Check if dataset is finished 929 :return bool: 930 """ 931 return bool(self.data["is_finished"])
933 def is_rankable(self, multiple_items=True): 934 """ 935 Determine if a dataset is rankable 936 937 Rankable means that it is a CSV file with 'date' and 'value' columns 938 as well as one or more item label columns 939 940 :param bool multiple_items: Consider datasets with multiple items per 941 item (e.g. word_1, word_2, etc)? 942 943 :return bool: Whether the dataset is rankable or not 944 """ 945 if ( 946 self.get_results_path().suffix != ".csv" 947 or not self.get_results_path().exists() 948 ): 949 return False 950 951 column_options = {"date", "value", "item"} 952 if multiple_items: 953 column_options.add("word_1") 954 955 with self.get_results_path().open(encoding="utf-8") as infile: 956 reader = csv.DictReader(infile) 957 try: 958 return len(set(reader.fieldnames) & column_options) >= 3 959 except (TypeError, ValueError): 960 return False
962 def is_accessible_by(self, username, role="owner"): 963 """ 964 Check if dataset has given user as owner 965 966 :param str|User username: Username to check for 967 :return bool: 968 """ 969 if type(username) is not str: 970 if hasattr(username, "get_id"): 971 username = username.get_id() 972 else: 973 raise TypeError("User must be a str or User object") 974 975 # 'normal' owners 976 if username in [ 977 owner 978 for owner, meta in self.owners.items() 979 if (role is None or meta["role"] == role) 980 ]: 981 return True 982 983 # owners that are owner by being part of a tag 984 if username in itertools.chain( 985 *[ 986 tagged_owners 987 for tag, tagged_owners in self.tagged_owners.items() 988 if (role is None or self.owners[f"tag:{tag}"]["role"] == role) 989 ] 990 ): 991 return True 992 993 return False
995 def get_owners_users(self, role="owner"): 996 """ 997 Get list of dataset owners 998 999 This returns a list of *users* that are considered owners. Tags are 1000 transparently replaced with the users with that tag. 1001 1002 :param str|None role: Role to check for. If `None`, all owners are 1003 returned regardless of role. 1004 1005 :return set: De-duplicated owner list 1006 """ 1007 # 'normal' owners 1008 owners = [ 1009 owner 1010 for owner, meta in self.owners.items() 1011 if (role is None or meta["role"] == role) and not owner.startswith("tag:") 1012 ] 1013 1014 # owners that are owner by being part of a tag 1015 owners.extend( 1016 itertools.chain( 1017 *[ 1018 tagged_owners 1019 for tag, tagged_owners in self.tagged_owners.items() 1020 if role is None or self.owners[f"tag:{tag}"]["role"] == role 1021 ] 1022 ) 1023 ) 1024 1025 # de-duplicate before returning 1026 return set(owners)
1028 def get_owners(self, role="owner"): 1029 """ 1030 Get list of dataset owners 1031 1032 This returns a list of all owners, and does not transparently resolve 1033 tags (like `get_owners_users` does). 1034 1035 :param str|None role: Role to check for. If `None`, all owners are 1036 returned regardless of role. 1037 1038 :return set: De-duplicated owner list 1039 """ 1040 return [ 1041 owner 1042 for owner, meta in self.owners.items() 1043 if (role is None or meta["role"] == role) 1044 ]
1046 def add_owner(self, username, role="owner"): 1047 """ 1048 Set dataset owner 1049 1050 If the user is already an owner, but with a different role, the role is 1051 updated. If the user is already an owner with the same role, nothing happens. 1052 1053 :param str|User username: Username to set as owner 1054 :param str|None role: Role to add user with. 1055 """ 1056 if type(username) is not str: 1057 if hasattr(username, "get_id"): 1058 username = username.get_id() 1059 else: 1060 raise TypeError("User must be a str or User object") 1061 1062 if username not in self.owners: 1063 self.owners[username] = {"name": username, "key": self.key, "role": role} 1064 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 1065 1066 elif username in self.owners and self.owners[username]["role"] != role: 1067 self.db.update( 1068 "datasets_owners", 1069 data={"role": role}, 1070 where={"name": username, "key": self.key}, 1071 ) 1072 self.owners[username]["role"] = role 1073 1074 if username.startswith("tag:"): 1075 # this is a bit more complicated than just adding to the list of 1076 # owners, so do a full refresh 1077 self.refresh_owners() 1078 1079 # make sure children's owners remain in sync 1080 for child in self.get_children(update=True): 1081 child.add_owner(username, role) 1082 # not recursive, since we're calling it from recursive code! 1083 child.copy_ownership_from(self, recursive=False)
1085 def remove_owner(self, username): 1086 """ 1087 Remove dataset owner 1088 1089 If no owner is set, the dataset is assigned to the anonymous user. 1090 If the user is not an owner, nothing happens. 1091 1092 :param str|User username: Username to set as owner 1093 """ 1094 if type(username) is not str: 1095 if hasattr(username, "get_id"): 1096 username = username.get_id() 1097 else: 1098 raise TypeError("User must be a str or User object") 1099 1100 if username in self.owners: 1101 del self.owners[username] 1102 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 1103 1104 if not self.owners: 1105 self.add_owner("anonymous") 1106 1107 if username in self.tagged_owners: 1108 del self.tagged_owners[username] 1109 1110 # make sure children's owners remain in sync 1111 for child in self.get_children(update=True): 1112 child.remove_owner(username) 1113 # not recursive, since we're calling it from recursive code! 1114 child.copy_ownership_from(self, recursive=False)
1116 def refresh_owners(self): 1117 """ 1118 Update internal owner cache 1119 1120 This makes sure that the list of *users* and *tags* which can access the 1121 dataset is up to date. 1122 """ 1123 self.owners = { 1124 owner["name"]: owner 1125 for owner in self.db.fetchall( 1126 "SELECT * FROM datasets_owners WHERE key = %s", (self.key,) 1127 ) 1128 } 1129 1130 # determine which users (if any) are owners of the dataset by having a 1131 # tag that is listed as an owner 1132 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 1133 if owner_tags: 1134 tagged_owners = self.db.fetchall( 1135 "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,) 1136 ) 1137 self.tagged_owners = { 1138 owner_tag: [ 1139 user["name"] for user in tagged_owners if owner_tag in user["tags"] 1140 ] 1141 for owner_tag in owner_tags 1142 } 1143 else: 1144 self.tagged_owners = {}
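The tag-to-user resolution performed in refresh_owners can be sketched without a database, replacing the SQL `tags ?|` lookup with an in-memory filter over made-up user records:

owners = {"alice": {"role": "owner"}, "tag:students": {"role": "viewer"}}
users = [{"name": "carol", "tags": ["students"]}, {"name": "dave", "tags": ["staff"]}]

# strip the 'tag:' prefix, then map each tag to the users carrying it
owner_tags = [name[4:] for name in owners if name.startswith("tag:")]
tagged_owners = {
    tag: [user["name"] for user in users if tag in user["tags"]]
    for tag in owner_tags
}
print(tagged_owners)  # {'students': ['carol']}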
1146 def copy_ownership_from(self, dataset, recursive=True): 1147 """ 1148 Copy ownership 1149 1150 This is useful to e.g. make sure a dataset's ownership stays in sync 1151 with its parent 1152 1153 :param Dataset dataset: Parent to copy from 1154 :return: 1155 """ 1156 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 1157 1158 for role in ("owner", "viewer"): 1159 owners = dataset.get_owners(role=role) 1160 for owner in owners: 1161 self.db.insert( 1162 "datasets_owners", 1163 data={"key": self.key, "name": owner, "role": role}, 1164 commit=False, 1165 safe=True, 1166 ) 1167 1168 self.db.commit() 1169 if recursive: 1170 for child in self.get_children(update=True): 1171 child.copy_ownership_from(self, recursive=recursive)
1173 def get_parameters(self): 1174 """ 1175 Get dataset parameters 1176 1177 The dataset parameters are stored as JSON in the database - parse them 1178 and return the resulting object 1179 1180 :return: Dataset parameters as originally stored 1181 """ 1182 try: 1183 return json.loads(self.data["parameters"]) 1184 except json.JSONDecodeError: 1185 return {}
1187 def get_columns(self): 1188 """ 1189 Returns the dataset columns. 1190 1191 Useful for processor input forms. Can deal with both CSV and NDJSON 1192 files, the latter only if a `map_item` function is available in the 1193 processor that generated it. While in other cases one could use the 1194 keys of the JSON object, this is not always possible in follow-up code 1195 that uses the 'column' names, so for consistency this function acts as 1196 if no column can be parsed if no `map_item` function exists. 1197 1198 :return list: List of dataset columns; empty list if unable to parse 1199 """ 1200 if not self.get_results_path().exists(): 1201 # no file to get columns from 1202 return [] 1203 1204 if (self.get_results_path().suffix.lower() == ".csv") or ( 1205 self.get_results_path().suffix.lower() == ".ndjson" 1206 and self.get_own_processor() is not None 1207 and self.get_own_processor().map_item_method_available(dataset=self) 1208 ): 1209 items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100) 1210 try: 1211 keys = list(next(items).keys()) 1212 if self.annotation_fields: 1213 for annotation_field in self.annotation_fields.values(): 1214 annotation_column = annotation_field["label"] 1215 label_count = 1 1216 while annotation_column in keys: 1217 label_count += 1 1218 annotation_column = ( 1219 f"{annotation_field['label']}_{label_count}" 1220 ) 1221 keys.append(annotation_column) 1222 columns = keys 1223 except (StopIteration, NotImplementedError): 1224 # No items or otherwise unable to iterate 1225 columns = [] 1226 finally: 1227 del items 1228 else: 1229 # Filetype not CSV or an NDJSON with `map_item` 1230 columns = [] 1231 1232 return columns
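The unique-label loop used both here and in `iterate_items()` can be shown in isolation; the sample labels are made up:

def unique_column_name(label, existing_columns):
    """Append _2, _3, ... to a label until it no longer clashes with existing columns."""
    column_name, count = label, 1
    while column_name in existing_columns:
        count += 1
        column_name = f"{label}_{count}"
    return column_name

print(unique_column_name("sentiment", {"id", "body", "sentiment"}))  # sentiment_2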
1234 def update_label(self, label): 1235 """ 1236 Update label for this dataset 1237 1238 :param str label: New label 1239 :return str: The new label, as returned by get_label 1240 """ 1241 self.parameters["label"] = label 1242 1243 self.db.update( 1244 "datasets", 1245 data={"parameters": json.dumps(self.parameters)}, 1246 where={"key": self.key}, 1247 ) 1248 return self.get_label()
1250 def get_label(self, parameters=None, default="Query"): 1251 """ 1252 Generate a readable label for the dataset 1253 1254 :param dict parameters: Parameters of the dataset 1255 :param str default: Label to use if it cannot be inferred from the 1256 parameters 1257 1258 :return str: Label 1259 """ 1260 if not parameters: 1261 parameters = self.parameters 1262 1263 if parameters.get("label"): 1264 return parameters["label"] 1265 elif parameters.get("body_query") and parameters["body_query"] != "empty": 1266 return parameters["body_query"] 1267 elif parameters.get("body_match") and parameters["body_match"] != "empty": 1268 return parameters["body_match"] 1269 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 1270 return parameters["subject_query"] 1271 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 1272 return parameters["subject_match"] 1273 elif parameters.get("query"): 1274 label = parameters["query"] 1275 # Some legacy datasets have lists as query data 1276 if isinstance(label, list): 1277 label = ", ".join(label) 1278 1279 label = label if len(label) < 30 else label[:25] + "..." 1280 label = label.strip().replace("\n", ", ") 1281 return label 1282 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 1283 return "Flag: %s" % parameters["country_flag"] 1284 elif parameters.get("country_name") and parameters["country_name"] != "all": 1285 return "Country: %s" % parameters["country_name"] 1286 elif parameters.get("filename"): 1287 return parameters["filename"] 1288 elif parameters.get("board") and "datasource" in parameters: 1289 return parameters["datasource"] + "/" + parameters["board"] 1290 elif ( 1291 "datasource" in parameters 1292 and parameters["datasource"] in self.modules.datasources 1293 ): 1294 return ( 1295 self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 1296 ) 1297 else: 1298 return default
1300 def change_datasource(self, datasource): 1301 """ 1302 Change the datasource type for this dataset 1303 1304 :param str label: New datasource type 1305 :return str: The new datasource type 1306 """ 1307 1308 self.parameters["datasource"] = datasource 1309 1310 self.db.update( 1311 "datasets", 1312 data={"parameters": json.dumps(self.parameters)}, 1313 where={"key": self.key}, 1314 ) 1315 return datasource
1317 def reserve_result_file(self, parameters=None, extension="csv"): 1318 """ 1319 Generate a unique path to the results file for this dataset 1320 1321 This generates a file name for the data file of this dataset, and makes sure 1322 no file exists or will exist at that location other than the file we 1323 expect (i.e. the data for this particular dataset). 1324 1325 :param str extension: File extension, "csv" by default 1326 :param parameters: Dataset parameters 1327 :return bool: Whether the file path was successfully reserved 1328 """ 1329 if self.data["is_finished"]: 1330 raise RuntimeError("Cannot reserve results file for a finished dataset") 1331 1332 # Use 'random' for random post queries 1333 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 1334 file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"] 1335 # Use country code for country flag queries 1336 elif "country_flag" in parameters and parameters["country_flag"] != "all": 1337 file = ( 1338 "countryflag-" 1339 + str(parameters["country_flag"]) 1340 + "-" 1341 + self.data["key"] 1342 ) 1343 # Use the query string for all other queries 1344 else: 1345 query_bit = self.data["query"].replace(" ", "-").lower() 1346 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 1347 query_bit = query_bit[:100] # Crop to avoid OSError 1348 file = query_bit + "-" + self.data["key"] 1349 file = re.sub(r"[-]+", "-", file) 1350 1351 self.data["result_file"] = file + "." + extension.lower() 1352 index = 1 1353 while self.get_results_path().is_file(): 1354 self.data["result_file"] = file + "-" + str(index) + "." + extension.lower() 1355 index += 1 1356 1357 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 1358 data={"result_file": self.data["result_file"]}) 1359 return updated > 0
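The query-based part of the reserved filename is produced by a small sanitisation pipeline. A standalone sketch of the same steps, with a hypothetical query and key:

import re

def result_filename(query, key, extension="csv"):
    """Lowercase the query, keep only [a-z0-9-], crop to 100 chars, collapse dashes."""
    query_bit = query.replace(" ", "-").lower()
    query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
    query_bit = query_bit[:100]  # crop to avoid OSError on long filenames
    file = re.sub(r"[-]+", "-", query_bit + "-" + key)
    return file + "." + extension.lower()

print(result_filename("Climate Change!! memes", "abc123"))  # climate-change-memes-abc123.csv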
1361 def get_key(self, query, parameters, parent="", time_offset=0): 1362 """ 1363 Generate a unique key for this dataset that can be used to identify it 1364 1365 The key is a hash of a combination of the query string and parameters. 1366 You never need to call this, really: it's used internally. 1367 1368 :param str query: Query string 1369 :param parameters: Dataset parameters 1370 :param parent: Parent dataset's key (if applicable) 1371 :param time_offset: Offset to add to the time component of the dataset 1372 key. This can be used to ensure a unique key even if the parameters and 1373 timing is otherwise identical to an existing dataset's 1374 1375 :return str: Dataset key 1376 """ 1377 # Return a hash based on parameters 1378 # we're going to use the hash of the parameters to uniquely identify 1379 # the dataset, so make sure it's always in the same order, or we might 1380 # end up creating multiple keys for the same dataset if python 1381 # decides to return the dict in a different order 1382 param_key = collections.OrderedDict() 1383 for key in sorted(parameters): 1384 param_key[key] = parameters[key] 1385 1386 # we additionally use the current time as a salt - this should usually 1387 # ensure a unique key for the dataset. if for some reason there is a 1388 # hash collision 1389 param_key["_salt"] = int(time.time()) + time_offset 1390 1391 parent_key = str(parent) if parent else "" 1392 plain_key = repr(param_key) + str(query) + parent_key 1393 hashed_key = hash_to_md5(plain_key) 1394 1395 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1396 # key exists, generate a new one 1397 return self.get_key( 1398 query, parameters, parent, time_offset=random.randint(1, 10) 1399 ) 1400 else: 1401 return hashed_key
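The key derivation boils down to: order the parameters deterministically, add a time-based salt, and hash the result together with the query and parent key. A standalone sketch, using `hashlib` directly on the assumption that the `hash_to_md5` helper returns an MD5 hex digest of its input string:

import collections
import hashlib
import time

def derive_key(query, parameters, parent="", time_offset=0):
    """Hash deterministically ordered parameters plus a time salt, query and parent key."""
    param_key = collections.OrderedDict((k, parameters[k]) for k in sorted(parameters))
    param_key["_salt"] = int(time.time()) + time_offset
    plain_key = repr(param_key) + str(query) + (str(parent) if parent else "")
    return hashlib.md5(plain_key.encode("utf-8")).hexdigest()

print(derive_key("test query", {"datasource": "example", "board": "all"}))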
    def set_key(self, key):
        """
        Change dataset key

        In principle, keys should never be changed. But there are rare cases
        where it is useful to do so, in particular when importing a dataset
        from another 4CAT instance; in that case it makes sense to try and
        ensure that the key is the same as it was before. This function sets
        the dataset key and updates any dataset references to it.

        :param str key: Key to set
        :return str: Key that was set. If the desired key already exists, the
        original key is kept.
        """
        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
        if key_exists or not key:
            return self.key

        old_key = self.key
        self.db.update("datasets", data={"key": key}, where={"key": old_key})

        # update references
        self.db.update(
            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
        )
        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})

        # for good measure
        self.db.commit()
        self.key = key

        return self.key
1438 def get_status(self): 1439 """ 1440 Get Dataset status 1441 1442 :return string: Dataset status 1443 """ 1444 return self.data["status"]
1446 def update_status(self, status, is_final=False): 1447 """ 1448 Update dataset status 1449 1450 The status is a string that may be displayed to a user to keep them 1451 updated and informed about the progress of a dataset. No memory is kept 1452 of earlier dataset statuses; the current status is overwritten when 1453 updated. 1454 1455 Statuses are also written to the dataset log file. 1456 1457 :param string status: Dataset status 1458 :param bool is_final: If this is `True`, subsequent calls to this 1459 method while the object is instantiated will not update the dataset 1460 status. 1461 :return bool: Status update successful? 1462 """ 1463 if self.no_status_updates: 1464 return 1465 1466 # for presets, copy the updated status to the preset(s) this is part of 1467 if self.preset_parent is None: 1468 self.preset_parent = [ 1469 parent 1470 for parent in self.get_genealogy() 1471 if parent.type.find("preset-") == 0 and parent.key != self.key 1472 ][:1] 1473 1474 if self.preset_parent: 1475 for preset_parent in self.preset_parent: 1476 if not preset_parent.is_finished(): 1477 preset_parent.update_status(status) 1478 1479 self.data["status"] = status 1480 updated = self.db.update( 1481 "datasets", where={"key": self.data["key"]}, data={"status": status} 1482 ) 1483 1484 if is_final: 1485 self.no_status_updates = True 1486 1487 self.log(status) 1488 1489 return updated > 0
1491 def update_progress(self, progress): 1492 """ 1493 Update dataset progress 1494 1495 The progress can be used to indicate to a user how close the dataset 1496 is to completion. 1497 1498 :param float progress: Between 0 and 1. 1499 :return: 1500 """ 1501 progress = min(1, max(0, progress)) # clamp 1502 if type(progress) is int: 1503 progress = float(progress) 1504 1505 self.data["progress"] = progress 1506 updated = self.db.update( 1507 "datasets", where={"key": self.data["key"]}, data={"progress": progress} 1508 ) 1509 return updated > 0
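A hedged usage sketch of how processor code would typically combine the two methods above while working through a dataset; `self.dataset` and `items` are assumed to exist in the surrounding processor and are not defined here:

# Hypothetical processor loop, shown for illustration only.
total = len(items)
for i, item in enumerate(items):
    if i % 100 == 0:
        self.dataset.update_status(f"Processed {i}/{total} items")
        self.dataset.update_progress(i / total)

self.dataset.update_status("Done", is_final=True)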
1511 def get_progress(self): 1512 """ 1513 Get dataset progress 1514 1515 :return float: Progress, between 0 and 1 1516 """ 1517 return self.data["progress"]
1519 def finish_with_error(self, error): 1520 """ 1521 Set error as final status, and finish with 0 results 1522 1523 This is a convenience function to avoid having to repeat 1524 "update_status" and "finish" a lot. 1525 1526 :param str error: Error message for final dataset status. 1527 :return: 1528 """ 1529 self.update_status(error, is_final=True) 1530 self.finish(0) 1531 1532 return None
    def update_version(self, version):
        """
        Update software version used for this dataset

        This can be used to verify the code that was used to process this dataset.

        :param string version: Version identifier
        :return bool: Update successful?
        """
        try:
            # this fails if the processor type is unknown
            # edge case, but let's not crash...
            processor_path = self.modules.processors.get(self.data["type"]).filepath
        except AttributeError:
            processor_path = ""

        updated = self.db.update(
            "datasets",
            where={"key": self.data["key"]},
            data={
                "software_version": version[0],
                "software_source": version[1],
                "software_file": processor_path,
            },
        )

        return updated > 0
    def delete_parameter(self, parameter, instant=True):
        """
        Delete a parameter from the dataset metadata

        :param string parameter: Parameter to delete
        :param bool instant: Also delete parameters in this instance object?
        :return bool: Update successful?
        """
        parameters = self.parameters.copy()
        if parameter in parameters:
            del parameters[parameter]
        else:
            return False

        updated = self.db.update(
            "datasets",
            where={"key": self.data["key"]},
            data={"parameters": json.dumps(parameters)},
        )

        if instant:
            self.parameters = parameters

        return updated > 0
1587 def get_version_url(self, file): 1588 """ 1589 Get a versioned github URL for the version this dataset was processed with 1590 1591 :param file: File to link within the repository 1592 :return: URL, or an empty string 1593 """ 1594 if not self.data["software_source"]: 1595 return "" 1596 1597 filepath = self.data.get("software_file", "") 1598 if filepath.startswith("/extensions/"): 1599 # go to root of extension 1600 filepath = "/" + "/".join(filepath.split("/")[3:]) 1601 1602 return ( 1603 self.data["software_source"] 1604 + "/blob/" 1605 + self.data["software_version"] 1606 + filepath 1607 )
1609 def top_parent(self): 1610 """ 1611 Get root dataset 1612 1613 Traverses the tree of datasets this one is part of until it finds one 1614 with no source_dataset dataset, then returns that dataset. 1615 1616 :return Dataset: Parent dataset 1617 """ 1618 genealogy = self.get_genealogy() 1619 return genealogy[0]
1621 def get_genealogy(self): 1622 """ 1623 Get genealogy of this dataset 1624 1625 Creates a list of DataSet objects, with the first one being the 1626 'top' dataset, and each subsequent one being a child of the previous 1627 one, ending with the current dataset. 1628 1629 :return list: Dataset genealogy, oldest dataset first 1630 """ 1631 if not self._genealogy: 1632 key_parent = self.key_parent 1633 genealogy = [] 1634 1635 while key_parent: 1636 try: 1637 parent = DataSet(key=key_parent, db=self.db, modules=self.modules) 1638 except DataSetException: 1639 break 1640 1641 genealogy.append(parent) 1642 if parent.key_parent: 1643 key_parent = parent.key_parent 1644 else: 1645 break 1646 1647 genealogy.reverse() 1648 self._genealogy = genealogy 1649 1650 genealogy = self._genealogy 1651 genealogy.append(self) 1652 1653 return genealogy
1655 def get_children(self, update=False): 1656 """ 1657 Get children of this dataset 1658 1659 :param bool update: Update the list of children from database if True, else return cached value 1660 :return list: List of child datasets 1661 """ 1662 if self._children is not None and not update: 1663 return self._children 1664 1665 analyses = self.db.fetchall( 1666 "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", 1667 (self.key,), 1668 ) 1669 self._children = [ 1670 DataSet(data=analysis, db=self.db, modules=self.modules) 1671 for analysis in analyses 1672 ] 1673 return self._children
1675 def get_all_children(self, recursive=True, update=True): 1676 """ 1677 Get all children of this dataset 1678 1679 Results are returned as a non-hierarchical list, i.e. the result does 1680 not reflect the actual dataset hierarchy (but all datasets in the 1681 result will have the original dataset as an ancestor somewhere) 1682 1683 :return list: List of DataSets 1684 """ 1685 children = self.get_children(update=update) 1686 results = children.copy() 1687 if recursive: 1688 for child in children: 1689 results += child.get_all_children(recursive=recursive, update=update) 1690 1691 return results
1693 def nearest(self, type_filter): 1694 """ 1695 Return nearest dataset that matches the given type 1696 1697 Starting with this dataset, traverse the hierarchy upwards and return 1698 whichever dataset matches the given type. 1699 1700 :param str type_filter: Type filter. Can contain wildcards and is matched 1701 using `fnmatch.fnmatch`. 1702 :return: Earliest matching dataset, or `None` if none match. 1703 """ 1704 genealogy = self.get_genealogy() 1705 for dataset in reversed(genealogy): 1706 if fnmatch.fnmatch(dataset.type, type_filter): 1707 return dataset 1708 1709 return None
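The type matching in `nearest()` relies on `fnmatch`-style wildcards. A standalone sketch of that lookup over a made-up genealogy of dataset types:

import fnmatch

def nearest_type(genealogy_types, type_filter):
    """Return the type closest to the end of the genealogy that matches the wildcard filter."""
    for dataset_type in reversed(genealogy_types):
        if fnmatch.fnmatch(dataset_type, type_filter):
            return dataset_type
    return None

genealogy = ["twitterv2-search", "word-trees", "preset-neologisms"]
print(nearest_type(genealogy, "twitterv2-*"))  # twitterv2-search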
1726 def get_compatible_processors(self, config=None): 1727 """ 1728 Get list of processors compatible with this dataset 1729 1730 Checks whether this dataset type is one that is listed as being accepted 1731 by the processor, for each known type: if the processor does not 1732 specify accepted types (via the `is_compatible_with` method), it is 1733 assumed it accepts any top-level datasets 1734 1735 :param ConfigManager|None config: Configuration reader to determine 1736 compatibility through. This may not be the same reader the dataset was 1737 instantiated with, e.g. when checking whether some other user should 1738 be able to run processors on this dataset. 1739 :return dict: Compatible processors, `name => class` mapping 1740 """ 1741 processors = self.modules.processors 1742 1743 available = {} 1744 for processor_type, processor in processors.items(): 1745 if processor.is_from_collector(): 1746 continue 1747 1748 own_processor = self.get_own_processor() 1749 if own_processor and own_processor.exclude_followup_processors( 1750 processor_type 1751 ): 1752 continue 1753 1754 # consider a processor compatible if its is_compatible_with 1755 # method returns True *or* if it has no explicit compatibility 1756 # check and this dataset is top-level (i.e. has no parent) 1757 if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \ 1758 or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)): 1759 available[processor_type] = processor 1760 1761 return available
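For reference, a minimal sketch of how a processor might declare its own compatibility check. The class, its type, and the condition are hypothetical; the `is_compatible_with` classmethod (called here with the dataset as first argument and `config` as a keyword) is the hook this method looks for:

class ExampleProcessor:
    """Hypothetical processor showing the is_compatible_with hook checked above."""
    type = "example-processor"

    @classmethod
    def is_compatible_with(cls, module=None, config=None):
        # `module` may be a dataset or another processor; DataSet exposes is_dataset()
        return hasattr(module, "is_dataset") and module.is_dataset() \
            and module.type.startswith("example-datasource")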
1763 def get_place_in_queue(self, update=False): 1764 """ 1765 Determine dataset's position in queue 1766 1767 If the dataset is already finished, the position is -1. Else, the 1768 position is the number of datasets to be completed before this one will 1769 be processed. A position of 0 would mean that the dataset is currently 1770 being executed, or that the backend is not running. 1771 1772 :param bool update: Update the queue position from database if True, else return cached value 1773 :return int: Queue position 1774 """ 1775 if self.is_finished() or not self.data.get("job"): 1776 self._queue_position = -1 1777 return self._queue_position 1778 elif not update and self._queue_position is not None: 1779 # Use cached value 1780 return self._queue_position 1781 else: 1782 # Collect queue position from database via the job 1783 try: 1784 job = Job.get_by_ID(self.data["job"], self.db) 1785 self._queue_position = job.get_place_in_queue() 1786 except JobNotFoundException: 1787 self._queue_position = -1 1788 1789 return self._queue_position
1791 def get_own_processor(self): 1792 """ 1793 Get the processor class that produced this dataset 1794 1795 :return: Processor class, or `None` if not available. 1796 """ 1797 processor_type = self.parameters.get("type", self.data.get("type")) 1798 1799 return self.modules.processors.get(processor_type)
1801 def get_available_processors(self, config=None, exclude_hidden=False): 1802 """ 1803 Get list of processors that may be run for this dataset 1804 1805 Returns all compatible processors except for those that are already 1806 queued or finished and have no options. Processors that have been 1807 run but have options are included so they may be run again with a 1808 different configuration 1809 1810 :param ConfigManager|None config: Configuration reader to determine 1811 compatibility through. This may not be the same reader the dataset was 1812 instantiated with, e.g. when checking whether some other user should 1813 be able to run processors on this dataset. 1814 :param bool exclude_hidden: Exclude processors that should be displayed 1815 in the UI? If `False`, all processors are returned. 1816 1817 :return dict: Available processors, `name => properties` mapping 1818 """ 1819 if self.available_processors: 1820 # Update to reflect exclude_hidden parameter which may be different from last call 1821 # TODO: could children also have been created? Possible bug, but I have not seen anything effected by this 1822 return { 1823 processor_type: processor 1824 for processor_type, processor in self.available_processors.items() 1825 if not exclude_hidden or not processor.is_hidden 1826 } 1827 1828 processors = self.get_compatible_processors(config=config) 1829 1830 for analysis in self.get_children(update=True): 1831 if analysis.type not in processors: 1832 continue 1833 1834 if not processors[analysis.type].get_options(config=config): 1835 # No variable options; this processor has been run so remove 1836 del processors[analysis.type] 1837 continue 1838 1839 if exclude_hidden and processors[analysis.type].is_hidden: 1840 del processors[analysis.type] 1841 1842 self.available_processors = processors 1843 return processors
1845 def link_job(self, job): 1846 """ 1847 Link this dataset to a job ID 1848 1849 Updates the dataset data to include a reference to the job that will be 1850 executing (or has already executed) this job. 1851 1852 Note that if no job can be found for this dataset, this method silently 1853 fails. 1854 1855 :param Job job: The job that will run this dataset 1856 1857 :todo: If the job column ever gets used, make sure it always contains 1858 a valid value, rather than silently failing this method. 1859 """ 1860 if type(job) is not Job: 1861 raise TypeError("link_job requires a Job object as its argument") 1862 1863 if "id" not in job.data: 1864 try: 1865 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"]) 1866 except JobNotFoundException: 1867 return 1868 1869 self.db.update( 1870 "datasets", where={"key": self.key}, data={"job": job.data["id"]} 1871 )
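    # Illustrative sketch (assumption, not part of this module): a queue worker
    # that has just created a Job for this dataset could link the two records.
    # Job.get_by_remote_ID may raise JobNotFoundException if no job exists yet.
    #
    #   job = Job.get_by_remote_ID(dataset.key, db, jobtype=dataset.data["type"])
    #   dataset.link_job(job)
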
    def link_parent(self, key_parent):
        """
        Set source_dataset key for this dataset

        :param key_parent: Parent key. Not checked for validity
        """
        self.db.update(
            "datasets", where={"key": self.key}, data={"key_parent": key_parent}
        )

    def get_parent(self):
        """
        Get parent dataset

        :return DataSet: Parent dataset, or `None` if not applicable
        """
        return (
            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
            if self.key_parent
            else None
        )

    def detach(self):
        """
        Makes the dataset standalone, i.e. not having any source (parent) dataset
        """
        self.link_parent("")

    def is_dataset(self):
        """
        Easy way to confirm this is a dataset.
        Used for checking processor and dataset compatibility,
        which needs to handle both processors and datasets.
        """
        return True

    def is_top_dataset(self):
        """
        Easy way to confirm this is a top dataset.
        Used for checking processor and dataset compatibility,
        which needs to handle both processors and datasets.
        """
        if self.key_parent:
            return False
        return True

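    # Illustrative sketch (assumption, not part of this module): walking up the
    # dataset genealogy with the helpers above, e.g. to find the top-level
    # dataset an analysis was ultimately derived from.
    #
    #   ancestor = dataset
    #   while not ancestor.is_top_dataset():
    #       ancestor = ancestor.get_parent()
    #   # `ancestor` is now the root dataset; dataset.detach() would instead
    #   # cut the link to its own parent entirely.
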
    def is_expiring(self, config):
        """
        Determine if dataset is set to expire

        Similar to `is_expired`, but checks if the dataset will be deleted in
        the future, not if it should be deleted right now.

        :param ConfigManager config: Configuration reader (context-aware)
        :return bool|int: `False`, or the expiration date as a Unix timestamp.
        """
        # has someone opted out of deleting this?
        if self.parameters.get("keep"):
            return False

        # is this dataset explicitly marked as expiring after a certain time?
        if self.parameters.get("expires-after"):
            return self.parameters.get("expires-after")

        # is the data source configured to have its datasets expire?
        expiration = config.get("datasources.expiration", {})
        if not expiration.get(self.parameters.get("datasource")):
            return False

        # is there a timeout for this data source?
        if expiration.get(self.parameters.get("datasource")).get("timeout"):
            return self.timestamp + expiration.get(
                self.parameters.get("datasource")
            ).get("timeout")

        return False

    def is_expired(self, config):
        """
        Determine if dataset should be deleted

        Datasets can be set to expire, but when they should be deleted depends
        on a number of factors. This checks them all.

        :param ConfigManager config: Configuration reader (context-aware)
        :return bool: Whether the dataset should be deleted right now
        """
        # not set to expire at all?
        if not self.is_expiring(config):
            return False

        # is this dataset explicitly marked as expiring after a certain time?
        future = (
            time.time() + 3600
        )  # ensure we don't delete datasets with invalid expiration times
        if (
            self.parameters.get("expires-after")
            and convert_to_int(self.parameters["expires-after"], future) < time.time()
        ):
            return True

        # is the data source configured to have its datasets expire?
        expiration = config.get("datasources.expiration", {})
        if not expiration.get(self.parameters.get("datasource")):
            return False

        # is the dataset older than the set timeout?
        if expiration.get(self.parameters.get("datasource")).get("timeout"):
            return (
                self.timestamp
                + expiration[self.parameters.get("datasource")]["timeout"]
                < time.time()
            )

        return False

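    # Illustrative sketch (assumption, not part of this module): a cleanup
    # routine could combine the two checks above. `config` is a context-aware
    # ConfigManager as used elsewhere in 4CAT; the final step is a placeholder
    # for whatever deletion or archiving the caller performs.
    #
    #   expires_at = dataset.is_expiring(config)
    #   if expires_at:
    #       print("dataset will expire at Unix timestamp", expires_at)
    #   if dataset.is_expired(config):
    #       ...  # delete or archive the dataset here
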
    def is_from_collector(self):
        """
        Check if this dataset was made by a processor that collects data, i.e.
        a search or import worker.

        :return bool:
        """
        return self.type.endswith("-search") or self.type.endswith("-import")

    def get_extension(self):
        """
        Gets the file extension this dataset produces.
        Also checks whether the results file exists.
        Used for checking processor and dataset compatibility.

        :return str extension: Extension, e.g. `csv`, or `False` if the
        results file does not exist.
        """
        if self.get_results_path().exists():
            return self.get_results_path().suffix[1:]

        return False

    def get_media_type(self):
        """
        Gets the media type of the dataset file.

        :return str: media type, e.g., "text"
        """
        own_processor = self.get_own_processor()
        if hasattr(self, "media_type"):
            # media type can be defined explicitly in the dataset; this takes priority
            return self.media_type
        elif own_processor is not None:
            # or media type can be defined in the processor
            # some processors can set different media types for different datasets (e.g., import_media)
            if hasattr(own_processor, "media_type"):
                return own_processor.media_type

        # Default to text
        return self.parameters.get("media_type", "text")

    def get_metadata(self):
        """
        Get dataset metadata

        This consists of all the data stored in the database for this dataset,
        plus the current 4CAT version (appended as 'current_4CAT_version').
        This is useful for exporting datasets, as it can be used by another
        4CAT instance to update its database (and ensure compatibility with
        the exporting version of 4CAT).
        """
        metadata = self.db.fetchone(
            "SELECT * FROM datasets WHERE key = %s", (self.key,)
        )

        # get 4CAT version (presumably to ensure export is compatible with import)
        metadata["current_4CAT_version"] = get_software_version()
        return metadata

    def get_result_url(self):
        """
        Gets the 4CAT frontend URL of a dataset file.

        Uses the FlaskConfig attributes (i.e., SERVER_NAME and
        SERVER_HTTPS) plus the hardcoded '/result/' path.
        TODO: create a more dynamic method of obtaining the URL.
        """
        filename = self.get_results_path().name

        # we cheat a little here by using the modules' config reader, but these
        # will never be context-dependent values anyway
        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
            self.modules.config.get("flask.server_name") + '/result/' + filename
        return url_to_file

    def warn_unmappable_item(
        self, item_count, processor=None, error_message=None, warn_admins=True
    ):
        """
        Log an item that is unable to be mapped and warn administrators.

        :param int item_count: Item index
        :param Processor processor: Processor calling this function
        :param str error_message: Optional custom error message to log
        :param bool warn_admins: Whether to also warn administrators via the log
        """
        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"

        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
        closest_dataset = (
            processor.dataset
            if processor is not None and processor.dataset is not None
            else self
        )
        # Log error to dataset log
        closest_dataset.log(dataset_error_message)

        if warn_admins:
            if processor is not None:
                processor.log.warning(
                    f"Processor {processor.type} unable to map all items for dataset {closest_dataset.key}."
                )
            elif hasattr(self.db, "log"):
                # borrow the database's log handler
                self.db.log.warning(
                    f"Unable to map all items for dataset {closest_dataset.key}."
                )
            else:
                # No other log available
                raise DataSetException(
                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
                )

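    # Illustrative sketch (assumption, not part of this module): how a
    # processor's item loop might use this helper when mapping raw items fails.
    # `raw_items` and the processor's `map_item` method are hypothetical names;
    # MapItemException is the exception imported at the top of this module.
    #
    #   for i, item in enumerate(raw_items):
    #       try:
    #           mapped = self.dataset.get_own_processor().map_item(item)
    #       except MapItemException as e:
    #           self.dataset.warn_unmappable_item(i, processor=self, error_message=str(e))
    #           continue
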
    def has_annotations(self) -> bool:
        """
        Whether this dataset has annotations
        """
        annotation = self.db.fetchone(
            "SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,)
        )

        return True if annotation else False

    def num_annotations(self) -> int:
        """
        Get the number of annotations
        """
        return self.db.fetchone(
            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
        )["count"]

    def get_annotation(self, data: dict):
        """
        Retrieves a specific annotation if it exists.

        :param data: A dictionary with which to get the annotation.
        To get a specific annotation, include either an `id` field or
        both `field_id` and `item_id` fields.

        :return Annotation: Annotation object, or `None` if the required
        fields are missing.
        """
        # either an `id` or the `field_id`/`item_id` pair is needed to identify
        # a specific annotation
        if "id" not in data and ("field_id" not in data or "item_id" not in data):
            return None

        if "dataset" not in data:
            data["dataset"] = self.key

        return Annotation(data=data, db=self.db)

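    # Illustrative sketch (assumption, not part of this module): fetching a
    # single annotation either by its ID or by a field/item combination, per
    # the docstring above. The example values are hypothetical.
    #
    #   annotation = dataset.get_annotation({"id": 42})
    #   # or:
    #   annotation = dataset.get_annotation({"field_id": "123asd", "item_id": "12345"})
    #   if annotation is None:
    #       print("not enough identifying information given")
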
    def get_annotations(self) -> list:
        """
        Retrieves all annotations for this dataset.

        :return list: List of Annotation objects.
        """
        return Annotation.get_annotations_for_dataset(self.db, self.key)

    def get_annotations_for_item(self, item_id: str) -> list:
        """
        Retrieves all annotations from this dataset for a specific item (e.g. a social media post).
        """
        return Annotation.get_annotations_for_dataset(
            self.db, self.key, item_id=item_id
        )

    def has_annotation_fields(self) -> bool:
        """
        Returns True if there are annotation fields saved to the datasets table.
        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
        """
        return True if self.annotation_fields else False

    def get_annotation_field_labels(self) -> list:
        """
        Retrieves the saved annotation field labels for this dataset.
        These are stored in the `annotation_fields` column of the datasets table.

        :return list: List of annotation field labels.
        """
        annotation_fields = self.annotation_fields

        if not annotation_fields:
            return []

        labels = [v["label"] for v in annotation_fields.values()]

        return labels

    def save_annotations(self, annotations: list) -> int:
        """
        Takes a list of annotations and saves them to the annotations table.
        If a field is not yet present in the `annotation_fields` column in
        the datasets table, it also adds it there.

        :param list annotations: List of dictionaries with annotation items. Must have `item_id`, `field_id`,
        and `label`.
        `item_id` is for the specific item being annotated (e.g. a social media post),
        `field_id` refers to the annotation field,
        `label` is a human-readable description of this annotation.
        E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd",
        "value": "Yes"}]

        :returns int: How many annotations were saved.
        """
        if not annotations:
            return 0

        count = 0
        annotation_fields = self.annotation_fields

        # Add some dataset data to annotations, if not present
        for annotation_data in annotations:
            # Check if the required fields are present
            if "item_id" not in annotation_data:
                raise AnnotationException(
                    "Can't save annotations; annotation must have an `item_id` referencing "
                    "the item it annotated, got %s" % annotation_data
                )
            if "field_id" not in annotation_data:
                raise AnnotationException(
                    "Can't save annotations; annotation must have a `field_id` field, "
                    "got %s" % annotation_data
                )
            if "label" not in annotation_data or not isinstance(
                annotation_data["label"], str
            ):
                raise AnnotationException(
                    "Can't save annotations; annotation must have a `label` field, "
                    "got %s" % annotation_data
                )

            # Set dataset key
            if not annotation_data.get("dataset"):
                annotation_data["dataset"] = self.key

            # Set default author to this dataset owner
            # If this annotation is made by a processor, it will have the processor name
            if not annotation_data.get("author"):
                annotation_data["author"] = self.get_owners()[0]

            # Create Annotation object, which also saves it to the database
            # If this dataset/item ID/label combination already exists, this retrieves the
            # existing data and updates it with new values.
            Annotation(data=annotation_data, db=self.db)
            count += 1

        # Save annotation fields if things changed
        if annotation_fields != self.annotation_fields:
            self.save_annotation_fields(annotation_fields)

        return count

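    # Illustrative sketch (assumption, not part of this module): saving a
    # single annotation for one item; the values follow the example in the
    # docstring above.
    #
    #   saved = dataset.save_annotations([{
    #       "item_id": "12345",
    #       "field_id": "123asd",
    #       "label": "Valid",
    #       "value": "Yes",
    #   }])
    #   # `saved` is the number of annotations written, 1 in this case
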
    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
        """
        Save annotation field data to the datasets table (in the `annotation_fields` column).
        If changes to the annotation fields affect existing annotations,
        this function will also call `update_annotations_via_fields()` to change them.

        :param dict new_fields: New annotation fields, with a field ID as key.

        :param bool add: Whether we're merely adding new fields
        or replacing the whole batch. If add is False,
        `new_fields` should contain all fields.

        :return int: The number of annotation fields saved.
        """
        # Get existing annotation fields to see if stuff changed.
        old_fields = self.annotation_fields
        changes = False

        # Do some validation
        # Annotation fields must be serialisable as JSON.
        try:
            json.dumps(new_fields)
        except (TypeError, ValueError):
            raise AnnotationException(
                "Can't save annotation fields: not valid JSON (%s)" % new_fields
            )

        # No duplicate IDs
        if len(new_fields) != len(set(new_fields)):
            raise AnnotationException(
                "Can't save annotation fields: field IDs must be unique"
            )

        # Annotation fields must at minimum have `type` and `label` keys.
        for field_id, annotation_field in new_fields.items():
            if not isinstance(field_id, str):
                raise AnnotationException(
                    "Can't save annotation fields: field ID %s is not a valid string"
                    % field_id
                )
            if "label" not in annotation_field:
                raise AnnotationException(
                    "Can't save annotation fields: field %s must have a label"
                    % field_id
                )
            if "type" not in annotation_field:
                raise AnnotationException(
                    "Can't save annotation fields: field %s must have a type"
                    % field_id
                )

        # Check if fields are removed
        if not add:
            for field_id in old_fields.keys():
                if field_id not in new_fields:
                    changes = True

        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
        # dataset
        for field_id in new_fields.keys():
            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
                old_fields[field_id]["label"] = new_fields[field_id]["label"]  # Only labels could've been changed
                new_fields[field_id] = old_fields[field_id]

        # If we're just adding fields, add them to the old fields.
        # If the field already exists, overwrite the old field.
        if add and old_fields:
            all_fields = old_fields
            for field_id, annotation_field in new_fields.items():
                all_fields[field_id] = annotation_field
            new_fields = all_fields

        # We're saving the new annotation fields as-is.
        # Ordering of fields is preserved this way.
        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
        self.annotation_fields = new_fields

        # If anything changed with the annotation fields, possibly update
        # existing annotations (e.g. to delete them or change their labels).
        if changes:
            Annotation.update_annotations_via_fields(
                self.key, old_fields, new_fields, self.db
            )

        return len(new_fields)

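    # Illustrative sketch (assumption, not part of this module): registering an
    # extra annotation field without discarding the existing ones, using
    # add=True. The field ID and the "text" type are hypothetical values; the
    # validation above only requires `label` and `type` keys.
    #
    #   dataset.save_annotation_fields({
    #       "123asd": {"label": "Valid", "type": "text"},
    #   }, add=True)
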
    def get_annotation_metadata(self) -> dict:
        """
        Retrieves all the data for this dataset from the annotations table.
        """
        # use a parameterised query, consistent with the other queries in this
        # class, rather than interpolating the key into the SQL string
        annotation_data = self.db.fetchall(
            "SELECT * FROM annotations WHERE dataset = %s", (self.key,)
        )
        return annotation_data