common.lib.dataset
import collections
import itertools
import datetime
import hashlib
import fnmatch
import random
import shutil
import json
import time
import csv
import re

from pathlib import Path

from common.config_manager import config
from common.lib.job import Job, JobNotFoundException
from common.lib.module_loader import ModuleCollector
from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api
from common.lib.item_mapping import MappedItem, MissingMappedField, DatasetItem
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
                                   MapItemException, MappedItemIncompleteException)


class DataSet(FourcatModule):
    """
    Provide interface to safely register and run operations on a dataset

    A dataset is a collection of:
    - A unique identifier
    - A set of parameters that demarcate the data contained within
    - The data

    The data is usually stored in a file on the disk; the parameters are stored
    in a database. The handling of the data, et cetera, is done by other
    workers; this class defines methods to create and manipulate the dataset's
    properties.
    """
    # Attributes must be created here to ensure getattr and setattr work properly
    data = None
    key = ""

    children = None
    available_processors = None
    genealogy = None
    preset_parent = None
    parameters = None
    modules = None

    owners = None
    tagged_owners = None

    db = None
    folder = None
    is_new = True

    no_status_updates = False
    staging_areas = None
    _queue_position = None

    def __init__(self, parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None,
                 type=None, is_private=True, owner="anonymous", modules=None):
        """
        Create new dataset object

        If the dataset is not in the database yet, it is added.

        :param dict parameters: Only when creating a new dataset. Dataset
        parameters, free-form dictionary.
        :param str key: Dataset key. If given, dataset with this key is loaded.
        :param int job: Job ID. If given, dataset corresponding to job is
        loaded.
        :param dict data: Dataset data, corresponding to a row in the datasets
        database table. If not given, retrieved from database depending on key.
        :param db: Database connection
        :param str parent: Only when creating a new dataset. Parent dataset
        key to which the one being created is a child.
        :param str extension: Only when creating a new dataset. Extension of
        dataset result file.
        :param str type: Only when creating a new dataset. Type of the dataset,
        corresponding to the type property of a processor class.
        :param bool is_private: Only when creating a new dataset. Whether the
        dataset is private or public.
        :param str owner: Only when creating a new dataset. The user name of
        the dataset's creator.
        :param modules: Module cache. If not given, will be loaded when needed
        (expensive). Used to figure out what processors are compatible with
        this dataset.
        """
        self.db = db
        self.folder = config.get('PATH_ROOT').joinpath(config.get('PATH_DATA'))
        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
        self.data = {}
        self.parameters = {}
        self.children = []
        self.available_processors = {}
        self.genealogy = []
        self.staging_areas = []
        self.modules = modules

        if key is not None:
            self.key = key
            current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))
            if not current:
                raise DataSetNotFoundException("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key)

        elif job is not None:
            current = self.db.fetchone("SELECT * FROM datasets WHERE parameters::json->>'job' = %s", (job,))
            if not current:
                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")

            self.key = current["key"]
        elif data is not None:
            current = data
            if "query" not in data or "key" not in data or "parameters" not in data or "key_parent" not in data:
                raise DataSetException("DataSet() requires a complete dataset record for its 'data' argument")

            self.key = current["key"]
        else:
            if parameters is None:
                raise DataSetException("DataSet() requires either 'key', or 'parameters' to be given")

            if not type:
                raise DataSetException("Datasets must have their type set explicitly")

            query = self.get_label(parameters, default=type)
            self.key = self.get_key(query, parameters, parent)
            current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s AND query = %s", (self.key, query))

        if current:
            self.data = current
            self.parameters = json.loads(self.data["parameters"])
            self.is_new = False
        else:
            self.data = {"type": type}  # get_own_processor needs this
            own_processor = self.get_own_processor()
            version = get_software_commit(own_processor)
            self.data = {
                "key": self.key,
                "query": self.get_label(parameters, default=type),
                "parameters": json.dumps(parameters),
                "result_file": "",
                "creator": owner,
                "status": "",
                "type": type,
                "timestamp": int(time.time()),
                "is_finished": False,
                "is_private": is_private,
                "software_version": version[0],
                "software_source": version[1],
                "software_file": "",
                "num_rows": 0,
                "progress": 0.0,
                "key_parent": parent
            }
            self.parameters = parameters

            self.db.insert("datasets", data=self.data)
            self.refresh_owners()
            self.add_owner(owner)

            # Find desired extension from processor if not explicitly set
            if extension is None:
                if own_processor:
                    extension = own_processor.get_extension(parent_dataset=DataSet(key=parent, db=db, modules=self.modules) if parent else None)
                # Still no extension, default to 'csv'
                if not extension:
                    extension = "csv"

            # Reserve filename and update data['result_file']
            self.reserve_result_file(parameters, extension)

        # retrieve analyses and processors that may be run for this dataset
        analyses = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", (self.key,))
        self.children = sorted([DataSet(data=analysis, db=self.db, modules=self.modules) for analysis in analyses],
                               key=lambda dataset: dataset.is_finished(), reverse=True)

        self.refresh_owners()
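
    # A minimal usage sketch (an assumption, not part of the original module): how a
    # DataSet is typically created or re-loaded. The `db` and `modules` objects are
    # assumed to be a live database connection and a module cache, and the
    # "example-search" type is a hypothetical processor type.
    #
    #   dataset = DataSet(
    #       parameters={"datasource": "example", "query": "test"},
    #       type="example-search", db=db, owner="admin", modules=modules
    #   )
    #
    #   # re-load an existing dataset by its key
    #   same_dataset = DataSet(key=dataset.key, db=db, modules=modules)
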
    def check_dataset_finished(self):
        """
        Checks if dataset is finished. Returns path to results file if not
        empty, or 'empty' when there were no matches.

        Only returns a path if the dataset is complete. In other words, if this
        method returns a path, a file with the complete results for this dataset
        will exist at that location.

        :return: A path to the results file, 'empty', or `None`
        """
        if self.data["is_finished"] and self.data["num_rows"] > 0:
            return self.folder.joinpath(self.data["result_file"])
        elif self.data["is_finished"] and self.data["num_rows"] == 0:
            return 'empty'
        else:
            return None

    def get_results_path(self):
        """
        Get path to results file

        Always returns a path, that will at some point contain the dataset
        data, but may not do so yet. Use this to get the location to write
        generated results to.

        :return Path: A path to the results file
        """
        return self.folder.joinpath(self.data["result_file"])

    def get_results_folder_path(self):
        """
        Get path to folder containing accompanying results

        Returns a path that may not yet be created

        :return Path: A path to the results folder
        """
        return self.folder.joinpath("folder_" + self.key)

    def get_log_path(self):
        """
        Get path to dataset log file

        Each dataset has a single log file that documents its creation. This
        method returns the path to that file. It is identical to the path of
        the dataset result file, with 'log' as its extension instead.

        :return Path: A path to the log file
        """
        return self.get_results_path().with_suffix(".log")

    def clear_log(self):
        """
        Clears the dataset log file

        If the log file does not exist, it is created empty. The log file will
        have the same file name as the dataset result file, with the 'log'
        extension.
        """
        log_path = self.get_log_path()
        with log_path.open("w") as outfile:
            pass

    def log(self, log):
        """
        Write log message to file

        Writes the log message to the log file on a new line, including a
        timestamp at the start of the line. Note that this assumes the log file
        already exists - it should have been created/cleared with clear_log()
        prior to calling this.

        :param str log: Log message to write
        """
        log_path = self.get_log_path()
        with log_path.open("a", encoding="utf-8") as outfile:
            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))

    def _iterate_items(self, processor=None):
        """
        A generator that iterates through a CSV or NDJSON file

        This is an internal method and should not be called directly. Rather,
        call iterate_items() and use the generated dictionary and its properties.

        If a reference to a processor is provided, with every iteration,
        the processor's 'interrupted' flag is checked, and if set a
        ProcessorInterruptedException is raised, which by default is caught
        in the worker and subsequently stops execution gracefully.

        There are two file types that can be iterated (currently): CSV files
        and NDJSON (newline-delimited JSON) files. In the future, one could
        envision adding a pathway to retrieve items from e.g. a MongoDB
        collection directly instead of from a static file

        :param BasicProcessor processor: A reference to the processor
        iterating the dataset.
        :return generator: A generator that yields each item as a dictionary
        """
        path = self.get_results_path()

        # Yield through items one by one
        if path.suffix.lower() == ".csv":
            with path.open("rb") as infile:
                wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
                reader = csv.DictReader(wrapped_infile)

                if not self.get_own_processor():
                    # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
                    first_item = next(reader)
                    if first_item is None or any([True for key in first_item if type(key) is not str]):
                        raise NotImplementedError(f"Cannot iterate through CSV file (deprecated processor {self.type})")
                    yield first_item

                for item in reader:
                    if hasattr(processor, "interrupted") and processor.interrupted:
                        raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file")

                    yield item

        elif path.suffix.lower() == ".ndjson":
            # In NDJSON format each line in the file is a self-contained JSON object
            with path.open(encoding="utf-8") as infile:
                for line in infile:
                    if hasattr(processor, "interrupted") and processor.interrupted:
                        raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file")

                    yield json.loads(line)

        else:
            raise NotImplementedError("Cannot iterate through %s file" % path.suffix)

    def iterate_items(self, processor=None, warn_unmappable=True, map_missing="default"):
        """
        Generate mapped dataset items

        Wrapper for _iterate_items that returns a DatasetItem, which can be
        accessed as a dict returning the original item or (if a mapper is
        available) the mapped item. Mapped or original versions of the item can
        also be accessed via the `original` and `mapped_object` properties of
        the DatasetItem.

        Processors can define a method called `map_item` that can be used to map
        an item from the dataset file before it is processed any further. This is
        slower than storing the data file in the right format to begin with but
        not all data sources allow for easy 'flat' mapping of items, e.g. tweets
        are nested objects when retrieved from the twitter API that are easier
        to store as a JSON file than as a flat CSV file, and it would be a shame
        to throw away that data.

        Note the two parameters warn_unmappable and map_missing. Items can be
        unmappable in that their structure is too different to coerce into a
        neat dictionary of the structure the data source expects. This makes it
        'unmappable' and warn_unmappable determines what happens in this case.
        It can also be of the right structure, but with some fields missing or
        incomplete. map_missing determines what happens in that case. The
        latter is for example possible when importing data via Zeeschuimer,
        which produces unstably-structured data captured from social media
        sites.

        :param BasicProcessor processor: A reference to the processor
        iterating the dataset.
        :param bool warn_unmappable: If an item is not mappable, skip the item
        and log a warning
        :param map_missing: Indicates what to do with mapped items for which
        some fields could not be mapped. Defaults to 'default'. Must be one of:
        - 'default': fill missing fields with the default passed by map_item
        - 'abort': raise a MappedItemIncompleteException if a field is missing
        - a callback: replace missing field with the return value of the
          callback. The MappedItem object is passed to the callback as the
          first argument and the name of the missing field as the second.
        - a dictionary with a key for each possible missing field: replace missing
          field with a strategy for that field ('default', 'abort', or a callback)

        :return generator: A generator that yields DatasetItems
        """
        unmapped_items = False
        # Collect item_mapper for use with filter
        item_mapper = False
        own_processor = self.get_own_processor()
        if own_processor and own_processor.map_item_method_available(dataset=self):
            item_mapper = True

        # missing field strategy can be for all fields at once, or per field
        # if it is per field, it is a dictionary with field names and their strategy
        # if it is for all fields, it may be a callback, 'abort', or 'default'
        default_strategy = "default"
        if type(map_missing) is not dict:
            default_strategy = map_missing
            map_missing = {}

        # Loop through items
        for i, item in enumerate(self._iterate_items(processor)):
            # Save original to yield
            original_item = item.copy()

            # Map item
            if item_mapper:
                try:
                    mapped_item = own_processor.get_mapped_item(item)
                except MapItemException as e:
                    if warn_unmappable:
                        self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
                    unmapped_items = True
                    continue

                # check if fields have been marked as 'missing' in the
                # underlying data, and treat according to the chosen strategy
                if mapped_item.get_missing_fields():
                    for missing_field in mapped_item.get_missing_fields():
                        strategy = map_missing.get(missing_field, default_strategy)

                        if callable(strategy):
                            # delegate handling to a callback
                            mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field)
                        elif strategy == "abort":
                            # raise an exception to be handled at the processor level
                            raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.")
                        elif strategy == "default":
                            # use whatever was passed to the object constructor
                            mapped_item.data[missing_field] = mapped_item.data[missing_field].value
                        else:
                            raise ValueError("map_missing must be 'abort', 'default', or a callback.")

            else:
                mapped_item = original_item

            # yield a DatasetItem, which is a dict with some special properties
            yield DatasetItem(mapper=item_mapper, original=original_item, mapped_object=mapped_item, **(mapped_item.get_item_data() if type(mapped_item) is MappedItem else mapped_item))
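
    # A hedged usage sketch (not part of the original module): how a processor might
    # consume iterate_items() with a per-field map_missing strategy. The `dataset` and
    # `processor` names are assumed to exist in the calling code.
    #
    #   strategies = {"author": "default", "body": lambda data, field: ""}
    #   for item in dataset.iterate_items(processor=processor, warn_unmappable=False,
    #                                     map_missing=strategies):
    #       # `item` behaves like a dict of mapped values...
    #       print(item["body"])
    #       # ...but the raw record is still available if needed
    #       raw = item.original
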
    def get_staging_area(self):
        """
        Get path to a temporary folder in which files can be stored before
        finishing

        This folder must be created before use, but is guaranteed to not exist
        yet. The folder may be used as a staging area for the dataset data
        while it is being processed.

        :return Path: Path to folder
        """
        results_file = self.get_results_path()

        results_dir_base = results_file.parent
        results_dir = results_file.name.replace(".", "") + "-staging"
        results_path = results_dir_base.joinpath(results_dir)
        index = 1
        while results_path.exists():
            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
            index += 1

        # create temporary folder
        results_path.mkdir()

        # Storing the staging area with the dataset so that it can be removed later
        self.staging_areas.append(results_path)

        return results_path

    def remove_staging_areas(self):
        """
        Remove any staging areas that were created and all files contained in them.
        """
        # Remove DataSet staging areas
        if self.staging_areas:
            for staging_area in self.staging_areas:
                if staging_area.is_dir():
                    shutil.rmtree(staging_area)
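
    # A hedged usage sketch (an assumption, not in the original module): a processor
    # writing intermediate files to a staging area and cleaning up afterwards.
    #
    #   staging_area = dataset.get_staging_area()
    #   (staging_area / "intermediate.json").write_text("{}")
    #   # ... process the intermediate files, write final output ...
    #   dataset.remove_staging_areas()
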
    def finish(self, num_rows=0):
        """
        Declare the dataset finished
        """
        if self.data["is_finished"]:
            raise RuntimeError("Cannot finish a finished dataset again")

        self.db.update("datasets", where={"key": self.data["key"]},
                       data={"is_finished": True, "num_rows": num_rows, "progress": 1.0, "timestamp_finished": int(time.time())})
        self.data["is_finished"] = True
        self.data["num_rows"] = num_rows

    def copy(self, shallow=True):
        """
        Copies the dataset, making a new version with a unique key

        :param bool shallow: Shallow copy: does not copy the result file, but
        instead refers to the same file as the original dataset did
        :return DataSet: Copied dataset
        """
        parameters = self.parameters.copy()

        # a key is partially based on the parameters. so by setting these extra
        # attributes, we also ensure a unique key will be generated for the
        # copy
        # possibly todo: don't use time for uniqueness (but one shouldn't be
        # copying a dataset multiple times per microsecond, that's not what
        # this is for)
        parameters["copied_from"] = self.key
        parameters["copied_at"] = time.time()

        copy = DataSet(parameters=parameters, db=self.db, extension=self.result_file.split(".")[-1], type=self.type, modules=self.modules)
        for field in self.data:
            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
                continue

            copy.__setattr__(field, self.data[field])

        if shallow:
            # use the same result file
            copy.result_file = self.result_file
        else:
            # copy to new file with new key
            shutil.copy(self.get_results_path(), copy.get_results_path())

        if self.is_finished():
            copy.finish(self.num_rows)

        # make sure ownership is also copied
        copy.copy_ownership_from(self)

        return copy

    def delete(self, commit=True, queue=None):
        """
        Delete the dataset, and all its children

        Deletes both database records and result files. Note that manipulating
        a dataset object after it has been deleted is undefined behaviour.

        :param bool commit: Commit SQL DELETE query?
        """
        # first, recursively delete children
        children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
        for child in children:
            try:
                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
                child.delete(commit=commit)
            except DataSetException:
                # dataset already deleted - race condition?
                pass

        # delete any queued jobs for this dataset
        try:
            job = Job.get_by_remote_ID(self.key, self.db, self.type)
            if job.is_claimed:
                # tell API to stop any jobs running for this dataset
                # level 2 = cancel job
                # we're not interested in the result - if the API is available,
                # it will do its thing, if it's not the backend is probably not
                # running so the job also doesn't need to be interrupted
                call_api(
                    "cancel-job",
                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
                    False
                )

            # this deletes the job from the database
            job.finish(True)

        except JobNotFoundException:
            pass

        # delete from database
        self.db.delete("datasets", where={"key": self.key}, commit=commit)
        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)

        # delete from drive
        try:
            if self.get_results_path().exists():
                self.get_results_path().unlink()
            if self.get_results_path().with_suffix(".log").exists():
                self.get_results_path().with_suffix(".log").unlink()
            if self.get_results_folder_path().exists():
                shutil.rmtree(self.get_results_folder_path())

        except FileNotFoundError:
            # already deleted, apparently
            pass
        except PermissionError as e:
            self.db.log.error(f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}")

    def update_children(self, **kwargs):
        """
        Update an attribute for all child datasets

        Can be used to e.g. change the owner, version, finished status for all
        datasets in a tree

        :param kwargs: Parameters corresponding to known dataset attributes
        """
        children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))
        for child in children:
            child = DataSet(key=child["key"], db=self.db, modules=self.modules)
            for attr, value in kwargs.items():
                child.__setattr__(attr, value)

            child.update_children(**kwargs)

    def is_finished(self):
        """
        Check if dataset is finished
        :return bool:
        """
        return bool(self.data["is_finished"])

    def is_rankable(self, multiple_items=True):
        """
        Determine if a dataset is rankable

        Rankable means that it is a CSV file with 'date' and 'value' columns
        as well as one or more item label columns

        :param bool multiple_items: Consider datasets with multiple values per
        item (e.g. word_1, word_2, etc)?

        :return bool: Whether the dataset is rankable or not
        """
        if self.get_results_path().suffix != ".csv" or not self.get_results_path().exists():
            return False

        column_options = {"date", "value", "item"}
        if multiple_items:
            column_options.add("word_1")

        with self.get_results_path().open(encoding="utf-8") as infile:
            reader = csv.DictReader(infile)
            try:
                return len(set(reader.fieldnames) & column_options) >= 3
            except (TypeError, ValueError):
                return False

    def is_accessible_by(self, username, role="owner"):
        """
        Check if dataset has given user as owner

        :param str|User username: Username to check for
        :param str|None role: Role to require. If `None`, any role is sufficient.
        :return bool:
        """
        if type(username) is not str:
            if hasattr(username, "get_id"):
                username = username.get_id()
            else:
                raise TypeError("User must be a str or User object")

        # 'normal' owners
        if username in [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]:
            return True

        # owners that are owner by being part of a tag
        if username in itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if (role is None or self.owners[f"tag:{tag}"]["role"] == role)]):
            return True

        return False

    def get_owners_users(self, role="owner"):
        """
        Get list of dataset owners

        This returns a list of *users* that are considered owners. Tags are
        transparently replaced with the users with that tag.

        :param str|None role: Role to check for. If `None`, all owners are
        returned regardless of role.

        :return set: De-duplicated owner list
        """
        # 'normal' owners
        owners = [owner for owner, meta in self.owners.items() if
                  (role is None or meta["role"] == role) and not owner.startswith("tag:")]

        # owners that are owner by being part of a tag
        owners.extend(itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if
                                        role is None or self.owners[f"tag:{tag}"]["role"] == role]))

        # de-duplicate before returning
        return set(owners)

    def get_owners(self, role="owner"):
        """
        Get list of dataset owners

        This returns a list of all owners, and does not transparently resolve
        tags (like `get_owners_users` does).

        :param str|None role: Role to check for. If `None`, all owners are
        returned regardless of role.

        :return list: Owner list
        """
        return [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]

    def add_owner(self, username, role="owner"):
        """
        Set dataset owner

        If the user is already an owner, but with a different role, the role is
        updated. If the user is already an owner with the same role, nothing happens.

        :param str|User username: Username to set as owner
        :param str|None role: Role to add user with.
        """
        if type(username) is not str:
            if hasattr(username, "get_id"):
                username = username.get_id()
            else:
                raise TypeError("User must be a str or User object")

        if username not in self.owners:
            self.owners[username] = {
                "name": username,
                "key": self.key,
                "role": role
            }
            self.db.insert("datasets_owners", data=self.owners[username], safe=True)

        elif username in self.owners and self.owners[username]["role"] != role:
            self.db.update("datasets_owners", data={"role": role}, where={"name": username, "key": self.key})
            self.owners[username]["role"] = role

        if username.startswith("tag:"):
            # this is a bit more complicated than just adding to the list of
            # owners, so do a full refresh
            self.refresh_owners()

        # make sure children's owners remain in sync
        for child in self.children:
            child.add_owner(username, role)
            # not recursive, since we're calling it from recursive code!
            child.copy_ownership_from(self, recursive=False)

    def remove_owner(self, username):
        """
        Remove dataset owner

        If no owner is set, the dataset is assigned to the anonymous user.
        If the user is not an owner, nothing happens.

        :param str|User username: Username to remove as owner
        """
        if type(username) is not str:
            if hasattr(username, "get_id"):
                username = username.get_id()
            else:
                raise TypeError("User must be a str or User object")

        if username in self.owners:
            del self.owners[username]
            self.db.delete("datasets_owners", where={"name": username, "key": self.key})

            if not self.owners:
                self.add_owner("anonymous")

        if username in self.tagged_owners:
            del self.tagged_owners[username]

        # make sure children's owners remain in sync
        for child in self.children:
            child.remove_owner(username)
            # not recursive, since we're calling it from recursive code!
            child.copy_ownership_from(self, recursive=False)

    def refresh_owners(self):
        """
        Update internal owner cache

        This makes sure that the list of *users* and *tags* which can access the
        dataset is up to date.
        """
        self.owners = {owner["name"]: owner for owner in self.db.fetchall("SELECT * FROM datasets_owners WHERE key = %s", (self.key,))}

        # determine which users (if any) are owners of the dataset by having a
        # tag that is listed as an owner
        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
        if owner_tags:
            tagged_owners = self.db.fetchall("SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,))
            self.tagged_owners = {
                owner_tag: [user["name"] for user in tagged_owners if owner_tag in user["tags"]]
                for owner_tag in owner_tags
            }
        else:
            self.tagged_owners = {}
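
    # A hedged usage sketch (an assumption, not part of the original module): granting
    # access to a single user and to everyone carrying a tag, then checking access.
    # The usernames and tag are purely illustrative.
    #
    #   dataset.add_owner("alice", role="viewer")
    #   dataset.add_owner("tag:students", role="owner")
    #   dataset.is_accessible_by("alice", role="viewer")  # True
    #   dataset.is_accessible_by("bob")                   # False, unless tagged 'students'
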
    def copy_ownership_from(self, dataset, recursive=True):
        """
        Copy ownership

        This is useful to e.g. make sure a dataset's ownership stays in sync
        with its parent

        :param Dataset dataset: Parent to copy from
        :return:
        """
        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)

        for role in ("owner", "viewer"):
            owners = dataset.get_owners(role=role)
            for owner in owners:
                self.db.insert("datasets_owners", data={"key": self.key, "name": owner, "role": role}, commit=False, safe=True)

        self.db.commit()
        if recursive:
            for child in self.children:
                child.copy_ownership_from(self, recursive=recursive)

    def get_parameters(self):
        """
        Get dataset parameters

        The dataset parameters are stored as JSON in the database - parse them
        and return the resulting object

        :return: Dataset parameters as originally stored
        """
        try:
            return json.loads(self.data["parameters"])
        except json.JSONDecodeError:
            return {}

    def get_columns(self):
        """
        Returns the dataset columns.

        Useful for processor input forms. Can deal with both CSV and NDJSON
        files, the latter only if a `map_item` function is available in the
        processor that generated it. While in other cases one could use the
        keys of the JSON object, this is not always possible in follow-up code
        that uses the 'column' names, so for consistency this function acts as
        if no column can be parsed if no `map_item` function exists.

        :return list: List of dataset columns; empty list if unable to parse
        """
        if not self.get_results_path().exists():
            # no file to get columns from
            return []

        if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)):
            items = self.iterate_items(warn_unmappable=False)
            try:
                keys = list(next(items).keys())
            except (StopIteration, NotImplementedError):
                # No items or otherwise unable to iterate
                return []
            finally:
                del items
            return keys
        else:
            # Filetype not CSV or an NDJSON with `map_item`
            return []

    def get_annotation_fields(self):
        """
        Retrieves the saved annotation fields for this dataset.

        :return dict: The saved annotation fields.
        """
        annotation_fields = self.db.fetchone("SELECT annotation_fields FROM datasets WHERE key = %s;", (self.top_parent().key,))

        if annotation_fields and annotation_fields.get("annotation_fields"):
            annotation_fields = json.loads(annotation_fields["annotation_fields"])
        else:
            annotation_fields = {}

        return annotation_fields

    def get_annotations(self):
        """
        Retrieves the annotations for this dataset.
        :return dict: The annotations
        """
        annotations = self.db.fetchone("SELECT annotations FROM annotations WHERE key = %s;", (self.top_parent().key,))

        if annotations and annotations.get("annotations"):
            return json.loads(annotations["annotations"])
        else:
            return None

    def update_label(self, label):
        """
        Update label for this dataset

        :param str label: New label
        :return str: The new label, as returned by get_label
        """
        self.parameters["label"] = label

        self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
        return self.get_label()

    def get_label(self, parameters=None, default="Query"):
        """
        Generate a readable label for the dataset

        :param dict parameters: Parameters of the dataset
        :param str default: Label to use if it cannot be inferred from the
        parameters

        :return str: Label
        """
        if not parameters:
            parameters = self.parameters

        if parameters.get("label"):
            return parameters["label"]
        elif parameters.get("body_query") and parameters["body_query"] != "empty":
            return parameters["body_query"]
        elif parameters.get("body_match") and parameters["body_match"] != "empty":
            return parameters["body_match"]
        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
            return parameters["subject_query"]
        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
            return parameters["subject_match"]
        elif parameters.get("query"):
            label = parameters["query"]
            # Some legacy datasets have lists as query data
            if isinstance(label, list):
                label = ", ".join(label)

            label = label if len(label) < 30 else label[:25] + "..."
            label = label.strip().replace("\n", ", ")
            return label
        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
            return "Flag: %s" % parameters["country_flag"]
        elif parameters.get("country_name") and parameters["country_name"] != "all":
            return "Country: %s" % parameters["country_name"]
        elif parameters.get("filename"):
            return parameters["filename"]
        elif parameters.get("board") and "datasource" in parameters:
            return parameters["datasource"] + "/" + parameters["board"]
        elif "datasource" in parameters and parameters["datasource"] in self.modules.datasources:
            return self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
        else:
            return default

    def change_datasource(self, datasource):
        """
        Change the datasource type for this dataset

        :param str datasource: New datasource type
        :return str: The new datasource type
        """
        self.parameters["datasource"] = datasource

        self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
        return datasource

    def reserve_result_file(self, parameters=None, extension="csv"):
        """
        Generate a unique path to the results file for this dataset

        This generates a file name for the data file of this dataset, and makes sure
        no file exists or will exist at that location other than the file we
        expect (i.e. the data for this particular dataset).

        :param str extension: File extension, "csv" by default
        :param parameters: Dataset parameters
        :return bool: Whether the file path was successfully reserved
        """
        if self.data["is_finished"]:
            raise RuntimeError("Cannot reserve results file for a finished dataset")

        # Use 'random' for random post queries
        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
            file = 'random-' + str(parameters["random_amount"]) + '-' + self.data["key"]
        # Use country code for country flag queries
        elif "country_flag" in parameters and parameters["country_flag"] != 'all':
            file = 'countryflag-' + str(parameters["country_flag"]) + '-' + self.data["key"]
        # Use the query string for all other queries
        else:
            query_bit = self.data["query"].replace(" ", "-").lower()
            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
            query_bit = query_bit[:100]  # Crop to avoid OSError
            file = query_bit + "-" + self.data["key"]
            file = re.sub(r"[-]+", "-", file)

        path = self.folder.joinpath(file + "." + extension.lower())
        index = 1
        while path.is_file():
            path = self.folder.joinpath(file + "-" + str(index) + "." + extension.lower())
            index += 1

        file = path.name
        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
                                 data={"result_file": file})
        self.data["result_file"] = file
        return updated > 0

    def get_key(self, query, parameters, parent="", time_offset=0):
        """
        Generate a unique key for this dataset that can be used to identify it

        The key is a hash of a combination of the query string and parameters.
        You never need to call this, really: it's used internally.

        :param str query: Query string
        :param parameters: Dataset parameters
        :param parent: Parent dataset's key (if applicable)
        :param time_offset: Offset to add to the time component of the dataset
        key. This can be used to ensure a unique key even if the parameters and
        timing is otherwise identical to an existing dataset's

        :return str: Dataset key
        """
        # Return a hash based on parameters
        # we're going to use the hash of the parameters to uniquely identify
        # the dataset, so make sure it's always in the same order, or we might
        # end up creating multiple keys for the same dataset if python
        # decides to return the dict in a different order
        param_key = collections.OrderedDict()
        for key in sorted(parameters):
            param_key[key] = parameters[key]

        # we additionally use the current time as a salt - this should usually
        # ensure a unique key for the dataset. if for some reason there is a
        # hash collision, the key is regenerated below with a random offset
        param_key["_salt"] = int(time.time()) + time_offset

        parent_key = str(parent) if parent else ""
        plain_key = repr(param_key) + str(query) + parent_key
        hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest()

        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
            # key exists, generate a new one
            return self.get_key(query, parameters, parent, time_offset=random.randint(1, 10))
        else:
            return hashed_key

    def set_key(self, key):
        """
        Change dataset key

        In principle, keys should never be changed.
        But there are rare cases
        where it is useful to do so, in particular when importing a dataset
        from another 4CAT instance; in that case it makes sense to try and
        ensure that the key is the same as it was before. This function sets
        the dataset key and updates any dataset references to it.

        :param str key: Key to set
        :return str: Key that was set. If the desired key already exists, the
        original key is kept.
        """
        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
        if key_exists or not key:
            return self.key

        old_key = self.key
        self.db.update("datasets", data={"key": key}, where={"key": old_key})

        # update references
        self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key})
        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})

        # for good measure
        self.db.commit()
        self.key = key

        return self.key

    def get_status(self):
        """
        Get Dataset status

        :return string: Dataset status
        """
        return self.data["status"]

    def update_status(self, status, is_final=False):
        """
        Update dataset status

        The status is a string that may be displayed to a user to keep them
        updated and informed about the progress of a dataset. No memory is kept
        of earlier dataset statuses; the current status is overwritten when
        updated.

        Statuses are also written to the dataset log file.

        :param string status: Dataset status
        :param bool is_final: If this is `True`, subsequent calls to this
        method while the object is instantiated will not update the dataset
        status.
        :return bool: Status update successful?
        """
        if self.no_status_updates:
            return

        # for presets, copy the updated status to the preset(s) this is part of
        if self.preset_parent is None:
            self.preset_parent = [parent for parent in self.get_genealogy() if parent.type.find("preset-") == 0 and parent.key != self.key][:1]

        if self.preset_parent:
            for preset_parent in self.preset_parent:
                if not preset_parent.is_finished():
                    preset_parent.update_status(status)

        self.data["status"] = status
        updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"status": status})

        if is_final:
            self.no_status_updates = True

        self.log(status)

        return updated > 0

    def update_progress(self, progress):
        """
        Update dataset progress

        The progress can be used to indicate to a user how close the dataset
        is to completion.

        :param float progress: Between 0 and 1.
        :return:
        """
        progress = min(1, max(0, progress))  # clamp
        if type(progress) is int:
            progress = float(progress)

        self.data["progress"] = progress
        updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress})
        return updated > 0

    def get_progress(self):
        """
        Get dataset progress

        :return float: Progress, between 0 and 1
        """
        return self.data["progress"]

    def finish_with_error(self, error):
        """
        Set error as final status, and finish with 0 results

        This is a convenience function to avoid having to repeat
        "update_status" and "finish" a lot.

        :param str error: Error message for final dataset status.
        :return:
        """
        self.update_status(error, is_final=True)
        self.finish(0)

        return None
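
    # A hedged usage sketch (an assumption, not part of the original module): the
    # typical status/progress lifecycle a worker goes through while filling a dataset.
    #
    #   dataset.update_status("Processing 1,000 items")
    #   dataset.update_progress(0.5)
    #   try:
    #       ...  # write results to dataset.get_results_path()
    #       dataset.finish(num_rows=1000)
    #   except Exception:
    #       dataset.finish_with_error("Processing failed; see dataset log for details.")
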
    def update_version(self, version):
        """
        Update software version used for this dataset

        This can be used to verify the code that was used to process this dataset.

        :param string version: Version identifier
        :return bool: Update successful?
        """
        try:
            # this fails if the processor type is unknown
            # edge case, but let's not crash...
            processor_path = self.modules.processors.get(self.data["type"]).filepath
        except AttributeError:
            processor_path = ""

        updated = self.db.update("datasets", where={"key": self.data["key"]}, data={
            "software_version": version[0],
            "software_source": version[1],
            "software_file": processor_path
        })

        return updated > 0

    def delete_parameter(self, parameter, instant=True):
        """
        Delete a parameter from the dataset metadata

        :param string parameter: Parameter to delete
        :param bool instant: Also delete parameters in this instance object?
        :return bool: Update successful?
        """
        parameters = self.parameters.copy()
        if parameter in parameters:
            del parameters[parameter]
        else:
            return False

        updated = self.db.update("datasets", where={"key": self.data["key"]},
                                 data={"parameters": json.dumps(parameters)})

        if instant:
            self.parameters = parameters

        return updated > 0

    def get_version_url(self, file):
        """
        Get a versioned github URL for the version this dataset was processed with

        :param file: File to link within the repository
        :return: URL, or an empty string
        """
        if not self.data["software_source"]:
            return ""

        filepath = self.data.get("software_file", "")
        if filepath.startswith("/extensions/"):
            # go to root of extension
            filepath = "/" + "/".join(filepath.split("/")[3:])

        return self.data["software_source"] + "/blob/" + self.data["software_version"] + filepath

    def top_parent(self):
        """
        Get root dataset

        Traverses the tree of datasets this one is part of until it finds one
        with no source_dataset dataset, then returns that dataset.

        :return DataSet: Root dataset
        """
        genealogy = self.get_genealogy()
        return genealogy[0]

    def get_genealogy(self, inclusive=False):
        """
        Get genealogy of this dataset

        Creates a list of DataSet objects, with the first one being the
        'top' dataset, and each subsequent one being a child of the previous
        one, ending with the current dataset.

        :param bool inclusive: If `True`, re-collect the genealogy even if a
        cached copy is available.
        :return list: Dataset genealogy, oldest dataset first
        """
        if self.genealogy and not inclusive:
            return self.genealogy

        key_parent = self.key_parent
        genealogy = []

        while key_parent:
            try:
                parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
            except DataSetException:
                break

            genealogy.append(parent)
            if parent.key_parent:
                key_parent = parent.key_parent
            else:
                break

        genealogy.reverse()
        genealogy.append(self)

        self.genealogy = genealogy
        return self.genealogy

    def get_all_children(self, recursive=True):
        """
        Get all children of this dataset

        Results are returned as a non-hierarchical list, i.e. the result does
        not reflect the actual dataset hierarchy (but all datasets in the
        result will have the original dataset as an ancestor somewhere)

        :return list: List of DataSets
        """
        children = [DataSet(data=record, db=self.db, modules=self.modules) for record in self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))]
        results = children.copy()
        if recursive:
            for child in children:
                results += child.get_all_children(recursive)

        return results

    def nearest(self, type_filter):
        """
        Return nearest dataset that matches the given type

        Starting with this dataset, traverse the hierarchy upwards and return
        whichever dataset matches the given type.

        :param str type_filter: Type filter. Can contain wildcards and is matched
        using `fnmatch.fnmatch`.
        :return: Earliest matching dataset, or `None` if none match.
        """
        genealogy = self.get_genealogy(inclusive=True)
        for dataset in reversed(genealogy):
            if fnmatch.fnmatch(dataset.type, type_filter):
                return dataset

        return None

    def get_breadcrumbs(self):
        """
        Get breadcrumbs navlink for use in permalinks

        Returns a string representing this dataset's genealogy that may be used
        to uniquely identify it.

        :return str: Nav link
        """
        if self.genealogy:
            return ",".join([dataset.key for dataset in self.genealogy])
        else:
            # Collect keys only
            key_parent = self.key  # Start at the bottom
            genealogy = []

            while key_parent:
                try:
                    parent = self.db.fetchone("SELECT key_parent FROM datasets WHERE key = %s", (key_parent,))
                except TypeError:
                    break

                key_parent = parent["key_parent"]
                if key_parent:
                    genealogy.append(key_parent)
                else:
                    break

            genealogy.reverse()
            genealogy.append(self.key)
            return ",".join(genealogy)
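
    # A hedged usage sketch (an assumption, not part of the original module): walking a
    # dataset's lineage and finding the nearest ancestor of a given type.
    #
    #   root = dataset.top_parent()            # oldest ancestor
    #   chain = dataset.get_genealogy()        # [root, ..., dataset]
    #   source = dataset.nearest("*-search")   # closest dataset made by a search worker
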
    def get_compatible_processors(self, user=None):
        """
        Get list of processors compatible with this dataset

        Checks whether this dataset type is one that is listed as being accepted
        by the processor, for each known type: if the processor does not
        specify accepted types (via the `is_compatible_with` method), it is
        assumed it accepts any top-level datasets

        :param str|User|None user: User to get compatibility for. If set,
        use the user-specific config settings where available.

        :return dict: Compatible processors, `name => class` mapping
        """
        processors = self.modules.processors

        available = {}
        for processor_type, processor in processors.items():
            if processor.is_from_collector():
                continue

            own_processor = self.get_own_processor()
            if own_processor and own_processor.exclude_followup_processors(processor_type):
                continue

            # consider a processor compatible if its is_compatible_with
            # method returns True *or* if it has no explicit compatibility
            # check and this dataset is top-level (i.e. has no parent)
            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, user=user)):
                available[processor_type] = processor

        return available

    def get_place_in_queue(self, update=False):
        """
        Determine dataset's position in queue

        If the dataset is already finished, the position is -1. Else, the
        position is the amount of datasets to be completed before this one will
        be processed. A position of 0 would mean that the dataset is currently
        being executed, or that the backend is not running.

        :param bool update: Update the queue position from database if True, else return cached value
        :return int: Queue position
        """
        if self.is_finished() or not self.data.get("job"):
            self._queue_position = -1
            return self._queue_position
        elif not update and self._queue_position is not None:
            # Use cached value
            return self._queue_position
        else:
            # Collect queue position from database via the job
            try:
                job = Job.get_by_ID(self.data["job"], self.db)
                self._queue_position = job.get_place_in_queue()
            except JobNotFoundException:
                self._queue_position = -1

            return self._queue_position

    def get_modules(self):
        """
        Get 4CAT modules

        Loading the modules is not free, so this method caches the result.

        :return: Module cache
        """
        if not self.modules:
            self.modules = ModuleCollector()

        return self.modules

    def get_own_processor(self):
        """
        Get the processor class that produced this dataset

        :return: Processor class, or `None` if not available.
        """
        processor_type = self.parameters.get("type", self.data.get("type"))

        return self.modules.processors.get(processor_type)

    def get_available_processors(self, user=None, exclude_hidden=False):
        """
        Get list of processors that may be run for this dataset

        Returns all compatible processors except for those that are already
        queued or finished and have no options. Processors that have been
        run but have options are included so they may be run again with a
        different configuration

        :param str|User|None user: User to get compatibility for. If set,
        use the user-specific config settings where available.
        :param bool exclude_hidden: Exclude processors that should not be
        displayed in the UI? If `False`, all processors are returned.

        :return dict: Available processors, `name => properties` mapping
        """
        if self.available_processors:
            # Update to reflect exclude_hidden parameter which may be different from last call
            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
            return {processor_type: processor for processor_type, processor in self.available_processors.items() if not exclude_hidden or not processor.is_hidden}

        processors = self.get_compatible_processors(user=user)

        for analysis in self.children:
            if analysis.type not in processors:
                continue

            if not processors[analysis.type].get_options():
                del processors[analysis.type]
                continue

            if exclude_hidden and processors[analysis.type].is_hidden:
                del processors[analysis.type]

        self.available_processors = processors
        return processors
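
    # A hedged usage sketch (an assumption, not part of the original module): listing
    # processors that can still be queued on a dataset for a given user. The `title`
    # attribute on the processor class is assumed for illustration.
    #
    #   for processor_type, processor in dataset.get_available_processors(
    #           user=user, exclude_hidden=True).items():
    #       print(processor_type, getattr(processor, "title", ""))
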
    def link_job(self, job):
        """
        Link this dataset to a job ID

        Updates the dataset data to include a reference to the job that will be
        executing (or has already executed) this job.

        Note that if no job can be found for this dataset, this method silently
        fails.

        :param Job job: The job that will run this dataset

        :todo: If the job column ever gets used, make sure it always contains
        a valid value, rather than silently failing this method.
        """
        if type(job) is not Job:
            raise TypeError("link_job requires a Job object as its argument")

        if "id" not in job.data:
            try:
                job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
            except JobNotFoundException:
                return

        self.db.update("datasets", where={"key": self.key}, data={"job": job.data["id"]})

    def link_parent(self, key_parent):
        """
        Set source_dataset key for this dataset

        :param key_parent: Parent key. Not checked for validity
        """
        self.db.update("datasets", where={"key": self.key}, data={"key_parent": key_parent})

    def get_parent(self):
        """
        Get parent dataset

        :return DataSet: Parent dataset, or `None` if not applicable
        """
        return DataSet(key=self.key_parent, db=self.db, modules=self.modules) if self.key_parent else None

    def detach(self):
        """
        Makes the dataset standalone, i.e. not having any source_dataset dataset
        """
        self.link_parent("")

    def is_dataset(self):
        """
        Easy way to confirm this is a dataset.
        Used for checking processor and dataset compatibility,
        which needs to handle both processors and datasets.
        """
        return True

    def is_top_dataset(self):
        """
        Easy way to confirm this is a top dataset.
        Used for checking processor and dataset compatibility,
        which needs to handle both processors and datasets.
        """
        if self.key_parent:
            return False
        return True

    def is_expiring(self, user=None):
        """
        Determine if dataset is set to expire

        Similar to `is_expired`, but checks if the dataset will be deleted in
        the future, not if it should be deleted right now.

        :param user: User to use for configuration context. Provide to make
        sure configuration overrides for this user are taken into account.
        :return bool|int: `False`, or the expiration date as a Unix timestamp.
        """
        # has someone opted out of deleting this?
        if self.parameters.get("keep"):
            return False

        # is this dataset explicitly marked as expiring after a certain time?
        if self.parameters.get("expires-after"):
            return self.parameters.get("expires-after")

        # is the data source configured to have its datasets expire?
        expiration = config.get("datasources.expiration", {}, user=user)
        if not expiration.get(self.parameters.get("datasource")):
            return False

        # is there a timeout for this data source?
        if expiration.get(self.parameters.get("datasource")).get("timeout"):
            return self.timestamp + expiration.get(self.parameters.get("datasource")).get("timeout")

        return False

    def is_expired(self, user=None):
        """
        Determine if dataset should be deleted

        Datasets can be set to expire, but when they should be deleted depends
        on a number of factors. This checks them all.

        :param user: User to use for configuration context. Provide to make
        sure configuration overrides for this user are taken into account.
        :return bool:
        """
        # has someone opted out of deleting this?
        if not self.is_expiring():
            return False

        # is this dataset explicitly marked as expiring after a certain time?
        future = time.time() + 3600  # ensure we don't delete datasets with invalid expiration times
        if self.parameters.get("expires-after") and convert_to_int(self.parameters["expires-after"], future) < time.time():
            return True

        # is the data source configured to have its datasets expire?
        expiration = config.get("datasources.expiration", {}, user=user)
        if not expiration.get(self.parameters.get("datasource")):
            return False

        # is the dataset older than the set timeout?
        if expiration.get(self.parameters.get("datasource")).get("timeout"):
            return self.timestamp + expiration[self.parameters.get("datasource")]["timeout"] < time.time()

        return False

    def is_from_collector(self):
        """
        Check if this dataset was made by a processor that collects data, i.e.
        a search or import worker.

        :return bool:
        """
        return self.type.endswith("-search") or self.type.endswith("-import")

    def get_extension(self):
        """
        Gets the file extension this dataset produces.
        Also checks whether the results file exists.
        Used for checking processor and dataset compatibility.

        :return str|bool: Extension (e.g. `csv`), or `False` if the results
        file does not exist
        """
        if self.get_results_path().exists():
            return self.get_results_path().suffix[1:]

        return False

    def get_media_type(self):
        """
        Gets the media type of the dataset file.

        :return str: media type, e.g., "text"
        """
        own_processor = self.get_own_processor()
        if hasattr(self, "media_type"):
            # media type can be defined explicitly in the dataset; this is the priority
            return self.media_type
        elif own_processor is not None:
            # or media type can be defined in the processor
            # some processors can set different media types for different datasets (e.g., import_media)
            if hasattr(own_processor, "media_type"):
                return own_processor.media_type

        # Default to text
        return self.parameters.get("media_type", "text")

    def get_metadata(self):
        """
        Get dataset metadata

        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
        update its database (and ensure compatibility with the exporting version of 4CAT).
1607 """ 1608 metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,)) 1609 1610 # get 4CAT version (presumably to ensure export is compatible with import) 1611 metadata["current_4CAT_version"] = get_software_version() 1612 return metadata 1613 1614 def get_result_url(self): 1615 """ 1616 Gets the 4CAT frontend URL of a dataset file. 1617 1618 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 1619 SERVER_HTTPS) plus hardcoded '/result/'. 1620 TODO: create more dynamic method of obtaining url. 1621 """ 1622 filename = self.get_results_path().name 1623 url_to_file = ('https://' if config.get("flask.https") else 'http://') + \ 1624 config.get("flask.server_name") + '/result/' + filename 1625 return url_to_file 1626 1627 def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True): 1628 """ 1629 Log an item that is unable to be mapped and warn administrators. 1630 1631 :param int item_count: Item index 1632 :param Processor processor: Processor calling function8 1633 """ 1634 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 1635 1636 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 1637 closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self 1638 # Log error to dataset log 1639 closest_dataset.log(dataset_error_message) 1640 1641 if warn_admins: 1642 if processor is not None: 1643 processor.log.warning(f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}.") 1644 elif hasattr(self.db, "log"): 1645 # borrow the database's log handler 1646 self.db.log.warning(f"Unable to map item all items for dataset {closest_dataset.key}.") 1647 else: 1648 # No other log available 1649 raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn") 1650 1651 def __getattr__(self, attr): 1652 """ 1653 Getter so we don't have to use .data all the time 1654 1655 :param attr: Data key to get 1656 :return: Value 1657 """ 1658 1659 if attr in dir(self): 1660 # an explicitly defined attribute should always be called in favour 1661 # of this passthrough 1662 attribute = getattr(self, attr) 1663 return attribute 1664 elif attr in self.data: 1665 return self.data[attr] 1666 else: 1667 raise AttributeError("DataSet instance has no attribute %s" % attr) 1668 1669 def __setattr__(self, attr, value): 1670 """ 1671 Setter so we can flexibly update the database 1672 1673 Also updates internal data stores (.data etc). If the attribute is 1674 unknown, it is stored within the 'parameters' attribute. 1675 1676 :param str attr: Attribute to update 1677 :param value: New value 1678 """ 1679 1680 # don't override behaviour for *actual* class attributes 1681 if attr in dir(self): 1682 super().__setattr__(attr, value) 1683 return 1684 1685 if attr not in self.data: 1686 self.parameters[attr] = value 1687 attr = "parameters" 1688 value = self.parameters 1689 1690 if attr == "parameters": 1691 value = json.dumps(value) 1692 1693 self.db.update("datasets", where={"key": self.key}, data={attr: value}) 1694 1695 self.data[attr] = value 1696 1697 if attr == "parameters": 1698 self.parameters = json.loads(value)
26class DataSet(FourcatModule): 27 """ 28 Provide interface to safely register and run operations on a dataset 29 30 A dataset is a collection of: 31 - A unique identifier 32 - A set of parameters that demarcate the data contained within 33 - The data 34 35 The data is usually stored in a file on the disk; the parameters are stored 36 in a database. The handling of the data, et cetera, is done by other 37 workers; this class defines method to create and manipulate the dataset's 38 properties. 39 """ 40 # Attributes must be created here to ensure getattr and setattr work properly 41 data = None 42 key = "" 43 44 children = None 45 available_processors = None 46 genealogy = None 47 preset_parent = None 48 parameters = None 49 modules = None 50 51 owners = None 52 tagged_owners = None 53 54 db = None 55 folder = None 56 is_new = True 57 58 no_status_updates = False 59 staging_areas = None 60 _queue_position = None 61 62 def __init__(self, parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None, 63 type=None, is_private=True, owner="anonymous", modules=None): 64 """ 65 Create new dataset object 66 67 If the dataset is not in the database yet, it is added. 68 69 :param dict parameters: Only when creating a new dataset. Dataset 70 parameters, free-form dictionary. 71 :param str key: Dataset key. If given, dataset with this key is loaded. 72 :param int job: Job ID. If given, dataset corresponding to job is 73 loaded. 74 :param dict data: Dataset data, corresponding to a row in the datasets 75 database table. If not given, retrieved from database depending on key. 76 :param db: Database connection 77 :param str parent: Only when creating a new dataset. Parent dataset 78 key to which the one being created is a child. 79 :param str extension: Only when creating a new dataset. Extension of 80 dataset result file. 81 :param str type: Only when creating a new dataset. Type of the dataset, 82 corresponding to the type property of a processor class. 83 :param bool is_private: Only when creating a new dataset. Whether the 84 dataset is private or public. 85 :param str owner: Only when creating a new dataset. The user name of 86 the dataset's creator. 87 :param modules: Module cache. If not given, will be loaded when needed 88 (expensive). Used to figure out what processors are compatible with 89 this dataset. 
90 """ 91 self.db = db 92 self.folder = config.get('PATH_ROOT').joinpath(config.get('PATH_DATA')) 93 # Ensure mutable attributes are set in __init__ as they are unique to each DataSet 94 self.data = {} 95 self.parameters = {} 96 self.children = [] 97 self.available_processors = {} 98 self.genealogy = [] 99 self.staging_areas = [] 100 self.modules = modules 101 102 if key is not None: 103 self.key = key 104 current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,)) 105 if not current: 106 raise DataSetNotFoundException("DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given" % key) 107 108 elif job is not None: 109 current = self.db.fetchone("SELECT * FROM datasets WHERE parameters::json->>'job' = %s", (job,)) 110 if not current: 111 raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument") 112 113 self.key = current["key"] 114 elif data is not None: 115 current = data 116 if "query" not in data or "key" not in data or "parameters" not in data or "key_parent" not in data: 117 raise DataSetException("DataSet() requires a complete dataset record for its 'data' argument") 118 119 self.key = current["key"] 120 else: 121 if parameters is None: 122 raise DataSetException("DataSet() requires either 'key', or 'parameters' to be given") 123 124 if not type: 125 raise DataSetException("Datasets must have their type set explicitly") 126 127 query = self.get_label(parameters, default=type) 128 self.key = self.get_key(query, parameters, parent) 129 current = self.db.fetchone("SELECT * FROM datasets WHERE key = %s AND query = %s", (self.key, query)) 130 131 if current: 132 self.data = current 133 self.parameters = json.loads(self.data["parameters"]) 134 self.is_new = False 135 else: 136 self.data = {"type": type} # get_own_processor needs this 137 own_processor = self.get_own_processor() 138 version = get_software_commit(own_processor) 139 self.data = { 140 "key": self.key, 141 "query": self.get_label(parameters, default=type), 142 "parameters": json.dumps(parameters), 143 "result_file": "", 144 "creator": owner, 145 "status": "", 146 "type": type, 147 "timestamp": int(time.time()), 148 "is_finished": False, 149 "is_private": is_private, 150 "software_version": version[0], 151 "software_source": version[1], 152 "software_file": "", 153 "num_rows": 0, 154 "progress": 0.0, 155 "key_parent": parent 156 } 157 self.parameters = parameters 158 159 self.db.insert("datasets", data=self.data) 160 self.refresh_owners() 161 self.add_owner(owner) 162 163 # Find desired extension from processor if not explicitly set 164 if extension is None: 165 if own_processor: 166 extension = own_processor.get_extension(parent_dataset=DataSet(key=parent, db=db, modules=self.modules) if parent else None) 167 # Still no extension, default to 'csv' 168 if not extension: 169 extension = "csv" 170 171 # Reserve filename and update data['result_file'] 172 self.reserve_result_file(parameters, extension) 173 174 # retrieve analyses and processors that may be run for this dataset 175 analyses = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC", (self.key,)) 176 self.children = sorted([DataSet(data=analysis, db=self.db, modules=self.modules) for analysis in analyses], 177 key=lambda dataset: dataset.is_finished(), reverse=True) 178 179 self.refresh_owners() 180 181 def check_dataset_finished(self): 182 """ 183 Checks if dataset is finished. 
185
186 Only returns a path if the dataset is complete. In other words, if this
187 method returns a path, a file with the complete results for this dataset
188 will exist at that location.
189
190 :return: A path to the results file, 'empty', or `None`
191 """
192 if self.data["is_finished"] and self.data["num_rows"] > 0:
193 return self.folder.joinpath(self.data["result_file"])
194 elif self.data["is_finished"] and self.data["num_rows"] == 0:
195 return 'empty'
196 else:
197 return None
198
199 def get_results_path(self):
200 """
201 Get path to results file
202
203 Always returns a path that will at some point contain the dataset
204 data, but may not do so yet. Use this to get the location to write
205 generated results to.
206
207 :return Path: A path to the results file
208 """
209 return self.folder.joinpath(self.data["result_file"])
210
211 def get_results_folder_path(self):
212 """
213 Get path to folder containing accompanying results
214
215 Returns a path that may not yet be created
216
217 :return Path: A path to the results folder
218 """
219 return self.folder.joinpath("folder_" + self.key)
220
221 def get_log_path(self):
222 """
223 Get path to dataset log file
224
225 Each dataset has a single log file that documents its creation. This
226 method returns the path to that file. It is identical to the path of
227 the dataset result file, with 'log' as its extension instead.
228
229 :return Path: A path to the log file
230 """
231 return self.get_results_path().with_suffix(".log")
232
233 def clear_log(self):
234 """
235 Clears the dataset log file
236
237 If the log file does not exist, it is created empty. The log file will
238 have the same file name as the dataset result file, with the 'log'
239 extension.
240 """
241 log_path = self.get_log_path()
242 with log_path.open("w") as outfile:
243 pass
244
245 def log(self, log):
246 """
247 Write log message to file
248
249 Writes the log message to the log file on a new line, including a
250 timestamp at the start of the line. Note that this assumes the log file
251 already exists - it should have been created/cleared with clear_log()
252 prior to calling this.
253
254 :param str log: Log message to write
255 """
256 log_path = self.get_log_path()
257 with log_path.open("a", encoding="utf-8") as outfile:
258 outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
259
260 def _iterate_items(self, processor=None):
261 """
262 A generator that iterates through a CSV or NDJSON file
263
264 This is an internal method and should not be called directly. Rather,
265 call iterate_items() and use the generated dictionary and its properties.
266
267 If a reference to a processor is provided, with every iteration,
268 the processor's 'interrupted' flag is checked, and if set a
269 ProcessorInterruptedException is raised, which by default is caught
270 in the worker and subsequently stops execution gracefully.
271
272 There are two file types that can be iterated (currently): CSV files
273 and NDJSON (newline-delimited JSON) files. In the future, one could
274 envision adding a pathway to retrieve items from e.g. a MongoDB
275 collection directly instead of from a static file.
276
277 :param BasicProcessor processor: A reference to the processor
278 iterating the dataset.
279 :return generator: A generator that yields each item as a dictionary 280 """ 281 path = self.get_results_path() 282 283 # Yield through items one by one 284 if path.suffix.lower() == ".csv": 285 with path.open("rb") as infile: 286 wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8") 287 reader = csv.DictReader(wrapped_infile) 288 289 if not self.get_own_processor(): 290 # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not 291 first_item = next(reader) 292 if first_item is None or any([True for key in first_item if type(key) is not str]): 293 raise NotImplementedError(f"Cannot iterate through CSV file (deprecated processor {self.type})") 294 yield first_item 295 296 for item in reader: 297 if hasattr(processor, "interrupted") and processor.interrupted: 298 raise ProcessorInterruptedException("Processor interrupted while iterating through CSV file") 299 300 yield item 301 302 elif path.suffix.lower() == ".ndjson": 303 # In NDJSON format each line in the file is a self-contained JSON 304 with path.open(encoding="utf-8") as infile: 305 for line in infile: 306 if hasattr(processor, "interrupted") and processor.interrupted: 307 raise ProcessorInterruptedException("Processor interrupted while iterating through NDJSON file") 308 309 yield json.loads(line) 310 311 else: 312 raise NotImplementedError("Cannot iterate through %s file" % path.suffix) 313 314 def iterate_items(self, processor=None, warn_unmappable=True, map_missing="default"): 315 """ 316 Generate mapped dataset items 317 318 Wrapper for _iterate_items that returns a DatasetItem, which can be 319 accessed as a dict returning the original item or (if a mapper is 320 available) the mapped item. Mapped or original versions of the item can 321 also be accessed via the `original` and `mapped_object` properties of 322 the DatasetItem. 323 324 Processors can define a method called `map_item` that can be used to map 325 an item from the dataset file before it is processed any further. This is 326 slower than storing the data file in the right format to begin with but 327 not all data sources allow for easy 'flat' mapping of items, e.g. tweets 328 are nested objects when retrieved from the twitter API that are easier 329 to store as a JSON file than as a flat CSV file, and it would be a shame 330 to throw away that data. 331 332 Note the two parameters warn_unmappable and map_missing. Items can be 333 unmappable in that their structure is too different to coerce into a 334 neat dictionary of the structure the data source expects. This makes it 335 'unmappable' and warn_unmappable determines what happens in this case. 336 It can also be of the right structure, but with some fields missing or 337 incomplete. map_missing determines what happens in that case. The 338 latter is for example possible when importing data via Zeeschuimer, 339 which produces unstably-structured data captured from social media 340 sites. 341 342 :param BasicProcessor processor: A reference to the processor 343 iterating the dataset. 344 :param bool warn_unmappable: If an item is not mappable, skip the item 345 and log a warning 346 :param map_missing: Indicates what to do with mapped items for which 347 some fields could not be mapped. Defaults to 'empty_str'. Must be one of: 348 - 'default': fill missing fields with the default passed by map_item 349 - 'abort': raise a MappedItemIncompleteException if a field is missing 350 - a callback: replace missing field with the return value of the 351 callback. 
The MappedItem object is passed to the callback as the 352 first argument and the name of the missing field as the second. 353 - a dictionary with a key for each possible missing field: replace missing 354 field with a strategy for that field ('default', 'abort', or a callback) 355 356 :return generator: A generator that yields DatasetItems 357 """ 358 unmapped_items = False 359 # Collect item_mapper for use with filter 360 item_mapper = False 361 own_processor = self.get_own_processor() 362 if own_processor and own_processor.map_item_method_available(dataset=self): 363 item_mapper = True 364 365 # missing field strategy can be for all fields at once, or per field 366 # if it is per field, it is a dictionary with field names and their strategy 367 # if it is for all fields, it is may be a callback, 'abort', or 'default' 368 default_strategy = "default" 369 if type(map_missing) is not dict: 370 default_strategy = map_missing 371 map_missing = {} 372 373 # Loop through items 374 for i, item in enumerate(self._iterate_items(processor)): 375 # Save original to yield 376 original_item = item.copy() 377 378 # Map item 379 if item_mapper: 380 try: 381 mapped_item = own_processor.get_mapped_item(item) 382 except MapItemException as e: 383 if warn_unmappable: 384 self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False) 385 unmapped_items = True 386 continue 387 388 # check if fields have been marked as 'missing' in the 389 # underlying data, and treat according to the chosen strategy 390 if mapped_item.get_missing_fields(): 391 for missing_field in mapped_item.get_missing_fields(): 392 strategy = map_missing.get(missing_field, default_strategy) 393 394 if callable(strategy): 395 # delegate handling to a callback 396 mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field) 397 elif strategy == "abort": 398 # raise an exception to be handled at the processor level 399 raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.") 400 elif strategy == "default": 401 # use whatever was passed to the object constructor 402 mapped_item.data[missing_field] = mapped_item.data[missing_field].value 403 else: 404 raise ValueError("map_missing must be 'abort', 'default', or a callback.") 405 406 else: 407 mapped_item = original_item 408 409 # yield a DatasetItem, which is a dict with some special properties 410 yield DatasetItem(mapper=item_mapper, original=original_item, mapped_object=mapped_item, **(mapped_item.get_item_data() if type(mapped_item) is MappedItem else mapped_item)) 411 412 def get_staging_area(self): 413 """ 414 Get path to a temporary folder in which files can be stored before 415 finishing 416 417 This folder must be created before use, but is guaranteed to not exist 418 yet. The folder may be used as a staging area for the dataset data 419 while it is being processed. 
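Illustrative sketch (assuming `dataset` is an instance of this class that
is currently being processed; the file name is made up for the example):

    staging_area = dataset.get_staging_area()
    (staging_area / "intermediate.ndjson").write_text("", encoding="utf-8")
    # ... once processing is done, the worker typically calls:
    dataset.remove_staging_areas()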
420 421 :return Path: Path to folder 422 """ 423 results_file = self.get_results_path() 424 425 results_dir_base = results_file.parent 426 results_dir = results_file.name.replace(".", "") + "-staging" 427 results_path = results_dir_base.joinpath(results_dir) 428 index = 1 429 while results_path.exists(): 430 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 431 index += 1 432 433 # create temporary folder 434 results_path.mkdir() 435 436 # Storing the staging area with the dataset so that it can be removed later 437 self.staging_areas.append(results_path) 438 439 return results_path 440 441 def remove_staging_areas(self): 442 """ 443 Remove any staging areas that were created and all files contained in them. 444 """ 445 # Remove DataSet staging areas 446 if self.staging_areas: 447 for staging_area in self.staging_areas: 448 if staging_area.is_dir(): 449 shutil.rmtree(staging_area) 450 451 def finish(self, num_rows=0): 452 """ 453 Declare the dataset finished 454 """ 455 if self.data["is_finished"]: 456 raise RuntimeError("Cannot finish a finished dataset again") 457 458 self.db.update("datasets", where={"key": self.data["key"]}, 459 data={"is_finished": True, "num_rows": num_rows, "progress": 1.0, "timestamp_finished": int(time.time())}) 460 self.data["is_finished"] = True 461 self.data["num_rows"] = num_rows 462 463 def copy(self, shallow=True): 464 """ 465 Copies the dataset, making a new version with a unique key 466 467 468 :param bool shallow: Shallow copy: does not copy the result file, but 469 instead refers to the same file as the original dataset did 470 :return Dataset: Copied dataset 471 """ 472 parameters = self.parameters.copy() 473 474 # a key is partially based on the parameters. so by setting these extra 475 # attributes, we also ensure a unique key will be generated for the 476 # copy 477 # possibly todo: don't use time for uniqueness (but one shouldn't be 478 # copying a dataset multiple times per microsecond, that's not what 479 # this is for) 480 parameters["copied_from"] = self.key 481 parameters["copied_at"] = time.time() 482 483 copy = DataSet(parameters=parameters, db=self.db, extension=self.result_file.split(".")[-1], type=self.type, modules=self.modules) 484 for field in self.data: 485 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 486 continue 487 488 copy.__setattr__(field, self.data[field]) 489 490 if shallow: 491 # use the same result file 492 copy.result_file = self.result_file 493 else: 494 # copy to new file with new key 495 shutil.copy(self.get_results_path(), copy.get_results_path()) 496 497 if self.is_finished(): 498 copy.finish(self.num_rows) 499 500 # make sure ownership is also copied 501 copy.copy_ownership_from(self) 502 503 return copy 504 505 def delete(self, commit=True, queue=None): 506 """ 507 Delete the dataset, and all its children 508 509 Deletes both database records and result files. Note that manipulating 510 a dataset object after it has been deleted is undefined behaviour. 511 512 :param bool commit: Commit SQL DELETE query? 513 """ 514 # first, recursively delete children 515 children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,)) 516 for child in children: 517 try: 518 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 519 child.delete(commit=commit) 520 except DataSetException: 521 # dataset already deleted - race condition? 
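# (e.g. another worker cleaned up this child while we were iterating);
# nothing left to delete for this child, so simply move on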
522 pass 523 524 # delete any queued jobs for this dataset 525 try: 526 job = Job.get_by_remote_ID(self.key, self.db, self.type) 527 if job.is_claimed: 528 # tell API to stop any jobs running for this dataset 529 # level 2 = cancel job 530 # we're not interested in the result - if the API is available, 531 # it will do its thing, if it's not the backend is probably not 532 # running so the job also doesn't need to be interrupted 533 call_api( 534 "cancel-job", 535 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 536 False 537 ) 538 539 # this deletes the job from the database 540 job.finish(True) 541 542 except JobNotFoundException: 543 pass 544 545 # delete from database 546 self.db.delete("datasets", where={"key": self.key}, commit=commit) 547 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 548 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 549 550 # delete from drive 551 try: 552 if self.get_results_path().exists(): 553 self.get_results_path().unlink() 554 if self.get_results_path().with_suffix(".log").exists(): 555 self.get_results_path().with_suffix(".log").unlink() 556 if self.get_results_folder_path().exists(): 557 shutil.rmtree(self.get_results_folder_path()) 558 559 except FileNotFoundError: 560 # already deleted, apparently 561 pass 562 except PermissionError as e: 563 self.db.log.error(f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}") 564 565 def update_children(self, **kwargs): 566 """ 567 Update an attribute for all child datasets 568 569 Can be used to e.g. change the owner, version, finished status for all 570 datasets in a tree 571 572 :param kwargs: Parameters corresponding to known dataset attributes 573 """ 574 children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,)) 575 for child in children: 576 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 577 for attr, value in kwargs.items(): 578 child.__setattr__(attr, value) 579 580 child.update_children(**kwargs) 581 582 def is_finished(self): 583 """ 584 Check if dataset is finished 585 :return bool: 586 """ 587 return self.data["is_finished"] == True 588 589 def is_rankable(self, multiple_items=True): 590 """ 591 Determine if a dataset is rankable 592 593 Rankable means that it is a CSV file with 'date' and 'value' columns 594 as well as one or more item label columns 595 596 :param bool multiple_items: Consider datasets with multiple items per 597 item (e.g. word_1, word_2, etc)? 
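For example (illustrative), a result file with the header `date,item,value`
counts as rankable; with `multiple_items` enabled, a header such as
`date,word_1,word_2,value` also qualifies.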
598 599 :return bool: Whether the dataset is rankable or not 600 """ 601 if self.get_results_path().suffix != ".csv" or not self.get_results_path().exists(): 602 return False 603 604 column_options = {"date", "value", "item"} 605 if multiple_items: 606 column_options.add("word_1") 607 608 with self.get_results_path().open(encoding="utf-8") as infile: 609 reader = csv.DictReader(infile) 610 try: 611 return len(set(reader.fieldnames) & column_options) >= 3 612 except (TypeError, ValueError): 613 return False 614 615 def is_accessible_by(self, username, role="owner"): 616 """ 617 Check if dataset has given user as owner 618 619 :param str|User username: Username to check for 620 :return bool: 621 """ 622 if type(username) is not str: 623 if hasattr(username, "get_id"): 624 username = username.get_id() 625 else: 626 raise TypeError("User must be a str or User object") 627 628 # 'normal' owners 629 if username in [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]: 630 return True 631 632 # owners that are owner by being part of a tag 633 if username in itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if (role is None or self.owners[f"tag:{tag}"]["role"] == role)]): 634 return True 635 636 return False 637 638 def get_owners_users(self, role="owner"): 639 """ 640 Get list of dataset owners 641 642 This returns a list of *users* that are considered owners. Tags are 643 transparently replaced with the users with that tag. 644 645 :param str|None role: Role to check for. If `None`, all owners are 646 returned regardless of role. 647 648 :return set: De-duplicated owner list 649 """ 650 # 'normal' owners 651 owners = [owner for owner, meta in self.owners.items() if 652 (role is None or meta["role"] == role) and not owner.startswith("tag:")] 653 654 # owners that are owner by being part of a tag 655 owners.extend(itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if 656 role is None or self.owners[f"tag:{tag}"]["role"] == role])) 657 658 # de-duplicate before returning 659 return set(owners) 660 661 def get_owners(self, role="owner"): 662 """ 663 Get list of dataset owners 664 665 This returns a list of all owners, and does not transparently resolve 666 tags (like `get_owners_users` does). 667 668 :param str|None role: Role to check for. If `None`, all owners are 669 returned regardless of role. 670 671 :return set: De-duplicated owner list 672 """ 673 return [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)] 674 675 def add_owner(self, username, role="owner"): 676 """ 677 Set dataset owner 678 679 If the user is already an owner, but with a different role, the role is 680 updated. If the user is already an owner with the same role, nothing happens. 681 682 :param str|User username: Username to set as owner 683 :param str|None role: Role to add user with. 
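Example (illustrative; assumes `dataset` is a DataSet instance and a user
tag named "admin" exists on this 4CAT instance) - granting viewer access
to every user carrying that tag:

    dataset.add_owner("tag:admin", role="viewer")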
684 """ 685 if type(username) is not str: 686 if hasattr(username, "get_id"): 687 username = username.get_id() 688 else: 689 raise TypeError("User must be a str or User object") 690 691 if username not in self.owners: 692 self.owners[username] = { 693 "name": username, 694 "key": self.key, 695 "role": role 696 } 697 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 698 699 elif username in self.owners and self.owners[username]["role"] != role: 700 self.db.update("datasets_owners", data={"role": role}, where={"name": username, "key": self.key}) 701 self.owners[username]["role"] = role 702 703 if username.startswith("tag:"): 704 # this is a bit more complicated than just adding to the list of 705 # owners, so do a full refresh 706 self.refresh_owners() 707 708 # make sure children's owners remain in sync 709 for child in self.children: 710 child.add_owner(username, role) 711 # not recursive, since we're calling it from recursive code! 712 child.copy_ownership_from(self, recursive=False) 713 714 def remove_owner(self, username): 715 """ 716 Remove dataset owner 717 718 If no owner is set, the dataset is assigned to the anonymous user. 719 If the user is not an owner, nothing happens. 720 721 :param str|User username: Username to set as owner 722 """ 723 if type(username) is not str: 724 if hasattr(username, "get_id"): 725 username = username.get_id() 726 else: 727 raise TypeError("User must be a str or User object") 728 729 if username in self.owners: 730 del self.owners[username] 731 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 732 733 if not self.owners: 734 self.add_owner("anonymous") 735 736 if username in self.tagged_owners: 737 del self.tagged_owners[username] 738 739 # make sure children's owners remain in sync 740 for child in self.children: 741 child.remove_owner(username) 742 # not recursive, since we're calling it from recursive code! 743 child.copy_ownership_from(self, recursive=False) 744 745 def refresh_owners(self): 746 """ 747 Update internal owner cache 748 749 This makes sure that the list of *users* and *tags* which can access the 750 dataset is up to date. 751 """ 752 self.owners = {owner["name"]: owner for owner in self.db.fetchall("SELECT * FROM datasets_owners WHERE key = %s", (self.key,))} 753 754 # determine which users (if any) are owners of the dataset by having a 755 # tag that is listed as an owner 756 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 757 if owner_tags: 758 tagged_owners = self.db.fetchall("SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)) 759 self.tagged_owners = { 760 owner_tag: [user["name"] for user in tagged_owners if owner_tag in user["tags"]] 761 for owner_tag in owner_tags 762 } 763 else: 764 self.tagged_owners = {} 765 766 def copy_ownership_from(self, dataset, recursive=True): 767 """ 768 Copy ownership 769 770 This is useful to e.g. 
make sure a dataset's ownership stays in sync 771 with its parent 772 773 :param Dataset dataset: Parent to copy from 774 :return: 775 """ 776 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 777 778 for role in ("owner", "viewer"): 779 owners = dataset.get_owners(role=role) 780 for owner in owners: 781 self.db.insert("datasets_owners", data={"key": self.key, "name": owner, "role": role}, commit=False, safe=True) 782 783 self.db.commit() 784 if recursive: 785 for child in self.children: 786 child.copy_ownership_from(self, recursive=recursive) 787 788 def get_parameters(self): 789 """ 790 Get dataset parameters 791 792 The dataset parameters are stored as JSON in the database - parse them 793 and return the resulting object 794 795 :return: Dataset parameters as originally stored 796 """ 797 try: 798 return json.loads(self.data["parameters"]) 799 except json.JSONDecodeError: 800 return {} 801 802 def get_columns(self): 803 """ 804 Returns the dataset columns. 805 806 Useful for processor input forms. Can deal with both CSV and NDJSON 807 files, the latter only if a `map_item` function is available in the 808 processor that generated it. While in other cases one could use the 809 keys of the JSON object, this is not always possible in follow-up code 810 that uses the 'column' names, so for consistency this function acts as 811 if no column can be parsed if no `map_item` function exists. 812 813 :return list: List of dataset columns; empty list if unable to parse 814 """ 815 if not self.get_results_path().exists(): 816 # no file to get columns from 817 return [] 818 819 if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)): 820 items = self.iterate_items(warn_unmappable=False) 821 try: 822 keys = list(items.__next__().keys()) 823 except (StopIteration, NotImplementedError): 824 # No items or otherwise unable to iterate 825 return [] 826 finally: 827 del items 828 return keys 829 else: 830 # Filetype not CSV or an NDJSON with `map_item` 831 return [] 832 833 def get_annotation_fields(self): 834 """ 835 Retrieves the saved annotation fields for this dataset. 836 :return dict: The saved annotation fields. 837 """ 838 839 annotation_fields = self.db.fetchone("SELECT annotation_fields FROM datasets WHERE key = %s;", (self.top_parent().key,)) 840 841 if annotation_fields and annotation_fields.get("annotation_fields"): 842 annotation_fields = json.loads(annotation_fields["annotation_fields"]) 843 else: 844 annotation_fields = {} 845 846 return annotation_fields 847 848 def get_annotations(self): 849 """ 850 Retrieves the annotations for this dataset. 
851 return dict: The annotations 852 """ 853 854 annotations = self.db.fetchone("SELECT annotations FROM annotations WHERE key = %s;", (self.top_parent().key,)) 855 856 if annotations and annotations.get("annotations"): 857 return json.loads(annotations["annotations"]) 858 else: 859 return None 860 861 def update_label(self, label): 862 """ 863 Update label for this dataset 864 865 :param str label: New label 866 :return str: The new label, as returned by get_label 867 """ 868 self.parameters["label"] = label 869 870 self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key}) 871 return self.get_label() 872 873 def get_label(self, parameters=None, default="Query"): 874 """ 875 Generate a readable label for the dataset 876 877 :param dict parameters: Parameters of the dataset 878 :param str default: Label to use if it cannot be inferred from the 879 parameters 880 881 :return str: Label 882 """ 883 if not parameters: 884 parameters = self.parameters 885 886 if parameters.get("label"): 887 return parameters["label"] 888 elif parameters.get("body_query") and parameters["body_query"] != "empty": 889 return parameters["body_query"] 890 elif parameters.get("body_match") and parameters["body_match"] != "empty": 891 return parameters["body_match"] 892 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 893 return parameters["subject_query"] 894 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 895 return parameters["subject_match"] 896 elif parameters.get("query"): 897 label = parameters["query"] 898 # Some legacy datasets have lists as query data 899 if isinstance(label, list): 900 label = ", ".join(label) 901 902 label = label if len(label) < 30 else label[:25] + "..." 903 label = label.strip().replace("\n", ", ") 904 return label 905 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 906 return "Flag: %s" % parameters["country_flag"] 907 elif parameters.get("country_name") and parameters["country_name"] != "all": 908 return "Country: %s" % parameters["country_name"] 909 elif parameters.get("filename"): 910 return parameters["filename"] 911 elif parameters.get("board") and "datasource" in parameters: 912 return parameters["datasource"] + "/" + parameters["board"] 913 elif "datasource" in parameters and parameters["datasource"] in self.modules.datasources: 914 return self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 915 else: 916 return default 917 918 def change_datasource(self, datasource): 919 """ 920 Change the datasource type for this dataset 921 922 :param str label: New datasource type 923 :return str: The new datasource type 924 """ 925 926 self.parameters["datasource"] = datasource 927 928 self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key}) 929 return datasource 930 931 def reserve_result_file(self, parameters=None, extension="csv"): 932 """ 933 Generate a unique path to the results file for this dataset 934 935 This generates a file name for the data file of this dataset, and makes sure 936 no file exists or will exist at that location other than the file we 937 expect (i.e. the data for this particular dataset). 
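For example (illustrative), a dataset with query "climate change", key
"abcd1234" and the default extension would typically reserve
"climate-change-abcd1234.csv" inside the data folder, with a numeric suffix
appended if that name is already taken.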
938 939 :param str extension: File extension, "csv" by default 940 :param parameters: Dataset parameters 941 :return bool: Whether the file path was successfully reserved 942 """ 943 if self.data["is_finished"]: 944 raise RuntimeError("Cannot reserve results file for a finished dataset") 945 946 # Use 'random' for random post queries 947 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 948 file = 'random-' + str(parameters["random_amount"]) + '-' + self.data["key"] 949 # Use country code for country flag queries 950 elif "country_flag" in parameters and parameters["country_flag"] != 'all': 951 file = 'countryflag-' + str(parameters["country_flag"]) + '-' + self.data["key"] 952 # Use the query string for all other queries 953 else: 954 query_bit = self.data["query"].replace(" ", "-").lower() 955 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 956 query_bit = query_bit[:100] # Crop to avoid OSError 957 file = query_bit + "-" + self.data["key"] 958 file = re.sub(r"[-]+", "-", file) 959 960 path = self.folder.joinpath(file + "." + extension.lower()) 961 index = 1 962 while path.is_file(): 963 path = self.folder.joinpath(file + "-" + str(index) + "." + extension.lower()) 964 index += 1 965 966 file = path.name 967 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 968 data={"result_file": file}) 969 self.data["result_file"] = file 970 return updated > 0 971 972 def get_key(self, query, parameters, parent="", time_offset=0): 973 """ 974 Generate a unique key for this dataset that can be used to identify it 975 976 The key is a hash of a combination of the query string and parameters. 977 You never need to call this, really: it's used internally. 978 979 :param str query: Query string 980 :param parameters: Dataset parameters 981 :param parent: Parent dataset's key (if applicable) 982 :param time_offset: Offset to add to the time component of the dataset 983 key. This can be used to ensure a unique key even if the parameters and 984 timing is otherwise identical to an existing dataset's 985 986 :return str: Dataset key 987 """ 988 # Return a hash based on parameters 989 # we're going to use the hash of the parameters to uniquely identify 990 # the dataset, so make sure it's always in the same order, or we might 991 # end up creating multiple keys for the same dataset if python 992 # decides to return the dict in a different order 993 param_key = collections.OrderedDict() 994 for key in sorted(parameters): 995 param_key[key] = parameters[key] 996 997 # we additionally use the current time as a salt - this should usually 998 # ensure a unique key for the dataset. if for some reason there is a 999 # hash collision 1000 param_key["_salt"] = int(time.time()) + time_offset 1001 1002 parent_key = str(parent) if parent else "" 1003 plain_key = repr(param_key) + str(query) + parent_key 1004 hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest() 1005 1006 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1007 # key exists, generate a new one 1008 return self.get_key(query, parameters, parent, time_offset=random.randint(1,10)) 1009 else: 1010 return hashed_key 1011 1012 def set_key(self, key): 1013 """ 1014 Change dataset key 1015 1016 In principe, keys should never be changed. 
But there are rare cases 1017 where it is useful to do so, in particular when importing a dataset 1018 from another 4CAT instance; in that case it makes sense to try and 1019 ensure that the key is the same as it was before. This function sets 1020 the dataset key and updates any dataset references to it. 1021 1022 :param str key: Key to set 1023 :return str: Key that was set. If the desired key already exists, the 1024 original key is kept. 1025 """ 1026 key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,)) 1027 if key_exists or not key: 1028 return self.key 1029 1030 old_key = self.key 1031 self.db.update("datasets", data={"key": key}, where={"key": old_key}) 1032 1033 # update references 1034 self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key}) 1035 self.db.update("datasets_owners", data={"key": key}, where={"key": old_key}) 1036 self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key}) 1037 self.db.update("users_favourites", data={"key": key}, where={"key": old_key}) 1038 1039 # for good measure 1040 self.db.commit() 1041 self.key = key 1042 1043 return self.key 1044 1045 def get_status(self): 1046 """ 1047 Get Dataset status 1048 1049 :return string: Dataset status 1050 """ 1051 return self.data["status"] 1052 1053 def update_status(self, status, is_final=False): 1054 """ 1055 Update dataset status 1056 1057 The status is a string that may be displayed to a user to keep them 1058 updated and informed about the progress of a dataset. No memory is kept 1059 of earlier dataset statuses; the current status is overwritten when 1060 updated. 1061 1062 Statuses are also written to the dataset log file. 1063 1064 :param string status: Dataset status 1065 :param bool is_final: If this is `True`, subsequent calls to this 1066 method while the object is instantiated will not update the dataset 1067 status. 1068 :return bool: Status update successful? 1069 """ 1070 if self.no_status_updates: 1071 return 1072 1073 # for presets, copy the updated status to the preset(s) this is part of 1074 if self.preset_parent is None: 1075 self.preset_parent = [parent for parent in self.get_genealogy() if parent.type.find("preset-") == 0 and parent.key != self.key][:1] 1076 1077 if self.preset_parent: 1078 for preset_parent in self.preset_parent: 1079 if not preset_parent.is_finished(): 1080 preset_parent.update_status(status) 1081 1082 self.data["status"] = status 1083 updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"status": status}) 1084 1085 if is_final: 1086 self.no_status_updates = True 1087 1088 self.log(status) 1089 1090 return updated > 0 1091 1092 def update_progress(self, progress): 1093 """ 1094 Update dataset progress 1095 1096 The progress can be used to indicate to a user how close the dataset 1097 is to completion. 1098 1099 :param float progress: Between 0 and 1. 
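Values outside this range are clamped to it; for example,
`update_progress(0.5)` marks the dataset as halfway done.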
1100 :return: 1101 """ 1102 progress = min(1, max(0, progress)) # clamp 1103 if type(progress) is int: 1104 progress = float(progress) 1105 1106 self.data["progress"] = progress 1107 updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress}) 1108 return updated > 0 1109 1110 def get_progress(self): 1111 """ 1112 Get dataset progress 1113 1114 :return float: Progress, between 0 and 1 1115 """ 1116 return self.data["progress"] 1117 1118 def finish_with_error(self, error): 1119 """ 1120 Set error as final status, and finish with 0 results 1121 1122 This is a convenience function to avoid having to repeat 1123 "update_status" and "finish" a lot. 1124 1125 :param str error: Error message for final dataset status. 1126 :return: 1127 """ 1128 self.update_status(error, is_final=True) 1129 self.finish(0) 1130 1131 return None 1132 1133 def update_version(self, version): 1134 """ 1135 Update software version used for this dataset 1136 1137 This can be used to verify the code that was used to process this dataset. 1138 1139 :param string version: Version identifier 1140 :return bool: Update successul? 1141 """ 1142 try: 1143 # this fails if the processor type is unknown 1144 # edge case, but let's not crash... 1145 processor_path = self.modules.processors.get(self.data["type"]).filepath 1146 except AttributeError: 1147 processor_path = "" 1148 1149 updated = self.db.update("datasets", where={"key": self.data["key"]}, data={ 1150 "software_version": version[0], 1151 "software_source": version[1], 1152 "software_file": processor_path 1153 }) 1154 1155 return updated > 0 1156 1157 def delete_parameter(self, parameter, instant=True): 1158 """ 1159 Delete a parameter from the dataset metadata 1160 1161 :param string parameter: Parameter to delete 1162 :param bool instant: Also delete parameters in this instance object? 1163 :return bool: Update successul? 1164 """ 1165 parameters = self.parameters.copy() 1166 if parameter in parameters: 1167 del parameters[parameter] 1168 else: 1169 return False 1170 1171 updated = self.db.update("datasets", where={"key": self.data["key"]}, 1172 data={"parameters": json.dumps(parameters)}) 1173 1174 if instant: 1175 self.parameters = parameters 1176 1177 return updated > 0 1178 1179 def get_version_url(self, file): 1180 """ 1181 Get a versioned github URL for the version this dataset was processed with 1182 1183 :param file: File to link within the repository 1184 :return: URL, or an empty string 1185 """ 1186 if not self.data["software_source"]: 1187 return "" 1188 1189 filepath = self.data.get("software_file", "") 1190 if filepath.startswith("/extensions/"): 1191 # go to root of extension 1192 filepath = "/" + "/".join(filepath.split("/")[3:]) 1193 1194 return self.data["software_source"] + "/blob/" + self.data["software_version"] + filepath 1195 1196 def top_parent(self): 1197 """ 1198 Get root dataset 1199 1200 Traverses the tree of datasets this one is part of until it finds one 1201 with no source_dataset dataset, then returns that dataset. 1202 1203 :return Dataset: Parent dataset 1204 """ 1205 genealogy = self.get_genealogy() 1206 return genealogy[0] 1207 1208 def get_genealogy(self, inclusive=False): 1209 """ 1210 Get genealogy of this dataset 1211 1212 Creates a list of DataSet objects, with the first one being the 1213 'top' dataset, and each subsequent one being a child of the previous 1214 one, ending with the current dataset. 
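For example (illustrative), for a frequency analysis that was run on a
search dataset, the genealogy would be `[search_dataset, frequency_dataset]`
and `top_parent()` would return the search dataset.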
1215 1216 :return list: Dataset genealogy, oldest dataset first 1217 """ 1218 if self.genealogy and not inclusive: 1219 return self.genealogy 1220 1221 key_parent = self.key_parent 1222 genealogy = [] 1223 1224 while key_parent: 1225 try: 1226 parent = DataSet(key=key_parent, db=self.db, modules=self.modules) 1227 except DataSetException: 1228 break 1229 1230 genealogy.append(parent) 1231 if parent.key_parent: 1232 key_parent = parent.key_parent 1233 else: 1234 break 1235 1236 genealogy.reverse() 1237 genealogy.append(self) 1238 1239 self.genealogy = genealogy 1240 return self.genealogy 1241 1242 def get_all_children(self, recursive=True): 1243 """ 1244 Get all children of this dataset 1245 1246 Results are returned as a non-hierarchical list, i.e. the result does 1247 not reflect the actual dataset hierarchy (but all datasets in the 1248 result will have the original dataset as an ancestor somewhere) 1249 1250 :return list: List of DataSets 1251 """ 1252 children = [DataSet(data=record, db=self.db, modules=self.modules) for record in self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))] 1253 results = children.copy() 1254 if recursive: 1255 for child in children: 1256 results += child.get_all_children(recursive) 1257 1258 return results 1259 1260 def nearest(self, type_filter): 1261 """ 1262 Return nearest dataset that matches the given type 1263 1264 Starting with this dataset, traverse the hierarchy upwards and return 1265 whichever dataset matches the given type. 1266 1267 :param str type_filter: Type filter. Can contain wildcards and is matched 1268 using `fnmatch.fnmatch`. 1269 :return: Earliest matching dataset, or `None` if none match. 1270 """ 1271 genealogy = self.get_genealogy(inclusive=True) 1272 for dataset in reversed(genealogy): 1273 if fnmatch.fnmatch(dataset.type, type_filter): 1274 return dataset 1275 1276 return None 1277 1278 def get_breadcrumbs(self): 1279 """ 1280 Get breadcrumbs navlink for use in permalinks 1281 1282 Returns a string representing this dataset's genealogy that may be used 1283 to uniquely identify it. 1284 1285 :return str: Nav link 1286 """ 1287 if self.genealogy: 1288 return ",".join([dataset.key for dataset in self.genealogy]) 1289 else: 1290 # Collect keys only 1291 key_parent = self.key # Start at the bottom 1292 genealogy = [] 1293 1294 while key_parent: 1295 try: 1296 parent = self.db.fetchone("SELECT key_parent FROM datasets WHERE key = %s", (key_parent,)) 1297 except TypeError: 1298 break 1299 1300 key_parent = parent["key_parent"] 1301 if key_parent: 1302 genealogy.append(key_parent) 1303 else: 1304 break 1305 1306 genealogy.reverse() 1307 genealogy.append(self.key) 1308 return ",".join(genealogy) 1309 1310 def get_compatible_processors(self, user=None): 1311 """ 1312 Get list of processors compatible with this dataset 1313 1314 Checks whether this dataset type is one that is listed as being accepted 1315 by the processor, for each known type: if the processor does not 1316 specify accepted types (via the `is_compatible_with` method), it is 1317 assumed it accepts any top-level datasets 1318 1319 :param str|User|None user: User to get compatibility for. If set, 1320 use the user-specific config settings where available. 
1321 1322 :return dict: Compatible processors, `name => class` mapping 1323 """ 1324 processors = self.modules.processors 1325 1326 available = {} 1327 for processor_type, processor in processors.items(): 1328 if processor.is_from_collector(): 1329 continue 1330 1331 own_processor = self.get_own_processor() 1332 if own_processor and own_processor.exclude_followup_processors(processor_type): 1333 continue 1334 1335 # consider a processor compatible if its is_compatible_with 1336 # method returns True *or* if it has no explicit compatibility 1337 # check and this dataset is top-level (i.e. has no parent) 1338 if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \ 1339 or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, user=user)): 1340 available[processor_type] = processor 1341 1342 return available 1343 1344 def get_place_in_queue(self, update=False): 1345 """ 1346 Determine dataset's position in queue 1347 1348 If the dataset is already finished, the position is -1. Else, the 1349 position is the amount of datasets to be completed before this one will 1350 be processed. A position of 0 would mean that the dataset is currently 1351 being executed, or that the backend is not running. 1352 1353 :param bool update: Update the queue position from database if True, else return cached value 1354 :return int: Queue position 1355 """ 1356 if self.is_finished() or not self.data.get("job"): 1357 self._queue_position = -1 1358 return self._queue_position 1359 elif not update and self._queue_position is not None: 1360 # Use cached value 1361 return self._queue_position 1362 else: 1363 # Collect queue position from database via the job 1364 try: 1365 job = Job.get_by_ID(self.data["job"], self.db) 1366 self._queue_position = job.get_place_in_queue() 1367 except JobNotFoundException: 1368 self._queue_position = -1 1369 1370 return self._queue_position 1371 1372 def get_modules(self): 1373 """ 1374 Get 4CAT modules 1375 1376 Is a function because loading them is not free, and this way we can 1377 cache the result. 1378 1379 :return: 1380 """ 1381 if not self.modules: 1382 self.modules = ModuleCollector() 1383 1384 return self.modules 1385 1386 def get_own_processor(self): 1387 """ 1388 Get the processor class that produced this dataset 1389 1390 :return: Processor class, or `None` if not available. 1391 """ 1392 processor_type = self.parameters.get("type", self.data.get("type")) 1393 1394 return self.modules.processors.get(processor_type) 1395 1396 def get_available_processors(self, user=None, exclude_hidden=False): 1397 """ 1398 Get list of processors that may be run for this dataset 1399 1400 Returns all compatible processors except for those that are already 1401 queued or finished and have no options. Processors that have been 1402 run but have options are included so they may be run again with a 1403 different configuration 1404 1405 :param str|User|None user: User to get compatibility for. If set, 1406 use the user-specific config settings where available. 1407 :param bool exclude_hidden: Exclude processors that should be displayed 1408 in the UI? If `False`, all processors are returned. 1409 1410 :return dict: Available processors, `name => properties` mapping 1411 """ 1412 if self.available_processors: 1413 # Update to reflect exclude_hidden parameter which may be different from last call 1414 # TODO: could children also have been created? 
Possible bug, but I have not seen anything affected by this
1415 return {processor_type: processor for processor_type, processor in self.available_processors.items() if not exclude_hidden or not processor.is_hidden}
1416
1417 processors = self.get_compatible_processors(user=user)
1418
1419 for analysis in self.children:
1420 if analysis.type not in processors:
1421 continue
1422
1423 if not processors[analysis.type].get_options():
1424 del processors[analysis.type]
1425 continue
1426
1427 if exclude_hidden and processors[analysis.type].is_hidden:
1428 del processors[analysis.type]
1429
1430 self.available_processors = processors
1431 return processors
1432
1433 def link_job(self, job):
1434 """
1435 Link this dataset to a job ID
1436
1437 Updates the dataset data to include a reference to the job that will be
1438 executing (or has already executed) this dataset.
1439
1440 Note that if no job can be found for this dataset, this method silently
1441 fails.
1442
1443 :param Job job: The job that will run this dataset
1444
1445 :todo: If the job column ever gets used, make sure it always contains
1446 a valid value, rather than silently failing this method.
1447 """
1448 if type(job) != Job:
1449 raise TypeError("link_job requires a Job object as its argument")
1450
1451 if "id" not in job.data:
1452 try:
1453 job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1454 except JobNotFoundException:
1455 return
1456
1457 self.db.update("datasets", where={"key": self.key}, data={"job": job.data["id"]})
1458
1459 def link_parent(self, key_parent):
1460 """
1461 Set source_dataset key for this dataset
1462
1463 :param key_parent: Parent key. Not checked for validity
1464 """
1465 self.db.update("datasets", where={"key": self.key}, data={"key_parent": key_parent})
1466
1467 def get_parent(self):
1468 """
1469 Get parent dataset
1470
1471 :return DataSet: Parent dataset, or `None` if not applicable
1472 """
1473 return DataSet(key=self.key_parent, db=self.db, modules=self.modules) if self.key_parent else None
1474
1475 def detach(self):
1476 """
1477 Makes the dataset standalone, i.e. not having any source dataset
1478 """
1479 self.link_parent("")
1480
1481 def is_dataset(self):
1482 """
1483 Easy way to confirm this is a dataset.
1484 Used for checking processor and dataset compatibility,
1485 which needs to handle both processors and datasets.
1486 """
1487 return True
1488
1489 def is_top_dataset(self):
1490 """
1491 Easy way to confirm this is a top dataset.
1492 Used for checking processor and dataset compatibility,
1493 which needs to handle both processors and datasets.
1494 """
1495 if self.key_parent:
1496 return False
1497 return True
1498
1499 def is_expiring(self, user=None):
1500 """
1501 Determine if dataset is set to expire
1502
1503 Similar to `is_expired`, but checks if the dataset will be deleted in
1504 the future, not if it should be deleted right now.
1505
1506 :param user: User to use for configuration context. Provide to make
1507 sure configuration overrides for this user are taken into account.
1508 :return bool|int: `False`, or the expiration date as a Unix timestamp.
1509 """
1510 # has someone opted out of deleting this?
1511 if self.parameters.get("keep"):
1512 return False
1513
1514 # is this dataset explicitly marked as expiring after a certain time?
1515 if self.parameters.get("expires-after"):
1516 return self.parameters.get("expires-after")
1517
1518 # is the data source configured to have its datasets expire?
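# the setting is assumed to be a mapping of datasource name to expiration
# settings, e.g. {"fourchan": {"timeout": 86400}} (values illustrative), with
# "timeout" counted in seconds since the dataset's creation timestamp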
1519 expiration = config.get("datasources.expiration", {}, user=user) 1520 if not expiration.get(self.parameters.get("datasource")): 1521 return False 1522 1523 # is there a timeout for this data source? 1524 if expiration.get(self.parameters.get("datasource")).get("timeout"): 1525 return self.timestamp + expiration.get(self.parameters.get("datasource")).get("timeout") 1526 1527 return False 1528 1529 def is_expired(self, user=None): 1530 """ 1531 Determine if dataset should be deleted 1532 1533 Datasets can be set to expire, but when they should be deleted depends 1534 on a number of factor. This checks them all. 1535 1536 :param user: User to use for configuration context. Provide to make 1537 sure configuration overrides for this user are taken into account. 1538 :return bool: 1539 """ 1540 # has someone opted out of deleting this? 1541 if not self.is_expiring(): 1542 return False 1543 1544 # is this dataset explicitly marked as expiring after a certain time? 1545 future = time.time() + 3600 # ensure we don't delete datasets with invalid expiration times 1546 if self.parameters.get("expires-after") and convert_to_int(self.parameters["expires-after"], future) < time.time(): 1547 return True 1548 1549 # is the data source configured to have its datasets expire? 1550 expiration = config.get("datasources.expiration", {}, user=user) 1551 if not expiration.get(self.parameters.get("datasource")): 1552 return False 1553 1554 # is the dataset older than the set timeout? 1555 if expiration.get(self.parameters.get("datasource")).get("timeout"): 1556 return self.timestamp + expiration[self.parameters.get("datasource")]["timeout"] < time.time() 1557 1558 return False 1559 1560 def is_from_collector(self): 1561 """ 1562 Check if this dataset was made by a processor that collects data, i.e. 1563 a search or import worker. 1564 1565 :return bool: 1566 """ 1567 return self.type.endswith("-search") or self.type.endswith("-import") 1568 1569 def get_extension(self): 1570 """ 1571 Gets the file extention this dataset produces. 1572 Also checks whether the results file exists. 1573 Used for checking processor and dataset compatibility. 1574 1575 :return str extension: Extension, e.g. `csv` 1576 """ 1577 if self.get_results_path().exists(): 1578 return self.get_results_path().suffix[1:] 1579 1580 return False 1581 1582 def get_media_type(self): 1583 """ 1584 Gets the media type of the dataset file. 1585 1586 :return str: media type, e.g., "text" 1587 """ 1588 own_processor = self.get_own_processor() 1589 if hasattr(self, "media_type"): 1590 # media type can be defined explicitly in the dataset; this is the priority 1591 return self.media_type 1592 elif own_processor is not None: 1593 # or media type can be defined in the processor 1594 # some processors can set different media types for different datasets (e.g., import_media) 1595 if hasattr(own_processor, "media_type"): 1596 return own_processor.media_type 1597 1598 # Default to text 1599 return self.parameters.get("media_type", "text") 1600 1601 def get_metadata(self): 1602 """ 1603 Get dataset metadata 1604 1605 This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended 1606 as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to 1607 update its database (and ensure compatibility with the exporting version of 4CAT). 
1608 """ 1609 metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,)) 1610 1611 # get 4CAT version (presumably to ensure export is compatible with import) 1612 metadata["current_4CAT_version"] = get_software_version() 1613 return metadata 1614 1615 def get_result_url(self): 1616 """ 1617 Gets the 4CAT frontend URL of a dataset file. 1618 1619 Uses the FlaskConfig attributes (i.e., SERVER_NAME and 1620 SERVER_HTTPS) plus hardcoded '/result/'. 1621 TODO: create more dynamic method of obtaining url. 1622 """ 1623 filename = self.get_results_path().name 1624 url_to_file = ('https://' if config.get("flask.https") else 'http://') + \ 1625 config.get("flask.server_name") + '/result/' + filename 1626 return url_to_file 1627 1628 def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True): 1629 """ 1630 Log an item that is unable to be mapped and warn administrators. 1631 1632 :param int item_count: Item index 1633 :param Processor processor: Processor calling function8 1634 """ 1635 dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" 1636 1637 # Use processing dataset if available, otherwise use original dataset (which likely already has this error message) 1638 closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self 1639 # Log error to dataset log 1640 closest_dataset.log(dataset_error_message) 1641 1642 if warn_admins: 1643 if processor is not None: 1644 processor.log.warning(f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}.") 1645 elif hasattr(self.db, "log"): 1646 # borrow the database's log handler 1647 self.db.log.warning(f"Unable to map item all items for dataset {closest_dataset.key}.") 1648 else: 1649 # No other log available 1650 raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn") 1651 1652 def __getattr__(self, attr): 1653 """ 1654 Getter so we don't have to use .data all the time 1655 1656 :param attr: Data key to get 1657 :return: Value 1658 """ 1659 1660 if attr in dir(self): 1661 # an explicitly defined attribute should always be called in favour 1662 # of this passthrough 1663 attribute = getattr(self, attr) 1664 return attribute 1665 elif attr in self.data: 1666 return self.data[attr] 1667 else: 1668 raise AttributeError("DataSet instance has no attribute %s" % attr) 1669 1670 def __setattr__(self, attr, value): 1671 """ 1672 Setter so we can flexibly update the database 1673 1674 Also updates internal data stores (.data etc). If the attribute is 1675 unknown, it is stored within the 'parameters' attribute. 1676 1677 :param str attr: Attribute to update 1678 :param value: New value 1679 """ 1680 1681 # don't override behaviour for *actual* class attributes 1682 if attr in dir(self): 1683 super().__setattr__(attr, value) 1684 return 1685 1686 if attr not in self.data: 1687 self.parameters[attr] = value 1688 attr = "parameters" 1689 value = self.parameters 1690 1691 if attr == "parameters": 1692 value = json.dumps(value) 1693 1694 self.db.update("datasets", where={"key": self.key}, data={attr: value}) 1695 1696 self.data[attr] = value 1697 1698 if attr == "parameters": 1699 self.parameters = json.loads(value)
Provide interface to safely register and run operations on a dataset
A dataset is a collection of:
- A unique identifier
- A set of parameters that demarcate the data contained within
- The data
The data is usually stored in a file on the disk; the parameters are stored in a database. The handling of the data, et cetera, is done by other workers; this class defines methods to create and manipulate the dataset's properties.
def __init__(self, parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None, type=None, is_private=True, owner="anonymous", modules=None)
Create new dataset object
If the dataset is not in the database yet, it is added.
Parameters
- dict parameters: Only when creating a new dataset. Dataset parameters, free-form dictionary.
- str key: Dataset key. If given, dataset with this key is loaded.
- int job: Job ID. If given, dataset corresponding to job is loaded.
- dict data: Dataset data, corresponding to a row in the datasets database table. If not given, retrieved from database depending on key.
- db: Database connection
- str parent: Only when creating a new dataset. Parent dataset key to which the one being created is a child.
- str extension: Only when creating a new dataset. Extension of dataset result file.
- str type: Only when creating a new dataset. Type of the dataset, corresponding to the type property of a processor class.
- bool is_private: Only when creating a new dataset. Whether the dataset is private or public.
- str owner: Only when creating a new dataset. The user name of the dataset's creator.
- modules: Module cache. If not given, will be loaded when needed (expensive). Used to figure out what processors are compatible with this dataset.
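A minimal usage sketch for the constructor, assuming db is an open 4CAT database connection and modules a pre-built ModuleCollector; the key, processor type and parameter values below are hypothetical:

# Load an existing dataset by key; raises DataSetNotFoundException if the key is unknown
existing = DataSet(key="abcdef1234567890abcdef1234567890", db=db, modules=modules)

# Create (and register) a new dataset; 'type' must match a processor's type property
new_dataset = DataSet(
    parameters={"datasource": "custom", "query": "example query"},  # hypothetical parameters
    type="custom-search",  # hypothetical processor type
    db=db,
    owner="jane",
    is_private=True,
    modules=modules,
)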
def check_dataset_finished(self):
    """
    Checks if dataset is finished. Returns the path to the results file if it
    is not empty, or 'empty' when there were no matches.

    Only returns a path if the dataset is complete. In other words, if this
    method returns a path, a file with the complete results for this dataset
    will exist at that location.

    :return: A path to the results file, 'empty', or `None`
    """
    if self.data["is_finished"] and self.data["num_rows"] > 0:
        return self.folder.joinpath(self.data["result_file"])
    elif self.data["is_finished"] and self.data["num_rows"] == 0:
        return 'empty'
    else:
        return None
Checks if dataset is finished. Returns the path to the results file if it is not empty, or 'empty' when there were no matches.
Only returns a path if the dataset is complete. In other words, if this method returns a path, a file with the complete results for this dataset will exist at that location.
Returns
A path to the results file, 'empty', or None
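A short sketch of how a caller might branch on the three possible return values (assuming dataset is an existing DataSet instance):

result = dataset.check_dataset_finished()
if result is None:
    print("Dataset is not finished yet")
elif result == "empty":
    print("Dataset finished, but no items matched")
else:
    print("Results available at", result)  # result is a Path to the results file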
199 def get_results_path(self): 200 """ 201 Get path to results file 202 203 Always returns a path, that will at some point contain the dataset 204 data, but may not do so yet. Use this to get the location to write 205 generated results to. 206 207 :return Path: A path to the results file 208 """ 209 return self.folder.joinpath(self.data["result_file"])
Get path to results file
Always returns a path, that will at some point contain the dataset data, but may not do so yet. Use this to get the location to write generated results to.
Returns
A path to the results file
def get_results_folder_path(self):
    """
    Get path to folder containing accompanying results

    Returns a path that may not yet be created

    :return Path: A path to the results folder
    """
    return self.folder.joinpath("folder_" + self.key)
Get path to folder containing accompanying results
Returns a path that may not yet be created
Returns
A path to the results folder
221 def get_log_path(self): 222 """ 223 Get path to dataset log file 224 225 Each dataset has a single log file that documents its creation. This 226 method returns the path to that file. It is identical to the path of 227 the dataset result file, with 'log' as its extension instead. 228 229 :return Path: A path to the log file 230 """ 231 return self.get_results_path().with_suffix(".log")
Get path to dataset log file
Each dataset has a single log file that documents its creation. This method returns the path to that file. It is identical to the path of the dataset result file, with 'log' as its extension instead.
Returns
A path to the log file
233 def clear_log(self): 234 """ 235 Clears the dataset log file 236 237 If the log file does not exist, it is created empty. The log file will 238 have the same file name as the dataset result file, with the 'log' 239 extension. 240 """ 241 log_path = self.get_log_path() 242 with log_path.open("w") as outfile: 243 pass
Clears the dataset log file
If the log file does not exist, it is created empty. The log file will have the same file name as the dataset result file, with the 'log' extension.
245 def log(self, log): 246 """ 247 Write log message to file 248 249 Writes the log message to the log file on a new line, including a 250 timestamp at the start of the line. Note that this assumes the log file 251 already exists - it should have been created/cleared with clear_log() 252 prior to calling this. 253 254 :param str log: Log message to write 255 """ 256 log_path = self.get_log_path() 257 with log_path.open("a", encoding="utf-8") as outfile: 258 outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
Write log message to file
Writes the log message to the log file on a new line, including a timestamp at the start of the line. Note that this assumes the log file already exists - it should have been created/cleared with clear_log() prior to calling this.
Parameters
- str log: Log message to write
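A sketch of the intended call order, since log() assumes the log file already exists (assuming dataset is a DataSet being worked on):

dataset.clear_log()                        # create or truncate the log file
dataset.log("Started processing")          # appends a timestamped line
dataset.log("Fetched 100 items")
print(dataset.get_log_path().read_text(encoding="utf-8"))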
314 def iterate_items(self, processor=None, warn_unmappable=True, map_missing="default"): 315 """ 316 Generate mapped dataset items 317 318 Wrapper for _iterate_items that returns a DatasetItem, which can be 319 accessed as a dict returning the original item or (if a mapper is 320 available) the mapped item. Mapped or original versions of the item can 321 also be accessed via the `original` and `mapped_object` properties of 322 the DatasetItem. 323 324 Processors can define a method called `map_item` that can be used to map 325 an item from the dataset file before it is processed any further. This is 326 slower than storing the data file in the right format to begin with but 327 not all data sources allow for easy 'flat' mapping of items, e.g. tweets 328 are nested objects when retrieved from the twitter API that are easier 329 to store as a JSON file than as a flat CSV file, and it would be a shame 330 to throw away that data. 331 332 Note the two parameters warn_unmappable and map_missing. Items can be 333 unmappable in that their structure is too different to coerce into a 334 neat dictionary of the structure the data source expects. This makes it 335 'unmappable' and warn_unmappable determines what happens in this case. 336 It can also be of the right structure, but with some fields missing or 337 incomplete. map_missing determines what happens in that case. The 338 latter is for example possible when importing data via Zeeschuimer, 339 which produces unstably-structured data captured from social media 340 sites. 341 342 :param BasicProcessor processor: A reference to the processor 343 iterating the dataset. 344 :param bool warn_unmappable: If an item is not mappable, skip the item 345 and log a warning 346 :param map_missing: Indicates what to do with mapped items for which 347 some fields could not be mapped. Defaults to 'empty_str'. Must be one of: 348 - 'default': fill missing fields with the default passed by map_item 349 - 'abort': raise a MappedItemIncompleteException if a field is missing 350 - a callback: replace missing field with the return value of the 351 callback. The MappedItem object is passed to the callback as the 352 first argument and the name of the missing field as the second. 
353 - a dictionary with a key for each possible missing field: replace missing 354 field with a strategy for that field ('default', 'abort', or a callback) 355 356 :return generator: A generator that yields DatasetItems 357 """ 358 unmapped_items = False 359 # Collect item_mapper for use with filter 360 item_mapper = False 361 own_processor = self.get_own_processor() 362 if own_processor and own_processor.map_item_method_available(dataset=self): 363 item_mapper = True 364 365 # missing field strategy can be for all fields at once, or per field 366 # if it is per field, it is a dictionary with field names and their strategy 367 # if it is for all fields, it is may be a callback, 'abort', or 'default' 368 default_strategy = "default" 369 if type(map_missing) is not dict: 370 default_strategy = map_missing 371 map_missing = {} 372 373 # Loop through items 374 for i, item in enumerate(self._iterate_items(processor)): 375 # Save original to yield 376 original_item = item.copy() 377 378 # Map item 379 if item_mapper: 380 try: 381 mapped_item = own_processor.get_mapped_item(item) 382 except MapItemException as e: 383 if warn_unmappable: 384 self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False) 385 unmapped_items = True 386 continue 387 388 # check if fields have been marked as 'missing' in the 389 # underlying data, and treat according to the chosen strategy 390 if mapped_item.get_missing_fields(): 391 for missing_field in mapped_item.get_missing_fields(): 392 strategy = map_missing.get(missing_field, default_strategy) 393 394 if callable(strategy): 395 # delegate handling to a callback 396 mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field) 397 elif strategy == "abort": 398 # raise an exception to be handled at the processor level 399 raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.") 400 elif strategy == "default": 401 # use whatever was passed to the object constructor 402 mapped_item.data[missing_field] = mapped_item.data[missing_field].value 403 else: 404 raise ValueError("map_missing must be 'abort', 'default', or a callback.") 405 406 else: 407 mapped_item = original_item 408 409 # yield a DatasetItem, which is a dict with some special properties 410 yield DatasetItem(mapper=item_mapper, original=original_item, mapped_object=mapped_item, **(mapped_item.get_item_data() if type(mapped_item) is MappedItem else mapped_item))
Generate mapped dataset items
Wrapper for _iterate_items that returns a DatasetItem, which can be accessed as a dict returning the original item or (if a mapper is available) the mapped item. Mapped or original versions of the item can also be accessed via the original and mapped_object properties of the DatasetItem.
Processors can define a method called map_item that can be used to map an item from the dataset file before it is processed any further. This is slower than storing the data file in the right format to begin with, but not all data sources allow for easy 'flat' mapping of items, e.g. tweets are nested objects when retrieved from the Twitter API that are easier to store as a JSON file than as a flat CSV file, and it would be a shame to throw away that data.
Note the two parameters warn_unmappable and map_missing. Items can be unmappable in that their structure is too different to coerce into a neat dictionary of the structure the data source expects. This makes it 'unmappable' and warn_unmappable determines what happens in this case. It can also be of the right structure, but with some fields missing or incomplete. map_missing determines what happens in that case. The latter is for example possible when importing data via Zeeschuimer, which produces unstably-structured data captured from social media sites.
Parameters
- BasicProcessor processor: A reference to the processor iterating the dataset.
- bool warn_unmappable: If an item is not mappable, skip the item and log a warning
- map_missing: Indicates what to do with mapped items for which some fields could not be mapped. Defaults to 'default'. Must be one of:
- 'default': fill missing fields with the default passed by map_item
- 'abort': raise a MappedItemIncompleteException if a field is missing
- a callback: replace missing field with the return value of the callback. The MappedItem object is passed to the callback as the first argument and the name of the missing field as the second.
- a dictionary with a key for each possible missing field: replace missing field with a strategy for that field ('default', 'abort', or a callback)
Returns
A generator that yields DatasetItems
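A sketch of the two ways of handling missing fields described above; the field names ('id', 'body') are hypothetical:

# Per-field strategies: abort on a missing 'id', blank out a missing 'body',
# fall back to the processor's defaults for any other missing field
for item in dataset.iterate_items(map_missing={"id": "abort", "body": lambda data, field: ""}):
    print(item["id"], item["body"])

# A single callback applied to every missing field
def note_missing(mapped_data, field):
    return f"<missing {field}>"

for item in dataset.iterate_items(map_missing=note_missing, warn_unmappable=False):
    pass  # item behaves like a dict of the mapped values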
412 def get_staging_area(self): 413 """ 414 Get path to a temporary folder in which files can be stored before 415 finishing 416 417 This folder must be created before use, but is guaranteed to not exist 418 yet. The folder may be used as a staging area for the dataset data 419 while it is being processed. 420 421 :return Path: Path to folder 422 """ 423 results_file = self.get_results_path() 424 425 results_dir_base = results_file.parent 426 results_dir = results_file.name.replace(".", "") + "-staging" 427 results_path = results_dir_base.joinpath(results_dir) 428 index = 1 429 while results_path.exists(): 430 results_path = results_dir_base.joinpath(results_dir + "-" + str(index)) 431 index += 1 432 433 # create temporary folder 434 results_path.mkdir() 435 436 # Storing the staging area with the dataset so that it can be removed later 437 self.staging_areas.append(results_path) 438 439 return results_path
Get path to a temporary folder in which files can be stored before finishing
This folder must be created before use, but is guaranteed to not exist yet. The folder may be used as a staging area for the dataset data while it is being processed.
Returns
Path to folder
def remove_staging_areas(self):
    """
    Remove any staging areas that were created and all files contained in them.
    """
    # Remove DataSet staging areas
    if self.staging_areas:
        for staging_area in self.staging_areas:
            if staging_area.is_dir():
                shutil.rmtree(staging_area)
Remove any staging areas that were created and all files contained in them.
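A sketch of the staging-area life cycle during processing (the file name is hypothetical):

staging = dataset.get_staging_area()        # unique, freshly created folder
(staging / "intermediate.json").write_text("{}", encoding="utf-8")
# ... write more temporary files, then assemble the final result file ...
dataset.remove_staging_areas()              # removes the folder and everything in it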
def finish(self, num_rows=0):
    """
    Declare the dataset finished
    """
    if self.data["is_finished"]:
        raise RuntimeError("Cannot finish a finished dataset again")

    self.db.update("datasets", where={"key": self.data["key"]},
                   data={"is_finished": True, "num_rows": num_rows, "progress": 1.0, "timestamp_finished": int(time.time())})
    self.data["is_finished"] = True
    self.data["num_rows"] = num_rows
Declare the dataset finished
463 def copy(self, shallow=True): 464 """ 465 Copies the dataset, making a new version with a unique key 466 467 468 :param bool shallow: Shallow copy: does not copy the result file, but 469 instead refers to the same file as the original dataset did 470 :return Dataset: Copied dataset 471 """ 472 parameters = self.parameters.copy() 473 474 # a key is partially based on the parameters. so by setting these extra 475 # attributes, we also ensure a unique key will be generated for the 476 # copy 477 # possibly todo: don't use time for uniqueness (but one shouldn't be 478 # copying a dataset multiple times per microsecond, that's not what 479 # this is for) 480 parameters["copied_from"] = self.key 481 parameters["copied_at"] = time.time() 482 483 copy = DataSet(parameters=parameters, db=self.db, extension=self.result_file.split(".")[-1], type=self.type, modules=self.modules) 484 for field in self.data: 485 if field in ("id", "key", "timestamp", "job", "parameters", "result_file"): 486 continue 487 488 copy.__setattr__(field, self.data[field]) 489 490 if shallow: 491 # use the same result file 492 copy.result_file = self.result_file 493 else: 494 # copy to new file with new key 495 shutil.copy(self.get_results_path(), copy.get_results_path()) 496 497 if self.is_finished(): 498 copy.finish(self.num_rows) 499 500 # make sure ownership is also copied 501 copy.copy_ownership_from(self) 502 503 return copy
Copies the dataset, making a new version with a unique key
Parameters
- bool shallow: Shallow copy: does not copy the result file, but instead refers to the same file as the original dataset did
Returns
Copied dataset
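A sketch contrasting the two copy modes, assuming the dataset is finished and its result file exists:

mirror = dataset.copy(shallow=True)      # new key, but refers to the same result file on disk
duplicate = dataset.copy(shallow=False)  # new key and an independent copy of the result file
print(mirror.key != dataset.key, duplicate.get_results_path())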
505 def delete(self, commit=True, queue=None): 506 """ 507 Delete the dataset, and all its children 508 509 Deletes both database records and result files. Note that manipulating 510 a dataset object after it has been deleted is undefined behaviour. 511 512 :param bool commit: Commit SQL DELETE query? 513 """ 514 # first, recursively delete children 515 children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,)) 516 for child in children: 517 try: 518 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 519 child.delete(commit=commit) 520 except DataSetException: 521 # dataset already deleted - race condition? 522 pass 523 524 # delete any queued jobs for this dataset 525 try: 526 job = Job.get_by_remote_ID(self.key, self.db, self.type) 527 if job.is_claimed: 528 # tell API to stop any jobs running for this dataset 529 # level 2 = cancel job 530 # we're not interested in the result - if the API is available, 531 # it will do its thing, if it's not the backend is probably not 532 # running so the job also doesn't need to be interrupted 533 call_api( 534 "cancel-job", 535 {"remote_id": self.key, "jobtype": self.type, "level": 2}, 536 False 537 ) 538 539 # this deletes the job from the database 540 job.finish(True) 541 542 except JobNotFoundException: 543 pass 544 545 # delete from database 546 self.db.delete("datasets", where={"key": self.key}, commit=commit) 547 self.db.delete("datasets_owners", where={"key": self.key}, commit=commit) 548 self.db.delete("users_favourites", where={"key": self.key}, commit=commit) 549 550 # delete from drive 551 try: 552 if self.get_results_path().exists(): 553 self.get_results_path().unlink() 554 if self.get_results_path().with_suffix(".log").exists(): 555 self.get_results_path().with_suffix(".log").unlink() 556 if self.get_results_folder_path().exists(): 557 shutil.rmtree(self.get_results_folder_path()) 558 559 except FileNotFoundError: 560 # already deleted, apparently 561 pass 562 except PermissionError as e: 563 self.db.log.error(f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}")
Delete the dataset, and all its children
Deletes both database records and result files. Note that manipulating a dataset object after it has been deleted is undefined behaviour.
Parameters
- bool commit: Commit SQL DELETE query?
565 def update_children(self, **kwargs): 566 """ 567 Update an attribute for all child datasets 568 569 Can be used to e.g. change the owner, version, finished status for all 570 datasets in a tree 571 572 :param kwargs: Parameters corresponding to known dataset attributes 573 """ 574 children = self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,)) 575 for child in children: 576 child = DataSet(key=child["key"], db=self.db, modules=self.modules) 577 for attr, value in kwargs.items(): 578 child.__setattr__(attr, value) 579 580 child.update_children(**kwargs)
Update an attribute for all child datasets
Can be used to e.g. change the owner, version, finished status for all datasets in a tree
Parameters
- kwargs: Parameters corresponding to known dataset attributes
def is_finished(self):
    """
    Check if dataset is finished
    :return bool:
    """
    return bool(self.data["is_finished"])
Check if dataset is finished
Returns
True if the dataset is finished, False otherwise
589 def is_rankable(self, multiple_items=True): 590 """ 591 Determine if a dataset is rankable 592 593 Rankable means that it is a CSV file with 'date' and 'value' columns 594 as well as one or more item label columns 595 596 :param bool multiple_items: Consider datasets with multiple items per 597 item (e.g. word_1, word_2, etc)? 598 599 :return bool: Whether the dataset is rankable or not 600 """ 601 if self.get_results_path().suffix != ".csv" or not self.get_results_path().exists(): 602 return False 603 604 column_options = {"date", "value", "item"} 605 if multiple_items: 606 column_options.add("word_1") 607 608 with self.get_results_path().open(encoding="utf-8") as infile: 609 reader = csv.DictReader(infile) 610 try: 611 return len(set(reader.fieldnames) & column_options) >= 3 612 except (TypeError, ValueError): 613 return False
Determine if a dataset is rankable
Rankable means that it is a CSV file with 'date' and 'value' columns as well as one or more item label columns
Parameters
- bool multiple_items: Consider datasets with multiple item columns per row (e.g. word_1, word_2, etc.)?
Returns
Whether the dataset is rankable or not
615 def is_accessible_by(self, username, role="owner"): 616 """ 617 Check if dataset has given user as owner 618 619 :param str|User username: Username to check for 620 :return bool: 621 """ 622 if type(username) is not str: 623 if hasattr(username, "get_id"): 624 username = username.get_id() 625 else: 626 raise TypeError("User must be a str or User object") 627 628 # 'normal' owners 629 if username in [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]: 630 return True 631 632 # owners that are owner by being part of a tag 633 if username in itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if (role is None or self.owners[f"tag:{tag}"]["role"] == role)]): 634 return True 635 636 return False
Check if dataset has given user as owner
Parameters
- str|User username: Username to check for
Returns
True if the given user has the given role for this dataset (directly or via an owner tag), False otherwise
638 def get_owners_users(self, role="owner"): 639 """ 640 Get list of dataset owners 641 642 This returns a list of *users* that are considered owners. Tags are 643 transparently replaced with the users with that tag. 644 645 :param str|None role: Role to check for. If `None`, all owners are 646 returned regardless of role. 647 648 :return set: De-duplicated owner list 649 """ 650 # 'normal' owners 651 owners = [owner for owner, meta in self.owners.items() if 652 (role is None or meta["role"] == role) and not owner.startswith("tag:")] 653 654 # owners that are owner by being part of a tag 655 owners.extend(itertools.chain(*[tagged_owners for tag, tagged_owners in self.tagged_owners.items() if 656 role is None or self.owners[f"tag:{tag}"]["role"] == role])) 657 658 # de-duplicate before returning 659 return set(owners)
Get list of dataset owners
This returns a list of users that are considered owners. Tags are transparently replaced with the users with that tag.
Parameters
- str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns
De-duplicated owner list
661 def get_owners(self, role="owner"): 662 """ 663 Get list of dataset owners 664 665 This returns a list of all owners, and does not transparently resolve 666 tags (like `get_owners_users` does). 667 668 :param str|None role: Role to check for. If `None`, all owners are 669 returned regardless of role. 670 671 :return set: De-duplicated owner list 672 """ 673 return [owner for owner, meta in self.owners.items() if (role is None or meta["role"] == role)]
Get list of dataset owners
This returns a list of all owners, and does not transparently resolve tags (like get_owners_users does).
Parameters
- str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns
De-duplicated owner list
675 def add_owner(self, username, role="owner"): 676 """ 677 Set dataset owner 678 679 If the user is already an owner, but with a different role, the role is 680 updated. If the user is already an owner with the same role, nothing happens. 681 682 :param str|User username: Username to set as owner 683 :param str|None role: Role to add user with. 684 """ 685 if type(username) is not str: 686 if hasattr(username, "get_id"): 687 username = username.get_id() 688 else: 689 raise TypeError("User must be a str or User object") 690 691 if username not in self.owners: 692 self.owners[username] = { 693 "name": username, 694 "key": self.key, 695 "role": role 696 } 697 self.db.insert("datasets_owners", data=self.owners[username], safe=True) 698 699 elif username in self.owners and self.owners[username]["role"] != role: 700 self.db.update("datasets_owners", data={"role": role}, where={"name": username, "key": self.key}) 701 self.owners[username]["role"] = role 702 703 if username.startswith("tag:"): 704 # this is a bit more complicated than just adding to the list of 705 # owners, so do a full refresh 706 self.refresh_owners() 707 708 # make sure children's owners remain in sync 709 for child in self.children: 710 child.add_owner(username, role) 711 # not recursive, since we're calling it from recursive code! 712 child.copy_ownership_from(self, recursive=False)
Set dataset owner
If the user is already an owner, but with a different role, the role is updated. If the user is already an owner with the same role, nothing happens.
Parameters
- str|User username: Username to set as owner
- str|None role: Role to add user with.
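A sketch of granting access to a single user and to everyone carrying a tag; the username and tag are hypothetical:

dataset.add_owner("jane")                          # full owner
dataset.add_owner("tag:students", role="viewer")   # everyone tagged 'students' may view
print(dataset.is_accessible_by("jane"))            # True
print(dataset.get_owners(role="viewer"))           # includes 'tag:students', tag not resolved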
714 def remove_owner(self, username): 715 """ 716 Remove dataset owner 717 718 If no owner is set, the dataset is assigned to the anonymous user. 719 If the user is not an owner, nothing happens. 720 721 :param str|User username: Username to set as owner 722 """ 723 if type(username) is not str: 724 if hasattr(username, "get_id"): 725 username = username.get_id() 726 else: 727 raise TypeError("User must be a str or User object") 728 729 if username in self.owners: 730 del self.owners[username] 731 self.db.delete("datasets_owners", where={"name": username, "key": self.key}) 732 733 if not self.owners: 734 self.add_owner("anonymous") 735 736 if username in self.tagged_owners: 737 del self.tagged_owners[username] 738 739 # make sure children's owners remain in sync 740 for child in self.children: 741 child.remove_owner(username) 742 # not recursive, since we're calling it from recursive code! 743 child.copy_ownership_from(self, recursive=False)
Remove dataset owner
If no owner is set, the dataset is assigned to the anonymous user. If the user is not an owner, nothing happens.
Parameters
- str|User username: Username to remove as owner
745 def refresh_owners(self): 746 """ 747 Update internal owner cache 748 749 This makes sure that the list of *users* and *tags* which can access the 750 dataset is up to date. 751 """ 752 self.owners = {owner["name"]: owner for owner in self.db.fetchall("SELECT * FROM datasets_owners WHERE key = %s", (self.key,))} 753 754 # determine which users (if any) are owners of the dataset by having a 755 # tag that is listed as an owner 756 owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")] 757 if owner_tags: 758 tagged_owners = self.db.fetchall("SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)) 759 self.tagged_owners = { 760 owner_tag: [user["name"] for user in tagged_owners if owner_tag in user["tags"]] 761 for owner_tag in owner_tags 762 } 763 else: 764 self.tagged_owners = {}
Update internal owner cache
This makes sure that the list of users and tags which can access the dataset is up to date.
766 def copy_ownership_from(self, dataset, recursive=True): 767 """ 768 Copy ownership 769 770 This is useful to e.g. make sure a dataset's ownership stays in sync 771 with its parent 772 773 :param Dataset dataset: Parent to copy from 774 :return: 775 """ 776 self.db.delete("datasets_owners", where={"key": self.key}, commit=False) 777 778 for role in ("owner", "viewer"): 779 owners = dataset.get_owners(role=role) 780 for owner in owners: 781 self.db.insert("datasets_owners", data={"key": self.key, "name": owner, "role": role}, commit=False, safe=True) 782 783 self.db.commit() 784 if recursive: 785 for child in self.children: 786 child.copy_ownership_from(self, recursive=recursive)
Copy ownership
This is useful to e.g. make sure a dataset's ownership stays in sync with its parent
Parameters
- Dataset dataset: Parent to copy from
Returns
788 def get_parameters(self): 789 """ 790 Get dataset parameters 791 792 The dataset parameters are stored as JSON in the database - parse them 793 and return the resulting object 794 795 :return: Dataset parameters as originally stored 796 """ 797 try: 798 return json.loads(self.data["parameters"]) 799 except json.JSONDecodeError: 800 return {}
Get dataset parameters
The dataset parameters are stored as JSON in the database - parse them and return the resulting object
Returns
Dataset parameters as originally stored
802 def get_columns(self): 803 """ 804 Returns the dataset columns. 805 806 Useful for processor input forms. Can deal with both CSV and NDJSON 807 files, the latter only if a `map_item` function is available in the 808 processor that generated it. While in other cases one could use the 809 keys of the JSON object, this is not always possible in follow-up code 810 that uses the 'column' names, so for consistency this function acts as 811 if no column can be parsed if no `map_item` function exists. 812 813 :return list: List of dataset columns; empty list if unable to parse 814 """ 815 if not self.get_results_path().exists(): 816 # no file to get columns from 817 return [] 818 819 if (self.get_results_path().suffix.lower() == ".csv") or (self.get_results_path().suffix.lower() == ".ndjson" and self.get_own_processor() is not None and self.get_own_processor().map_item_method_available(dataset=self)): 820 items = self.iterate_items(warn_unmappable=False) 821 try: 822 keys = list(items.__next__().keys()) 823 except (StopIteration, NotImplementedError): 824 # No items or otherwise unable to iterate 825 return [] 826 finally: 827 del items 828 return keys 829 else: 830 # Filetype not CSV or an NDJSON with `map_item` 831 return []
Returns the dataset columns.
Useful for processor input forms. Can deal with both CSV and NDJSON files, the latter only if a map_item function is available in the processor that generated it. While in other cases one could use the keys of the JSON object, this is not always possible in follow-up code that uses the 'column' names, so for consistency this function acts as if no column can be parsed if no map_item function exists.
Returns
List of dataset columns; empty list if unable to parse
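A sketch of the typical use in a processor's input form; the surrounding option-definition structure is hypothetical and only illustrates feeding the column list into a choice field:

columns = dataset.get_columns()
if columns:
    options = {
        "column": {
            "type": "choice",                      # hypothetical form field definition
            "options": {c: c for c in columns},
            "help": "Column to analyse",
        }
    }
else:
    options = {}  # no columns could be parsed (e.g. NDJSON without map_item)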
833 def get_annotation_fields(self): 834 """ 835 Retrieves the saved annotation fields for this dataset. 836 :return dict: The saved annotation fields. 837 """ 838 839 annotation_fields = self.db.fetchone("SELECT annotation_fields FROM datasets WHERE key = %s;", (self.top_parent().key,)) 840 841 if annotation_fields and annotation_fields.get("annotation_fields"): 842 annotation_fields = json.loads(annotation_fields["annotation_fields"]) 843 else: 844 annotation_fields = {} 845 846 return annotation_fields
Retrieves the saved annotation fields for this dataset.
Returns
The saved annotation fields.
def get_annotations(self):
    """
    Retrieves the annotations for this dataset.
    :return dict: The annotations, or None if none are saved.
    """

    annotations = self.db.fetchone("SELECT annotations FROM annotations WHERE key = %s;", (self.top_parent().key,))

    if annotations and annotations.get("annotations"):
        return json.loads(annotations["annotations"])
    else:
        return None
Retrieves the annotations for this dataset.
Returns
The annotations, or None if none are saved.
861 def update_label(self, label): 862 """ 863 Update label for this dataset 864 865 :param str label: New label 866 :return str: The new label, as returned by get_label 867 """ 868 self.parameters["label"] = label 869 870 self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key}) 871 return self.get_label()
Update label for this dataset
Parameters
- str label: New label
Returns
The new label, as returned by get_label
873 def get_label(self, parameters=None, default="Query"): 874 """ 875 Generate a readable label for the dataset 876 877 :param dict parameters: Parameters of the dataset 878 :param str default: Label to use if it cannot be inferred from the 879 parameters 880 881 :return str: Label 882 """ 883 if not parameters: 884 parameters = self.parameters 885 886 if parameters.get("label"): 887 return parameters["label"] 888 elif parameters.get("body_query") and parameters["body_query"] != "empty": 889 return parameters["body_query"] 890 elif parameters.get("body_match") and parameters["body_match"] != "empty": 891 return parameters["body_match"] 892 elif parameters.get("subject_query") and parameters["subject_query"] != "empty": 893 return parameters["subject_query"] 894 elif parameters.get("subject_match") and parameters["subject_match"] != "empty": 895 return parameters["subject_match"] 896 elif parameters.get("query"): 897 label = parameters["query"] 898 # Some legacy datasets have lists as query data 899 if isinstance(label, list): 900 label = ", ".join(label) 901 902 label = label if len(label) < 30 else label[:25] + "..." 903 label = label.strip().replace("\n", ", ") 904 return label 905 elif parameters.get("country_flag") and parameters["country_flag"] != "all": 906 return "Flag: %s" % parameters["country_flag"] 907 elif parameters.get("country_name") and parameters["country_name"] != "all": 908 return "Country: %s" % parameters["country_name"] 909 elif parameters.get("filename"): 910 return parameters["filename"] 911 elif parameters.get("board") and "datasource" in parameters: 912 return parameters["datasource"] + "/" + parameters["board"] 913 elif "datasource" in parameters and parameters["datasource"] in self.modules.datasources: 914 return self.modules.datasources[parameters["datasource"]]["name"] + " Dataset" 915 else: 916 return default
Generate a readable label for the dataset
Parameters
- dict parameters: Parameters of the dataset
- str default: Label to use if it cannot be inferred from the parameters
Returns
Label
def change_datasource(self, datasource):
    """
    Change the datasource type for this dataset

    :param str datasource: New datasource type
    :return str: The new datasource type
    """

    self.parameters["datasource"] = datasource

    self.db.update("datasets", data={"parameters": json.dumps(self.parameters)}, where={"key": self.key})
    return datasource
Change the datasource type for this dataset
Parameters
- str datasource: New datasource type
Returns
The new datasource type
931 def reserve_result_file(self, parameters=None, extension="csv"): 932 """ 933 Generate a unique path to the results file for this dataset 934 935 This generates a file name for the data file of this dataset, and makes sure 936 no file exists or will exist at that location other than the file we 937 expect (i.e. the data for this particular dataset). 938 939 :param str extension: File extension, "csv" by default 940 :param parameters: Dataset parameters 941 :return bool: Whether the file path was successfully reserved 942 """ 943 if self.data["is_finished"]: 944 raise RuntimeError("Cannot reserve results file for a finished dataset") 945 946 # Use 'random' for random post queries 947 if "random_amount" in parameters and int(parameters["random_amount"]) > 0: 948 file = 'random-' + str(parameters["random_amount"]) + '-' + self.data["key"] 949 # Use country code for country flag queries 950 elif "country_flag" in parameters and parameters["country_flag"] != 'all': 951 file = 'countryflag-' + str(parameters["country_flag"]) + '-' + self.data["key"] 952 # Use the query string for all other queries 953 else: 954 query_bit = self.data["query"].replace(" ", "-").lower() 955 query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit) 956 query_bit = query_bit[:100] # Crop to avoid OSError 957 file = query_bit + "-" + self.data["key"] 958 file = re.sub(r"[-]+", "-", file) 959 960 path = self.folder.joinpath(file + "." + extension.lower()) 961 index = 1 962 while path.is_file(): 963 path = self.folder.joinpath(file + "-" + str(index) + "." + extension.lower()) 964 index += 1 965 966 file = path.name 967 updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]}, 968 data={"result_file": file}) 969 self.data["result_file"] = file 970 return updated > 0
Generate a unique path to the results file for this dataset
This generates a file name for the data file of this dataset, and makes sure no file exists or will exist at that location other than the file we expect (i.e. the data for this particular dataset).
Parameters
- str extension: File extension, "csv" by default
- parameters: Dataset parameters
Returns
Whether the file path was successfully reserved
972 def get_key(self, query, parameters, parent="", time_offset=0): 973 """ 974 Generate a unique key for this dataset that can be used to identify it 975 976 The key is a hash of a combination of the query string and parameters. 977 You never need to call this, really: it's used internally. 978 979 :param str query: Query string 980 :param parameters: Dataset parameters 981 :param parent: Parent dataset's key (if applicable) 982 :param time_offset: Offset to add to the time component of the dataset 983 key. This can be used to ensure a unique key even if the parameters and 984 timing is otherwise identical to an existing dataset's 985 986 :return str: Dataset key 987 """ 988 # Return a hash based on parameters 989 # we're going to use the hash of the parameters to uniquely identify 990 # the dataset, so make sure it's always in the same order, or we might 991 # end up creating multiple keys for the same dataset if python 992 # decides to return the dict in a different order 993 param_key = collections.OrderedDict() 994 for key in sorted(parameters): 995 param_key[key] = parameters[key] 996 997 # we additionally use the current time as a salt - this should usually 998 # ensure a unique key for the dataset. if for some reason there is a 999 # hash collision 1000 param_key["_salt"] = int(time.time()) + time_offset 1001 1002 parent_key = str(parent) if parent else "" 1003 plain_key = repr(param_key) + str(query) + parent_key 1004 hashed_key = hashlib.md5(plain_key.encode("utf-8")).hexdigest() 1005 1006 if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)): 1007 # key exists, generate a new one 1008 return self.get_key(query, parameters, parent, time_offset=random.randint(1,10)) 1009 else: 1010 return hashed_key
Generate a unique key for this dataset that can be used to identify it
The key is a hash of a combination of the query string and parameters. You never need to call this, really: it's used internally.
Parameters
- str query: Query string
- parameters: Dataset parameters
- parent: Parent dataset's key (if applicable)
- time_offset: Offset to add to the time component of the dataset key. This can be used to ensure a unique key even if the parameters and timing is otherwise identical to an existing dataset's
Returns
Dataset key
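An illustrative sketch that mirrors the key derivation in the method body above; you would not normally compute this yourself:

import collections, hashlib, time

parameters = {"datasource": "custom", "query": "example"}   # hypothetical parameters
param_key = collections.OrderedDict((k, parameters[k]) for k in sorted(parameters))
param_key["_salt"] = int(time.time())                       # time salt, as in get_key()
plain_key = repr(param_key) + "example" + ""                # query string plus (empty) parent key
print(hashlib.md5(plain_key.encode("utf-8")).hexdigest())   # the dataset key, barring collisions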
1012 def set_key(self, key): 1013 """ 1014 Change dataset key 1015 1016 In principe, keys should never be changed. But there are rare cases 1017 where it is useful to do so, in particular when importing a dataset 1018 from another 4CAT instance; in that case it makes sense to try and 1019 ensure that the key is the same as it was before. This function sets 1020 the dataset key and updates any dataset references to it. 1021 1022 :param str key: Key to set 1023 :return str: Key that was set. If the desired key already exists, the 1024 original key is kept. 1025 """ 1026 key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,)) 1027 if key_exists or not key: 1028 return self.key 1029 1030 old_key = self.key 1031 self.db.update("datasets", data={"key": key}, where={"key": old_key}) 1032 1033 # update references 1034 self.db.update("datasets", data={"key_parent": key}, where={"key_parent": old_key}) 1035 self.db.update("datasets_owners", data={"key": key}, where={"key": old_key}) 1036 self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key}) 1037 self.db.update("users_favourites", data={"key": key}, where={"key": old_key}) 1038 1039 # for good measure 1040 self.db.commit() 1041 self.key = key 1042 1043 return self.key
Change dataset key
In principle, keys should never be changed. But there are rare cases where it is useful to do so, in particular when importing a dataset from another 4CAT instance; in that case it makes sense to try and ensure that the key is the same as it was before. This function sets the dataset key and updates any dataset references to it.
Parameters
- str key: Key to set
Returns
Key that was set. If the desired key already exists, the original key is kept.
def get_status(self):
    """
    Get Dataset status

    :return string: Dataset status
    """
    return self.data["status"]
Get Dataset status
Returns
Dataset status
1053 def update_status(self, status, is_final=False): 1054 """ 1055 Update dataset status 1056 1057 The status is a string that may be displayed to a user to keep them 1058 updated and informed about the progress of a dataset. No memory is kept 1059 of earlier dataset statuses; the current status is overwritten when 1060 updated. 1061 1062 Statuses are also written to the dataset log file. 1063 1064 :param string status: Dataset status 1065 :param bool is_final: If this is `True`, subsequent calls to this 1066 method while the object is instantiated will not update the dataset 1067 status. 1068 :return bool: Status update successful? 1069 """ 1070 if self.no_status_updates: 1071 return 1072 1073 # for presets, copy the updated status to the preset(s) this is part of 1074 if self.preset_parent is None: 1075 self.preset_parent = [parent for parent in self.get_genealogy() if parent.type.find("preset-") == 0 and parent.key != self.key][:1] 1076 1077 if self.preset_parent: 1078 for preset_parent in self.preset_parent: 1079 if not preset_parent.is_finished(): 1080 preset_parent.update_status(status) 1081 1082 self.data["status"] = status 1083 updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"status": status}) 1084 1085 if is_final: 1086 self.no_status_updates = True 1087 1088 self.log(status) 1089 1090 return updated > 0
Update dataset status
The status is a string that may be displayed to a user to keep them updated and informed about the progress of a dataset. No memory is kept of earlier dataset statuses; the current status is overwritten when updated.
Statuses are also written to the dataset log file.
Parameters
- string status: Dataset status
- bool is_final: If this is True, subsequent calls to this method while the object is instantiated will not update the dataset status.
Returns
Status update successful?
1092 def update_progress(self, progress): 1093 """ 1094 Update dataset progress 1095 1096 The progress can be used to indicate to a user how close the dataset 1097 is to completion. 1098 1099 :param float progress: Between 0 and 1. 1100 :return: 1101 """ 1102 progress = min(1, max(0, progress)) # clamp 1103 if type(progress) is int: 1104 progress = float(progress) 1105 1106 self.data["progress"] = progress 1107 updated = self.db.update("datasets", where={"key": self.data["key"]}, data={"progress": progress}) 1108 return updated > 0
Update dataset progress
The progress can be used to indicate to a user how close the dataset is to completion.
Parameters
- float progress: Between 0 and 1.
Returns
def get_progress(self):
    """
    Get dataset progress

    :return float: Progress, between 0 and 1
    """
    return self.data["progress"]
Get dataset progress
Returns
Progress, between 0 and 1
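A sketch of how a processor might combine status and progress updates while working through items; the item count is hypothetical:

total = 500
for i in range(total):
    # ... process item i ...
    if i % 100 == 0:
        dataset.update_status(f"Processed {i}/{total} items")
        dataset.update_progress(i / total)

dataset.update_status("Done", is_final=True)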
1118 def finish_with_error(self, error): 1119 """ 1120 Set error as final status, and finish with 0 results 1121 1122 This is a convenience function to avoid having to repeat 1123 "update_status" and "finish" a lot. 1124 1125 :param str error: Error message for final dataset status. 1126 :return: 1127 """ 1128 self.update_status(error, is_final=True) 1129 self.finish(0) 1130 1131 return None
Set error as final status, and finish with 0 results
This is a convenience function to avoid having to repeat "update_status" and "finish" a lot.
Parameters
- str error: Error message for final dataset status.
Returns
def update_version(self, version):
    """
    Update software version used for this dataset

    This can be used to verify the code that was used to process this dataset.

    :param string version: Version identifier
    :return bool: Update successful?
    """
    try:
        # this fails if the processor type is unknown
        # edge case, but let's not crash...
        processor_path = self.modules.processors.get(self.data["type"]).filepath
    except AttributeError:
        processor_path = ""

    updated = self.db.update("datasets", where={"key": self.data["key"]}, data={
        "software_version": version[0],
        "software_source": version[1],
        "software_file": processor_path
    })

    return updated > 0
Update software version used for this dataset
This can be used to verify the code that was used to process this dataset.
Parameters
- string version: Version identifier
Returns
Update successful?
def delete_parameter(self, parameter, instant=True):
    """
    Delete a parameter from the dataset metadata

    :param string parameter: Parameter to delete
    :param bool instant: Also delete parameters in this instance object?
    :return bool: Update successful?
    """
    parameters = self.parameters.copy()
    if parameter in parameters:
        del parameters[parameter]
    else:
        return False

    updated = self.db.update("datasets", where={"key": self.data["key"]},
                             data={"parameters": json.dumps(parameters)})

    if instant:
        self.parameters = parameters

    return updated > 0
Delete a parameter from the dataset metadata
Parameters
- string parameter: Parameter to delete
- bool instant: Also delete parameters in this instance object?
Returns
Update successful?
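A sketch of removing a sensitive parameter before sharing a dataset (assuming `dataset` is an existing DataSet; the "api_key" parameter name is purely illustrative):

if "api_key" in dataset.parameters:
    # removes the parameter from the database record and, because
    # instant=True is the default, from this object as well
    deleted = dataset.delete_parameter("api_key")
    print("Parameter removed" if deleted else "Nothing to remove")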
def get_version_url(self, file):
    """
    Get a versioned github URL for the version this dataset was processed with

    :param file: File to link within the repository
    :return: URL, or an empty string
    """
    if not self.data["software_source"]:
        return ""

    filepath = self.data.get("software_file", "")
    if filepath.startswith("/extensions/"):
        # go to root of extension
        filepath = "/" + "/".join(filepath.split("/")[3:])

    return self.data["software_source"] + "/blob/" + self.data["software_version"] + filepath
Get a versioned github URL for the version this dataset was processed with
Parameters
- file: File to link within the repository
Returns
URL, or an empty string
def top_parent(self):
    """
    Get root dataset

    Traverses the tree of datasets this one is part of until it finds one
    with no source_dataset dataset, then returns that dataset.

    :return Dataset: Parent dataset
    """
    genealogy = self.get_genealogy()
    return genealogy[0]
Get root dataset
Traverses the tree of datasets this one is part of until it finds one with no source_dataset dataset, then returns that dataset.
Returns
Parent dataset
def get_genealogy(self, inclusive=False):
    """
    Get genealogy of this dataset

    Creates a list of DataSet objects, with the first one being the
    'top' dataset, and each subsequent one being a child of the previous
    one, ending with the current dataset.

    :return list: Dataset genealogy, oldest dataset first
    """
    if self.genealogy and not inclusive:
        return self.genealogy

    key_parent = self.key_parent
    genealogy = []

    while key_parent:
        try:
            parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
        except DataSetException:
            break

        genealogy.append(parent)
        if parent.key_parent:
            key_parent = parent.key_parent
        else:
            break

    genealogy.reverse()
    genealogy.append(self)

    self.genealogy = genealogy
    return self.genealogy
Get genealogy of this dataset
Creates a list of DataSet objects, with the first one being the 'top' dataset, and each subsequent one being a child of the previous one, ending with the current dataset.
Returns
Dataset genealogy, oldest dataset first
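For instance, a breadcrumb of dataset types from root to the current dataset could be built like this (sketch, assuming `dataset` is an existing DataSet; the printed types are illustrative):

genealogy = dataset.get_genealogy()
breadcrumb = " > ".join(ancestor.type for ancestor in genealogy)
print(breadcrumb)  # e.g. "fourchan-search > word-trees"

# the first element is the same dataset top_parent() returns
assert genealogy[0].key == dataset.top_parent().key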
def get_all_children(self, recursive=True):
    """
    Get all children of this dataset

    Results are returned as a non-hierarchical list, i.e. the result does
    not reflect the actual dataset hierarchy (but all datasets in the
    result will have the original dataset as an ancestor somewhere)

    :return list: List of DataSets
    """
    children = [DataSet(data=record, db=self.db, modules=self.modules) for record in self.db.fetchall("SELECT * FROM datasets WHERE key_parent = %s", (self.key,))]
    results = children.copy()
    if recursive:
        for child in children:
            results += child.get_all_children(recursive)

    return results
Get all children of this dataset
Results are returned as a non-hierarchical list, i.e. the result does not reflect the actual dataset hierarchy (but all datasets in the result will have the original dataset as an ancestor somewhere)
Returns
List of DataSets
def nearest(self, type_filter):
    """
    Return nearest dataset that matches the given type

    Starting with this dataset, traverse the hierarchy upwards and return
    whichever dataset matches the given type.

    :param str type_filter: Type filter. Can contain wildcards and is matched
    using `fnmatch.fnmatch`.
    :return: Earliest matching dataset, or `None` if none match.
    """
    genealogy = self.get_genealogy(inclusive=True)
    for dataset in reversed(genealogy):
        if fnmatch.fnmatch(dataset.type, type_filter):
            return dataset

    return None
Return nearest dataset that matches the given type
Starting with this dataset, traverse the hierarchy upwards and return whichever dataset matches the given type.
Parameters
- str type_filter: Type filter. Can contain wildcards and is matched using fnmatch.fnmatch.
Returns
Earliest matching dataset, or None if none match.
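For example, a processor that needs the originally collected data can look upwards for the nearest search dataset (sketch; `dataset` is an assumed DataSet instance and the wildcard pattern is illustrative):

source = dataset.nearest("*-search")
if source is not None:
    print(f"Nearest data collection dataset: {source.key} ({source.type})")
else:
    print("No search dataset found in this dataset's lineage")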
def get_compatible_processors(self, user=None):
    """
    Get list of processors compatible with this dataset

    Checks whether this dataset type is one that is listed as being accepted
    by the processor, for each known type: if the processor does not
    specify accepted types (via the `is_compatible_with` method), it is
    assumed it accepts any top-level datasets

    :param str|User|None user: User to get compatibility for. If set,
    use the user-specific config settings where available.

    :return dict: Compatible processors, `name => class` mapping
    """
    processors = self.modules.processors

    available = {}
    for processor_type, processor in processors.items():
        if processor.is_from_collector():
            continue

        own_processor = self.get_own_processor()
        if own_processor and own_processor.exclude_followup_processors(processor_type):
            continue

        # consider a processor compatible if its is_compatible_with
        # method returns True *or* if it has no explicit compatibility
        # check and this dataset is top-level (i.e. has no parent)
        if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
                or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, user=user)):
            available[processor_type] = processor

    return available
Get list of processors compatible with this dataset
For each known processor type, checks whether this dataset's type is listed as accepted by that processor: if the processor does not specify accepted types (via the is_compatible_with method), it is assumed to accept any top-level dataset.
Parameters
- str|User|None user: User to get compatibility for. If set, use the user-specific config settings where available.
Returns
Compatible processors, name => class mapping
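A sketch listing which processor types could be queued on a dataset for a given user (assuming `dataset` is an existing DataSet; the user name is illustrative):

compatible = dataset.get_compatible_processors(user="example-user")
for processor_type in sorted(compatible):
    print(processor_type)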
def get_place_in_queue(self, update=False):
    """
    Determine dataset's position in queue

    If the dataset is already finished, the position is -1. Else, the
    position is the amount of datasets to be completed before this one will
    be processed. A position of 0 would mean that the dataset is currently
    being executed, or that the backend is not running.

    :param bool update: Update the queue position from database if True, else return cached value
    :return int: Queue position
    """
    if self.is_finished() or not self.data.get("job"):
        self._queue_position = -1
        return self._queue_position
    elif not update and self._queue_position is not None:
        # Use cached value
        return self._queue_position
    else:
        # Collect queue position from database via the job
        try:
            job = Job.get_by_ID(self.data["job"], self.db)
            self._queue_position = job.get_place_in_queue()
        except JobNotFoundException:
            self._queue_position = -1

        return self._queue_position
Determine dataset's position in queue
If the dataset is already finished, the position is -1. Otherwise, the position is the number of datasets that must be completed before this one is processed. A position of 0 means that the dataset is currently being executed, or that the backend is not running.
Parameters
- bool update: Update the queue position from database if True, else return cached value
Returns
Queue position
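This can be turned into a user-facing message, for example (sketch; `dataset` is an assumed DataSet instance):

position = dataset.get_place_in_queue(update=True)  # force a fresh read via the job
if position < 0:
    print("Dataset is finished or has no associated job")
elif position == 0:
    print("Dataset is being processed (or the backend is not running)")
else:
    print(f"{position} dataset(s) queued ahead of this one")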
def get_modules(self):
    """
    Get 4CAT modules

    Is a function because loading them is not free, and this way we can
    cache the result.

    :return:
    """
    if not self.modules:
        self.modules = ModuleCollector()

    return self.modules
Get 4CAT modules
Is a function because loading them is not free, and this way we can cache the result.
Returns
def get_own_processor(self):
    """
    Get the processor class that produced this dataset

    :return: Processor class, or `None` if not available.
    """
    processor_type = self.parameters.get("type", self.data.get("type"))

    return self.modules.processors.get(processor_type)
Get the processor class that produced this dataset
Returns
Processor class, or None if not available.
def get_available_processors(self, user=None, exclude_hidden=False):
    """
    Get list of processors that may be run for this dataset

    Returns all compatible processors except for those that are already
    queued or finished and have no options. Processors that have been
    run but have options are included so they may be run again with a
    different configuration

    :param str|User|None user: User to get compatibility for. If set,
    use the user-specific config settings where available.
    :param bool exclude_hidden: Exclude processors that should not be displayed
    in the UI? If `False`, all processors are returned.

    :return dict: Available processors, `name => properties` mapping
    """
    if self.available_processors:
        # Update to reflect exclude_hidden parameter which may be different from last call
        # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
        return {processor_type: processor for processor_type, processor in self.available_processors.items() if not exclude_hidden or not processor.is_hidden}

    processors = self.get_compatible_processors(user=user)

    for analysis in self.children:
        if analysis.type not in processors:
            continue

        if not processors[analysis.type].get_options():
            del processors[analysis.type]
            continue

        if exclude_hidden and processors[analysis.type].is_hidden:
            del processors[analysis.type]

    self.available_processors = processors
    return processors
Get list of processors that may be run for this dataset
Returns all compatible processors except for those that are already queued or finished and have no options. Processors that have been run but have options are included so they may be run again with a different configuration
Parameters
- str|User|None user: User to get compatibility for. If set, use the user-specific config settings where available.
- bool exclude_hidden: If True, exclude processors that should not be displayed in the UI. If False, all processors are returned.
Returns
Available processors, name => properties mapping
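Compared to get_compatible_processors(), this filters out analyses that have already run and cannot be re-run with different options. A short sketch (assuming `dataset` is an existing DataSet; the user name is illustrative):

available = dataset.get_available_processors(user="example-user", exclude_hidden=True)
print(f"{len(available)} processors can currently be queued on this dataset")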
def link_job(self, job):
    """
    Link this dataset to a job ID

    Updates the dataset data to include a reference to the job that will be
    executing (or has already executed) this dataset.

    Note that if no job can be found for this dataset, this method silently
    fails.

    :param Job job: The job that will run this dataset

    :todo: If the job column ever gets used, make sure it always contains
    a valid value, rather than silently failing this method.
    """
    if type(job) != Job:
        raise TypeError("link_job requires a Job object as its argument")

    if "id" not in job.data:
        try:
            job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
        except JobNotFoundException:
            return

    self.db.update("datasets", where={"key": self.key}, data={"job": job.data["id"]})
Link this dataset to a job ID
Updates the dataset data to include a reference to the job that will be executing (or has already executed) this dataset.
Note that if no job can be found for this dataset, this method silently fails.
Parameters
- Job job: The job that will run this dataset
To do
If the job column ever gets used, make sure it always contains a valid value, rather than silently failing this method.
def link_parent(self, key_parent):
    """
    Set source_dataset key for this dataset

    :param key_parent: Parent key. Not checked for validity
    """
    self.db.update("datasets", where={"key": self.key}, data={"key_parent": key_parent})
Set source_dataset key for this dataset
Parameters
- key_parent: Parent key. Not checked for validity
def get_parent(self):
    """
    Get parent dataset

    :return DataSet: Parent dataset, or `None` if not applicable
    """
    return DataSet(key=self.key_parent, db=self.db, modules=self.modules) if self.key_parent else None
Get parent dataset
Returns
Parent dataset, or None if not applicable
def detach(self):
    """
    Makes the dataset standalone, i.e. not having any source_dataset dataset
    """
    self.link_parent("")
Makes the dataset standalone, i.e. not having any source dataset.
def is_dataset(self):
    """
    Easy way to confirm this is a dataset.
    Used for checking processor and dataset compatibility,
    which needs to handle both processors and datasets.
    """
    return True
Easy way to confirm this is a dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.
def is_top_dataset(self):
    """
    Easy way to confirm this is a top dataset.
    Used for checking processor and dataset compatibility,
    which needs to handle both processors and datasets.
    """
    if self.key_parent:
        return False
    return True
Easy way to confirm this is a top dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.
def is_expiring(self, user=None):
    """
    Determine if dataset is set to expire

    Similar to `is_expired`, but checks if the dataset will be deleted in
    the future, not if it should be deleted right now.

    :param user: User to use for configuration context. Provide to make
    sure configuration overrides for this user are taken into account.
    :return bool|int: `False`, or the expiration date as a Unix timestamp.
    """
    # has someone opted out of deleting this?
    if self.parameters.get("keep"):
        return False

    # is this dataset explicitly marked as expiring after a certain time?
    if self.parameters.get("expires-after"):
        return self.parameters.get("expires-after")

    # is the data source configured to have its datasets expire?
    expiration = config.get("datasources.expiration", {}, user=user)
    if not expiration.get(self.parameters.get("datasource")):
        return False

    # is there a timeout for this data source?
    if expiration.get(self.parameters.get("datasource")).get("timeout"):
        return self.timestamp + expiration.get(self.parameters.get("datasource")).get("timeout")

    return False
Determine if dataset is set to expire
Similar to is_expired, but checks if the dataset will be deleted in the future, not if it should be deleted right now.
Parameters
- user: User to use for configuration context. Provide to make sure configuration overrides for this user are taken into account.
Returns
False, or the expiration date as a Unix timestamp.
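The two expiration checks can be combined, for example, to warn a user before data disappears (sketch; `dataset` is an assumed DataSet instance, the user name is illustrative, and the returned value is assumed to be a numeric timestamp):

import datetime

expires = dataset.is_expiring(user="example-user")
if expires and dataset.is_expired(user="example-user"):
    print("This dataset is overdue for deletion")
elif expires:
    # is_expiring() returns False or an expiration timestamp
    when = datetime.datetime.fromtimestamp(int(expires)).isoformat()
    print(f"This dataset is scheduled for deletion around {when}")
else:
    print("This dataset is not set to expire")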
def is_expired(self, user=None):
    """
    Determine if dataset should be deleted

    Datasets can be set to expire, but when they should be deleted depends
    on a number of factors. This checks them all.

    :param user: User to use for configuration context. Provide to make
    sure configuration overrides for this user are taken into account.
    :return bool:
    """
    # has someone opted out of deleting this?
    if not self.is_expiring():
        return False

    # is this dataset explicitly marked as expiring after a certain time?
    future = time.time() + 3600  # ensure we don't delete datasets with invalid expiration times
    if self.parameters.get("expires-after") and convert_to_int(self.parameters["expires-after"], future) < time.time():
        return True

    # is the data source configured to have its datasets expire?
    expiration = config.get("datasources.expiration", {}, user=user)
    if not expiration.get(self.parameters.get("datasource")):
        return False

    # is the dataset older than the set timeout?
    if expiration.get(self.parameters.get("datasource")).get("timeout"):
        return self.timestamp + expiration[self.parameters.get("datasource")]["timeout"] < time.time()

    return False
Determine if dataset should be deleted
Datasets can be set to expire, but when they should be deleted depends on a number of factors. This checks them all.
Parameters
- user: User to use for configuration context. Provide to make sure configuration overrides for this user are taken into account.
Returns
Whether the dataset should be deleted
def is_from_collector(self):
    """
    Check if this dataset was made by a processor that collects data, i.e.
    a search or import worker.

    :return bool:
    """
    return self.type.endswith("-search") or self.type.endswith("-import")
Check if this dataset was made by a processor that collects data, i.e. a search or import worker.
Returns
Whether the dataset was created by a search or import worker
def get_extension(self):
    """
    Gets the file extension this dataset produces.
    Also checks whether the results file exists.
    Used for checking processor and dataset compatibility.

    :return str extension: Extension, e.g. `csv`
    """
    if self.get_results_path().exists():
        return self.get_results_path().suffix[1:]

    return False
Gets the file extension this dataset produces. Also checks whether the results file exists. Used for checking processor and dataset compatibility.
Returns
Extension, e.g. csv
def get_media_type(self):
    """
    Gets the media type of the dataset file.

    :return str: media type, e.g., "text"
    """
    own_processor = self.get_own_processor()
    if hasattr(self, "media_type"):
        # media type can be defined explicitly in the dataset; this is the priority
        return self.media_type
    elif own_processor is not None:
        # or media type can be defined in the processor
        # some processors can set different media types for different datasets (e.g., import_media)
        if hasattr(own_processor, "media_type"):
            return own_processor.media_type

    # Default to text
    return self.parameters.get("media_type", "text")
Gets the media type of the dataset file.
Returns
media type, e.g., "text"
def get_metadata(self):
    """
    Get dataset metadata

    This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
    as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
    update its database (and ensure compatibility with the exporting version of 4CAT).
    """
    metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,))

    # get 4CAT version (presumably to ensure export is compatible with import)
    metadata["current_4CAT_version"] = get_software_version()
    return metadata
Get dataset metadata
This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to update its database (and ensure compatibility with the exporting version of 4CAT).
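A sketch of exporting this metadata alongside the results file, e.g. for import into another 4CAT instance (assuming `dataset` is an existing DataSet; the export file name is illustrative):

import json

metadata = dataset.get_metadata()
export_path = dataset.get_results_path().with_suffix(".metadata.json")
with export_path.open("w") as outfile:
    # default=str guards against values that are not directly JSON-serialisable
    json.dump(metadata, outfile, default=str)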
def get_result_url(self):
    """
    Gets the 4CAT frontend URL of a dataset file.

    Uses the FlaskConfig attributes (i.e., SERVER_NAME and
    SERVER_HTTPS) plus hardcoded '/result/'.
    TODO: create more dynamic method of obtaining url.
    """
    filename = self.get_results_path().name
    url_to_file = ('https://' if config.get("flask.https") else 'http://') + \
                  config.get("flask.server_name") + '/result/' + filename
    return url_to_file
Gets the 4CAT frontend URL of a dataset file.
Uses the FlaskConfig attributes (i.e., SERVER_NAME and SERVER_HTTPS) plus hardcoded '/result/'. TODO: create more dynamic method of obtaining url.
def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
    """
    Log an item that is unable to be mapped and warn administrators.

    :param int item_count: Item index
    :param Processor processor: Processor calling this function
    """
    dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"

    # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
    closest_dataset = processor.dataset if processor is not None and processor.dataset is not None else self
    # Log error to dataset log
    closest_dataset.log(dataset_error_message)

    if warn_admins:
        if processor is not None:
            processor.log.warning(f"Processor {processor.type} unable to map all items for dataset {closest_dataset.key}.")
        elif hasattr(self.db, "log"):
            # borrow the database's log handler
            self.db.log.warning(f"Unable to map all items for dataset {closest_dataset.key}.")
        else:
            # No other log available
            raise DataSetException(f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn")
Log an item that is unable to be mapped and warn administrators.
Parameters
- int item_count: Item index
- Processor processor: Processor calling this function
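A hedged sketch of a typical call site: iterating raw items and tolerating unmappable ones (assuming `dataset` is an existing DataSet, `raw_items` is a placeholder iterable, and the dataset's processor class implements map_item()):

from common.lib.exceptions import MapItemException

mapper = dataset.get_own_processor()  # processor class that produced the dataset
unmappable = 0
for index, raw_item in enumerate(raw_items):
    try:
        item = mapper.map_item(raw_item)
    except MapItemException as e:
        unmappable += 1
        # warn administrators only once, but log every unmappable item
        dataset.warn_unmappable_item(index, error_message=str(e), warn_admins=(unmappable == 1))
        continue
    # ... work with the mapped item here ...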