common.lib.dataset

   1import collections
   2import itertools
   3import datetime
   4import fnmatch
   5import random
   6import shutil
   7import json
   8import time
   9import csv
  10import re
  11
  12from common.lib.annotation import Annotation
  13from common.lib.job import Job, JobNotFoundException
  14
  15from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api, hash_to_md5, convert_to_float
  16from common.lib.item_mapping import MappedItem, DatasetItem
  17from common.lib.fourcat_module import FourcatModule
  18from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
  19                                   MapItemException, MappedItemIncompleteException, AnnotationException)
  20
  21
  22class DataSet(FourcatModule):
  23    """
  24    Provide interface to safely register and run operations on a dataset
  25
  26    A dataset is a collection of:
  27    - A unique identifier
  28    - A set of parameters that demarcate the data contained within
  29    - The data
  30
  31    The data is usually stored in a file on the disk; the parameters are stored
  32    in a database. The handling of the data, et cetera, is done by other
  33    workers; this class defines methods to create and manipulate the dataset's
  34    properties.
  35    """
  36
  37    # Attributes must be created here to ensure getattr and setattr work properly
  38    data = None
  39    key = ""
  40
  41    _children = None
  42    available_processors = None
  43    _genealogy = None
  44    preset_parent = None
  45    parameters = None
  46    modules = None
  47    annotation_fields = None
  48
  49    owners = None
  50    tagged_owners = None
  51
  52    db = None
  53    folder = None
  54    is_new = True
  55
  56    no_status_updates = False
  57    staging_areas = None
  58    _queue_position = None
  59
  60    def __init__(
  61        self,
  62        parameters=None,
  63        key=None,
  64        job=None,
  65        data=None,
  66        db=None,
  67        parent="",
  68        extension=None,
  69        type=None,
  70        is_private=True,
  71        owner="anonymous",
  72        modules=None
  73    ):
  74        """
  75        Create new dataset object
  76
  77        If the dataset is not in the database yet, it is added.
  78
  79        :param dict parameters:  Only when creating a new dataset. Dataset
  80        parameters, free-form dictionary.
  81        :param str key: Dataset key. If given, dataset with this key is loaded.
  82        :param int job: Job ID. If given, dataset corresponding to job is
  83        loaded.
  84        :param dict data: Dataset data, corresponding to a row in the datasets
  85        database table. If not given, retrieved from database depending on key.
  86        :param db:  Database connection
  87        :param str parent:  Only when creating a new dataset. Key of the parent
  88        dataset of which the one being created is a child.
  89        :param str extension: Only when creating a new dataset. Extension of
  90        dataset result file.
  91        :param str type: Only when creating a new dataset. Type of the dataset,
  92        corresponding to the type property of a processor class.
  93        :param bool is_private: Only when creating a new dataset. Whether the
  94        dataset is private or public.
  95        :param str owner: Only when creating a new dataset. The user name of
  96        the dataset's creator.
  97        :param modules: Module cache. If not given, will be loaded when needed
  98        (expensive). Used to figure out what processors are compatible with
  99        this dataset.
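
            Example (a minimal sketch: `db` and `modules` are assumed to be an
            existing database connection and module cache, and the key and
            processor type are hypothetical)::

                # load an existing dataset by key
                existing = DataSet(key="abcdef1234567890", db=db, modules=modules)

                # create a new child dataset of the existing one
                child = DataSet(
                    parameters={"label": "Filtered copy"},
                    type="some-processor-type",
                    parent=existing.key,
                    extension="csv",
                    db=db,
                    modules=modules,
                )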
 100        """
 101        self.db = db
 102
 103        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
 104        self.data = {}
 105        self.parameters = {}
 106        self.available_processors = {}
 107        self._genealogy = []
 108        self.staging_areas = []
 109        self.modules = modules
 110
 111        if key is not None:
 112            self.key = key
 113            current = self.db.fetchone(
 114                "SELECT * FROM datasets WHERE key = %s", (self.key,)
 115            )
 116            if not current:
 117                raise DataSetNotFoundException(
 118                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
 119                    % key
 120                )
 121
 122        elif job is not None:
 123            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
 124            if not current:
 125                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
 126
 127            self.key = current["key"]
 128        elif data is not None:
 129            current = data
 130            if (
 131                "query" not in data
 132                or "key" not in data
 133                or "parameters" not in data
 134                or "key_parent" not in data
 135            ):
 136                raise DataSetException(
 137                    "DataSet() requires a complete dataset record for its 'data' argument"
 138                )
 139
 140            self.key = current["key"]
 141        else:
 142            if parameters is None:
 143                raise DataSetException(
 144                    "DataSet() requires either 'key', or 'parameters' to be given"
 145                )
 146
 147            if not type:
 148                raise DataSetException("Datasets must have their type set explicitly")
 149
 150            query = self.get_label(parameters, default=type)
 151            self.key = self.get_key(query, parameters, parent)
 152            current = self.db.fetchone(
 153                "SELECT * FROM datasets WHERE key = %s AND query = %s",
 154                (self.key, query),
 155            )
 156
 157        if current:
 158            self.data = current
 159            self.parameters = json.loads(self.data["parameters"])
 160            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
 161                if self.data.get("annotation_fields") else {}
 162            self.is_new = False
 163        else:
 164            self.data = {"type": type}  # get_own_processor needs this
 165            own_processor = self.get_own_processor()
 166            version = get_software_commit(own_processor)
 167            self.data = {
 168                "key": self.key,
 169                "query": self.get_label(parameters, default=type),
 170                "parameters": json.dumps(parameters),
 171                "result_file": "",
 172                "creator": owner,
 173                "status": "",
 174                "type": type,
 175                "timestamp": int(time.time()),
 176                "is_finished": False,
 177                "is_private": is_private,
 178                "software_version": version[0],
 179                "software_source": version[1],
 180                "software_file": "",
 181                "num_rows": 0,
 182                "progress": 0.0,
 183                "key_parent": parent,
 184                "annotation_fields": "{}"
 185            }
 186            self.parameters = parameters
 187            self.annotation_fields = {}
 188
 189            self.db.insert("datasets", data=self.data)
 190            self.refresh_owners()
 191            self.add_owner(owner)
 192
 193            # Find desired extension from processor if not explicitly set
 194            if extension is None:
 195                if own_processor:
 196                    extension = own_processor.get_extension(
 197                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
 198                        if parent
 199                        else None
 200                    )
 201                # Still no extension, default to 'csv'
 202                if not extension:
 203                    extension = "csv"
 204
 205            # Reserve filename and update data['result_file']
 206            self.reserve_result_file(parameters, extension)
 207
 208        self.refresh_owners()
 209
 210    def check_dataset_finished(self):
 211        """
 212        Checks if the dataset is finished. Returns the path to the results file
 213        if it is not empty, or 'empty' when there were no matches.
 214
 215        Only returns a path if the dataset is complete. In other words, if this
 216        method returns a path, a file with the complete results for this dataset
 217        will exist at that location.
 218
 219        :return: A path to the results file, 'empty', or `None`
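
            A minimal usage sketch, assuming `dataset` is an existing DataSet::

                result = dataset.check_dataset_finished()
                if result == "empty":
                    pass  # finished, but no items were found
                elif result is not None:
                    pass  # finished; `result` is the path to the results file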
 220        """
 221        if self.data["is_finished"] and self.data["num_rows"] > 0:
 222            return self.get_results_path()
 223        elif self.data["is_finished"] and self.data["num_rows"] == 0:
 224            return 'empty'
 225        else:
 226            return None
 227
 228    def get_results_path(self):
 229        """
 230        Get path to results file
 231
 232        Always returns a path, that will at some point contain the dataset
 233        data, but may not do so yet. Use this to get the location to write
 234        generated results to.
 235
 236        :return Path:  A path to the results file
 237        """
 238        # alas we need to instantiate a config reader here - no way around it
 239        if not self.folder:
 240            self.folder = self.modules.config.get('PATH_ROOT').joinpath(self.modules.config.get('PATH_DATA'))
 241        return self.folder.joinpath(self.data["result_file"])
 242
 243    def get_results_folder_path(self):
 244        """
 245        Get path to folder containing accompanying results
 246
 247        Returns a path that may not exist yet
 248
 249        :return Path:  A path to the results folder
 250        """
 251        return self.get_results_path().parent.joinpath("folder_" + self.key)
 252
 253    def get_log_path(self):
 254        """
 255        Get path to dataset log file
 256
 257        Each dataset has a single log file that documents its creation. This
 258        method returns the path to that file. It is identical to the path of
 259        the dataset result file, with 'log' as its extension instead.
 260
 261        :return Path:  A path to the log file
 262        """
 263        return self.get_results_path().with_suffix(".log")
 264
 265    def clear_log(self):
 266        """
 267        Clears the dataset log file
 268
 269        If the log file does not exist, it is created empty. The log file will
 270        have the same file name as the dataset result file, with the 'log'
 271        extension.
 272        """
 273        log_path = self.get_log_path()
 274        with log_path.open("w"):
 275            pass
 276
 277    def log(self, log):
 278        """
 279        Write log message to file
 280
 281        Writes the log message to the log file on a new line, including a
 282        timestamp at the start of the line. Note that this assumes the log file
 283        already exists - it should have been created/cleared with clear_log()
 284        prior to calling this.
 285
 286        :param str log:  Log message to write
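
            A minimal usage sketch, assuming `dataset` is an existing DataSet::

                dataset.clear_log()
                dataset.log("Processing started")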
 287        """
 288        log_path = self.get_log_path()
 289        with log_path.open("a", encoding="utf-8") as outfile:
 290            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
 291
 292    def _iterate_items(self, processor=None):
 293        """
 294        A generator that iterates through a CSV or NDJSON file
 295
 296        This is an internal method and should not be called directly. Rather,
 297        call iterate_items() and use the generated dictionary and its properties.
 298
 299        If a reference to a processor is provided, with every iteration,
 300        the processor's 'interrupted' flag is checked, and if set a
 301        ProcessorInterruptedException is raised, which by default is caught
 302        in the worker and subsequently stops execution gracefully.
 303
 304        There are two file types that can be iterated (currently): CSV files
 305        and NDJSON (newline-delimited JSON) files. In the future, one could
 306        envision adding a pathway to retrieve items from e.g. a MongoDB
 307        collection directly instead of from a static file
 308
 309        :param BasicProcessor processor:  A reference to the processor
 310        iterating the dataset.
 311        :return generator:  A generator that yields each item as a dictionary
 312        """
 313        path = self.get_results_path()
 314
 315
 316        # Yield through items one by one
 317        if path.suffix.lower() == ".csv":
 318            with path.open("rb") as infile:
 319                wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
 320                reader = csv.DictReader(wrapped_infile)
 321
 322                if not self.get_own_processor():
 323                    # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
 324                    first_item = next(reader)
 325                    if first_item is None or any(
 326                        [True for key in first_item if type(key) is not str]
 327                    ):
 328                        raise NotImplementedError(
 329                            f"Cannot iterate through CSV file (deprecated processor {self.type})"
 330                        )
 331                    yield first_item
 332
 333                for item in reader:
 334                    if hasattr(processor, "interrupted") and processor.interrupted:
 335                        raise ProcessorInterruptedException(
 336                            "Processor interrupted while iterating through CSV file"
 337                        )
 338                    
 339                    yield item
 340
 341        elif path.suffix.lower() == ".ndjson":
 342            # In NDJSON format each line in the file is a self-contained JSON
 343            with path.open(encoding="utf-8") as infile:
 344                for line in infile:
 345                    if hasattr(processor, "interrupted") and processor.interrupted:
 346                        raise ProcessorInterruptedException(
 347                            "Processor interrupted while iterating through NDJSON file"
 348                        )
 349                    
 350                    yield json.loads(line)
 351
 352        else:
 353            raise NotImplementedError("Cannot iterate through %s file" % path.suffix)
 354
 355    def iterate_items(
 356        self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None
 357    ):
 358        """
 359        Generate mapped dataset items
 360
 361        Wrapper for _iterate_items that returns a DatasetItem, which can be
 362        accessed as a dict returning the original item or (if a mapper is
 363        available) the mapped item. Mapped or original versions of the item can
 364        also be accessed via the `original` and `mapped_object` properties of
 365        the DatasetItem.
 366
 367        Processors can define a method called `map_item` that can be used to map
 368        an item from the dataset file before it is processed any further. This is
 369        slower than storing the data file in the right format to begin with,
 370        but not all data sources allow for easy 'flat' mapping of items. Tweets,
 371        for example, are nested objects when retrieved from the Twitter API that
 372        are easier to store as a JSON file than as a flat CSV file, and it would
 373        be a shame to throw away that data.
 374
 375        Note the two parameters warn_unmappable and map_missing. An item is
 376        'unmappable' when its structure is too different to coerce into the
 377        neat dictionary the data source expects; warn_unmappable determines
 378        what happens in that case. An item can also have the right structure,
 379        but with some fields missing or incomplete; map_missing determines
 380        what happens in that case (see the example below). The latter is
 381        possible, for example, when importing data via Zeeschuimer, which
 382        produces unstably-structured data captured from social media
 383        sites.
 384
 385        :param BasicProcessor processor:  A reference to the processor
 386        iterating the dataset.
 387        :param bool warn_unmappable:  If an item is not mappable, skip the item
 388        and log a warning
 389        :param max_unmappable:  Skip at most this many unmappable items; if
 390        more are encountered, stop iterating. `None` to never stop.
 391        :param map_missing: Indicates what to do with mapped items for which
 392        some fields could not be mapped. Defaults to 'default'. Must be one of:
 393        - 'default': fill missing fields with the default passed by map_item
 394        - 'abort': raise a MappedItemIncompleteException if a field is missing
 395        - a callback: replace missing field with the return value of the
 396          callback. The mapped item's data dictionary is passed to the callback
 397          as the first argument and the name of the missing field as the second.
 398        - a dictionary with a key for each possible missing field: replace missing
 399          field with a strategy for that field ('default', 'abort', or a callback)
 400        :param get_annotations: Whether to also fetch annotations from the database.
 401          This can be disabled to help speed up iteration.
 402        :return generator:  A generator that yields DatasetItems
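
            Example (a sketch, assuming `dataset` is an existing DataSet and
            "body" is a hypothetical field that may be missing in the source
            data)::

                def fill_body(item_data, missing_field):
                    # receives the mapped item's data and the missing field name
                    return "[deleted]"

                for item in dataset.iterate_items(
                    warn_unmappable=True, map_missing={"body": fill_body}
                ):
                    print(item["body"])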
 403        """
 404        unmapped_items = 0
 405        # Collect item_mapper for use with filter
 406        item_mapper = False
 407        own_processor = self.get_own_processor()
 408        if own_processor and own_processor.map_item_method_available(dataset=self):
 409            item_mapper = True
 410
 411        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
 412        if get_annotations:
 413            annotation_fields = self.annotation_fields
 414            num_annotations = 0 if not annotation_fields else self.num_annotations()
 415            all_annotations = None
 416            # If this dataset has less than n annotations, just retrieve them all before iterating
 417            if 0 < num_annotations <= 500:
 418                # Convert to dict for faster lookup when iterating over items
 419                all_annotations = collections.defaultdict(list)
 420                for annotation in self.get_annotations():
 421                    all_annotations[annotation.item_id].append(annotation)
 422
 423        # missing field strategy can be for all fields at once, or per field
 424        # if it is per field, it is a dictionary with field names and their strategy
 425        # if it is for all fields, it may be a callback, 'abort', or 'default'
 426        default_strategy = "default"
 427        if type(map_missing) is not dict:
 428            default_strategy = map_missing
 429            map_missing = {}
 430
 431        # Loop through items
 432        for i, item in enumerate(self._iterate_items(processor)):
 433            # Save original to yield
 434            original_item = item.copy()
 435
 436            # Map item
 437            if item_mapper:
 438                try:
 439                    mapped_item = own_processor.get_mapped_item(item)
 440                except MapItemException as e:
 441                    if warn_unmappable:
 442                        self.warn_unmappable_item(
                                # only alert admins about the first unmappable item
 443                            i, processor, e, warn_admins=unmapped_items == 0
 444                        )
 445                        
 446                    unmapped_items += 1
 447                    if max_unmappable and unmapped_items > max_unmappable:
 448                        break
 449                    else:
 450                        continue
 451
 452                # check if fields have been marked as 'missing' in the
 453                # underlying data, and treat according to the chosen strategy
 454                if mapped_item.get_missing_fields():
 455                    for missing_field in mapped_item.get_missing_fields():
 456                        strategy = map_missing.get(missing_field, default_strategy)
 457
 458                        if callable(strategy):
 459                            # delegate handling to a callback
 460                            mapped_item.data[missing_field] = strategy(
 461                                mapped_item.data, missing_field
 462                            )
 463                        elif strategy == "abort":
 464                            # raise an exception to be handled at the processor level
 465                            raise MappedItemIncompleteException(
 466                                f"Cannot process item, field {missing_field} missing in source data."
 467                            )
 468                        elif strategy == "default":
 469                            # use whatever was passed to the object constructor
 470                            mapped_item.data[missing_field] = mapped_item.data[
 471                                missing_field
 472                            ].value
 473                        else:
 474                            raise ValueError(
 475                                "map_missing must be 'abort', 'default', or a callback."
 476                            )
 477            else:
 478                mapped_item = original_item
 479
 480            # Add possible annotation values
 481            if get_annotations and annotation_fields:
 482                # We're always handling annotated data as a MappedItem object,
 483                # even if no map_item() function is available for the data source.
 484                if not isinstance(mapped_item, MappedItem):
 485                    mapped_item = MappedItem(mapped_item)
 486
 487                # Retrieve from all annotations
 488                if all_annotations:
 489                    item_annotations = all_annotations.get(mapped_item.data["id"])
 490                # Or get annotations per item for large datasets (more queries but less memory usage)
 491                else:
 492                    item_annotations = self.get_annotations_for_item(
 493                        mapped_item.data["id"]
 494                    )
 495
 496                # Loop through annotation fields
 497                for (
 498                    annotation_field_id,
 499                    annotation_field_items,
 500                ) in annotation_fields.items():
 501                    # Set default value to an empty string
 502                    value = ""
 503                    # Set value if this item has annotations
 504                    if item_annotations:
 505                        # Items can have multiple annotations
 506                        for annotation in item_annotations:
 507                            if annotation.field_id == annotation_field_id:
 508                                value = annotation.value
 509                            if isinstance(value, list):
 510                                value = ",".join(value)
 511
 512                    # Make sure labels are unique when iterating through items
 513                    column_name = annotation_field_items["label"]
 514                    label_count = 1
 515                    while column_name in mapped_item.data:
 516                        label_count += 1
 517                        column_name = f"{annotation_field_items['label']}_{label_count}"
 518
 519                    # We're always adding an annotation value
 520                    # as an empty string, even if it's absent.
 521                    mapped_item.data[column_name] = value
 522
 523            # yield a DatasetItem, which is a dict with some special properties
 524            yield DatasetItem(
 525                mapper=item_mapper,
 526                original=original_item,
 527                mapped_object=mapped_item,
 528                **(
 529                    mapped_item.get_item_data()
 530                    if type(mapped_item) is MappedItem
 531                    else mapped_item
 532                ),
 533            )
 534
 535    def sort_and_iterate_items(
 536        self, sort="", reverse=False, chunk_size=50000, **kwargs
 537    ) -> dict:
 538        """
 539        Loop through items in a dataset, sorted by a given key.
 540
 541        This is a wrapper function for `iterate_items()` with the
 542        added functionality of sorting a dataset.
 543
 544        :param sort:  The item key that determines the sort order.
 545        :param reverse:  Whether to sort by largest values first.
            :param chunk_size:  Number of items to sort in memory at a time; larger
            datasets are sorted in on-disk chunks and then merged.
 546
 547        :returns dict:  Yields sorted items one by one.
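
            A usage sketch, assuming `dataset` is an existing DataSet and
            "timestamp" a hypothetical column::

                for item in dataset.sort_and_iterate_items(sort="timestamp", reverse=True):
                    print(item["timestamp"])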
 548        """
 549
 550        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
 551            """
 552            Sort items based on the given key and order.
 553
 554            :param items_to_sort:  The items to sort
 555            :param sort_key:  The key to sort by
 556            :param reverse:  Whether to sort in reverse order
 557            :return:  Sorted items
 558            """
 559            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
 560                # Sort by dataset order
 561                yield from items_to_sort
 562            elif sort_key == "dataset-order" and reverse:
 563                # Sort by dataset order in reverse
 564                yield from reversed(list(items_to_sort))
 565            else:
 566                # Sort on the basis of a column value
 567                if not convert_sort_to_float:
 568                    yield from sorted(
 569                        items_to_sort,
 570                        key=lambda x: x.get(sort_key, ""),
 571                        reverse=reverse,
 572                    )
 573                else:
 574                    # Dataset fields can contain integers and empty strings.
 575                    # Since these cannot be compared, we will convert every
 576                    # empty string to 0.
 577                    yield from sorted(
 578                        items_to_sort,
 579                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
 580                        reverse=reverse,
 581                    )
 582
 583        if self.num_rows < chunk_size:
 584            try:
 585                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
 586                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
 587            except (TypeError, ValueError):
 588                yield from sort_items(
 589                    self.iterate_items(**kwargs),
 590                    sort,
 591                    reverse,
 592                    convert_sort_to_float=False,
 593                )
 594
 595        else:
 596            # For large datasets, we will use chunk sorting
 597            staging_area = self.get_staging_area()
 598            buffer = []
 599            chunk_files = []
 600            convert_sort_to_float = True
 601            fieldnames = self.get_columns()
 602
 603            def write_chunk(buffer, chunk_index):
 604                """
 605                Write a chunk of data to a temporary file
 606
 607                :param buffer:  The buffer containing the chunk of data
 608                :param chunk_index:  The index of the chunk
 609                :return:  The path to the temporary file
 610                """
 611                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
 612                with temp_file.open("w", encoding="utf-8") as chunk_file:
 613                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
 614                    writer.writeheader()
 615                    writer.writerows(buffer)
 616                return temp_file
 617
 618            # Divide the dataset into sorted chunks
 619            for item in self.iterate_items(**kwargs):
 620                buffer.append(item)
 621                if len(buffer) >= chunk_size:
 622                    try:
 623                        buffer = list(
 624                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 625                        )
 626                    except (TypeError, ValueError):
 627                        convert_sort_to_float = False
 628                        buffer = list(
 629                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 630                        )
 631
 632                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
 633                    buffer.clear()
 634
 635            # Sort and write any remaining items in the buffer
 636            if buffer:
 637                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
 638                chunk_files.append(write_chunk(buffer, len(chunk_files)))
 639                buffer.clear()
 640
 641            # Merge sorted chunks into the final sorted file
 642            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
 643            with sorted_file.open("w", encoding="utf-8") as outfile:
 644                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
 645                writer.writeheader()
 646
 647                # Open all chunk files for reading
 648                chunk_readers = [
 649                    csv.DictReader(chunk.open("r", encoding="utf-8"))
 650                    for chunk in chunk_files
 651                ]
 652                heap = []
 653
 654                # Initialize the heap with the first row from each chunk
 655                for i, reader in enumerate(chunk_readers):
 656                    try:
 657                        row = next(reader)
 658                        if sort == "dataset-order" and reverse:
 659                            # Use a reverse index for "dataset-order" and reverse=True
 660                            sort_key = -i
 661                        elif convert_sort_to_float:
 662                            # Negate numeric keys for reverse sorting
 663                            sort_key = (
 664                                -convert_to_float(row.get(sort, ""))
 665                                if reverse
 666                                else convert_to_float(row.get(sort, ""))
 667                            )
 668                        else:
 669                            if reverse:
 670                                # For reverse string sorting, invert string comparison by creating a tuple
 671                                # with an inverted string - this makes Python's tuple comparison work in reverse
 672                                sort_key = (
 673                                    tuple(-ord(c) for c in row.get(sort, "")),
 674                                    -i,
 675                                )
 676                            else:
 677                                sort_key = (row.get(sort, ""), i)
 678                        heap.append((sort_key, i, row))
 679                    except StopIteration:
 680                        pass
 681
 682                # Use a heap to merge sorted chunks
 683                import heapq
 684
 685                heapq.heapify(heap)
 686                while heap:
 687                    _, chunk_index, smallest_row = heapq.heappop(heap)
 688                    writer.writerow(smallest_row)
 689                    try:
 690                        next_row = next(chunk_readers[chunk_index])
 691                        if sort == "dataset-order" and reverse:
 692                            # Use a reverse index for "dataset-order" and reverse=True
 693                            sort_key = -chunk_index
 694                        elif convert_sort_to_float:
 695                            sort_key = (
 696                                -convert_to_float(next_row.get(sort, ""))
 697                                if reverse
 698                                else convert_to_float(next_row.get(sort, ""))
 699                            )
 700                        else:
 701                            # Use the same inverted comparison for string values
 702                            if reverse:
 703                                sort_key = (
 704                                    tuple(-ord(c) for c in next_row.get(sort, "")),
 705                                    -chunk_index,
 706                                )
 707                            else:
 708                                sort_key = (next_row.get(sort, ""), chunk_index)
 709                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
 710                    except StopIteration:
 711                        pass
 712
 713            # Read the sorted file and yield each item
 714            with sorted_file.open("r", encoding="utf-8") as infile:
 715                reader = csv.DictReader(infile)
 716                for item in reader:
 717                    yield item
 718
 719            # Remove the temporary files
 720            if staging_area.is_dir():
 721                shutil.rmtree(staging_area)
 722
 723    def get_staging_area(self):
 724        """
 725        Get path to a temporary folder in which files can be stored before
 726        finishing
 727
 728        The folder is created when this method is called and is guaranteed
 729        not to have existed before. It may be used as a staging area for the
 730        dataset data while it is being processed.
 731
 732        :return Path:  Path to folder
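
            A sketch of typical usage, assuming `dataset` is an existing
            DataSet::

                staging_area = dataset.get_staging_area()
                staging_area.joinpath("notes.txt").write_text("intermediate output")
                # ... and once processing is done:
                dataset.remove_staging_areas()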
 733        """
 734        results_file = self.get_results_path()
 735
 736        results_dir_base = results_file.parent
 737        results_dir = results_file.name.replace(".", "") + "-staging"
 738        results_path = results_dir_base.joinpath(results_dir)
 739        index = 1
 740        while results_path.exists():
 741            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
 742            index += 1
 743
 744        # create temporary folder
 745        results_path.mkdir()
 746
 747        # Storing the staging area with the dataset so that it can be removed later
 748        self.staging_areas.append(results_path)
 749
 750        return results_path
 751
 752    def remove_staging_areas(self):
 753        """
 754        Remove any staging areas that were created and all files contained in them.
 755        """
 756        # Remove DataSet staging areas
 757        if self.staging_areas:
 758            for staging_area in self.staging_areas:
 759                if staging_area.is_dir():
 760                    shutil.rmtree(staging_area)
 761
 762    def finish(self, num_rows=0):
 763        """
 764        Declare the dataset finished
 765        """
 766        if self.data["is_finished"]:
 767            raise RuntimeError("Cannot finish a finished dataset again")
 768
 769        self.db.update(
 770            "datasets",
 771            where={"key": self.data["key"]},
 772            data={
 773                "is_finished": True,
 774                "num_rows": num_rows,
 775                "progress": 1.0,
 776                "timestamp_finished": int(time.time()),
 777            },
 778        )
 779        self.data["is_finished"] = True
 780        self.data["num_rows"] = num_rows
 781
 782    def copy(self, shallow=True):
 783        """
 784        Copies the dataset, making a new version with a unique key
 785
 786
 787        :param bool shallow:  Shallow copy: does not copy the result file, but
 788        instead refers to the same file as the original dataset did
 789        :return Dataset:  Copied dataset
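
            For example, assuming `dataset` is an existing DataSet::

                backup = dataset.copy(shallow=False)  # also copies the result file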
 790        """
 791        parameters = self.parameters.copy()
 792
 793        # a key is partially based on the parameters. so by setting these extra
 794        # attributes, we also ensure a unique key will be generated for the
 795        # copy
 796        # possibly todo: don't use time for uniqueness (but one shouldn't be
 797        # copying a dataset multiple times per microsecond, that's not what
 798        # this is for)
 799        parameters["copied_from"] = self.key
 800        parameters["copied_at"] = time.time()
 801
 802        copy = DataSet(
 803            parameters=parameters,
 804            db=self.db,
 805            extension=self.result_file.split(".")[-1],
 806            type=self.type,
 807            modules=self.modules,
 808        )
 809        for field in self.data:
 810            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
 811                continue
 812
 813            copy.__setattr__(field, self.data[field])
 814
 815        if shallow:
 816            # use the same result file
 817            copy.result_file = self.result_file
 818        else:
 819            # copy to new file with new key
 820            shutil.copy(self.get_results_path(), copy.get_results_path())
 821
 822        if self.is_finished():
 823            copy.finish(self.num_rows)
 824
 825        # make sure ownership is also copied
 826        copy.copy_ownership_from(self)
 827
 828        return copy
 829
 830    def delete(self, commit=True, queue=None):
 831        """
 832        Delete the dataset, and all its children
 833
 834        Deletes both database records and result files. Note that manipulating
 835        a dataset object after it has been deleted is undefined behaviour.
 836
 837        :param bool commit:  Commit SQL DELETE query?
 838        """
 839        # first, recursively delete children
 840        children = self.db.fetchall(
 841            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
 842        )
 843        for child in children:
 844            try:
 845                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 846                child.delete(commit=commit)
 847            except DataSetException:
 848                # dataset already deleted - race condition?
 849                pass
 850
 851        # delete any queued jobs for this dataset
 852        try:
 853            job = Job.get_by_remote_ID(self.key, self.db, self.type)
 854            if job.is_claimed:
 855                # tell API to stop any jobs running for this dataset
 856                # level 2 = cancel job
 857                # we're not interested in the result - if the API is available,
 858                # it will do its thing, if it's not the backend is probably not
 859                # running so the job also doesn't need to be interrupted
 860                call_api(
 861                    "cancel-job",
 862                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
 863                    False,
 864                )
 865
 866            # this deletes the job from the database
 867            job.finish(True)
 868
 869        except JobNotFoundException:
 870            pass
 871
 872        # delete this dataset's own annotations
 873        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
 874        # delete annotations that have been generated as part of this dataset
 875        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
 876        # delete annotation fields from other datasets that were created as part of this dataset
 877        for parent_dataset in self.get_genealogy():
 878            field_deleted = False
 879            annotation_fields = parent_dataset.annotation_fields
 880            if annotation_fields:
 881                for field_id in list(annotation_fields.keys()):
 882                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
 883                        del annotation_fields[field_id]
 884                        field_deleted = True
 885            if field_deleted:
 886                parent_dataset.save_annotation_fields(annotation_fields)
 887
 888        # delete dataset from database
 889        self.db.delete("datasets", where={"key": self.key}, commit=commit)
 890        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
 891        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
 892
 893        # delete from drive
 894        try:
 895            if self.get_results_path().exists():
 896                self.get_results_path().unlink()
 897            if self.get_results_path().with_suffix(".log").exists():
 898                self.get_results_path().with_suffix(".log").unlink()
 899            if self.get_results_folder_path().exists():
 900                shutil.rmtree(self.get_results_folder_path())
 901
 902        except FileNotFoundError:
 903            # already deleted, apparently
 904            pass
 905        except PermissionError as e:
 906            self.db.log.error(
 907                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
 908            )
 909
 910    def update_children(self, **kwargs):
 911        """
 912        Update an attribute for all child datasets
 913
 914        Can be used to e.g. change the owner, version, finished status for all
 915        datasets in a tree
 916
 917        :param kwargs:  Parameters corresponding to known dataset attributes
 918        """
 919        for child in self.get_children(update=True):
 920            for attr, value in kwargs.items():
 921                child.__setattr__(attr, value)
 922
 923            child.update_children(**kwargs)
 924
 925    def is_finished(self):
 926        """
 927        Check if dataset is finished
 928        :return bool:
 929        """
 930        return bool(self.data["is_finished"])
 931
 932    def is_rankable(self, multiple_items=True):
 933        """
 934        Determine if a dataset is rankable
 935
 936        Rankable means that it is a CSV file with 'date' and 'value' columns
 937        as well as one or more item label columns
 938
 939        :param bool multiple_items:  Consider datasets with multiple values per
 940        item (e.g. word_1, word_2, etc.)?
 941
 942        :return bool:  Whether the dataset is rankable or not
 943        """
 944        if (
 945            self.get_results_path().suffix != ".csv"
 946            or not self.get_results_path().exists()
 947        ):
 948            return False
 949
 950        column_options = {"date", "value", "item"}
 951        if multiple_items:
 952            column_options.add("word_1")
 953
 954        with self.get_results_path().open(encoding="utf-8") as infile:
 955            reader = csv.DictReader(infile)
 956            try:
 957                return len(set(reader.fieldnames) & column_options) >= 3
 958            except (TypeError, ValueError):
 959                return False
 960
 961    def is_accessible_by(self, username, role="owner"):
 962        """
 963        Check if the dataset is accessible to the given user
 964
            A user has access if they are listed as an owner of the dataset with
            the given role, either directly or by being part of an owner tag.

 965        :param str|User username: Username to check for
            :param str|None role:  Role to check for; if `None`, any role counts
 966        :return bool:
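
            For example, assuming `dataset` is an existing DataSet and `user`
            a username or User object::

                if dataset.is_accessible_by(user, role=None):
                    pass  # user has access in any role (owner or viewer)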
 967        """
 968        if type(username) is not str:
 969            if hasattr(username, "get_id"):
 970                username = username.get_id()
 971            else:
 972                raise TypeError("User must be a str or User object")
 973
 974        # 'normal' owners
 975        if username in [
 976            owner
 977            for owner, meta in self.owners.items()
 978            if (role is None or meta["role"] == role)
 979        ]:
 980            return True
 981
 982        # users that are owners by virtue of being part of an owner tag
 983        if username in itertools.chain(
 984            *[
 985                tagged_owners
 986                for tag, tagged_owners in self.tagged_owners.items()
 987                if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
 988            ]
 989        ):
 990            return True
 991
 992        return False
 993
 994    def get_owners_users(self, role="owner"):
 995        """
 996        Get list of dataset owners
 997
 998        This returns a list of *users* that are considered owners. Tags are
 999        transparently replaced with the users with that tag.
1000
1001        :param str|None role:  Role to check for. If `None`, all owners are
1002        returned regardless of role.
1003
1004        :return set:  De-duplicated owner list
1005        """
1006        # 'normal' owners
1007        owners = [
1008            owner
1009            for owner, meta in self.owners.items()
1010            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
1011        ]
1012
1013        # users that are owners by virtue of being part of an owner tag
1014        owners.extend(
1015            itertools.chain(
1016                *[
1017                    tagged_owners
1018                    for tag, tagged_owners in self.tagged_owners.items()
1019                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
1020                ]
1021            )
1022        )
1023
1024        # de-duplicate before returning
1025        return set(owners)
1026
1027    def get_owners(self, role="owner"):
1028        """
1029        Get list of dataset owners
1030
1031        This returns a list of all owners, and does not transparently resolve
1032        tags (like `get_owners_users` does).
1033
1034        :param str|None role:  Role to check for. If `None`, all owners are
1035        returned regardless of role.
1036
1037        :return set:  De-duplicated owner list
1038        """
1039        return [
1040            owner
1041            for owner, meta in self.owners.items()
1042            if (role is None or meta["role"] == role)
1043        ]
1044
1045    def add_owner(self, username, role="owner"):
1046        """
1047        Set dataset owner
1048
1049        If the user is already an owner, but with a different role, the role is
1050        updated. If the user is already an owner with the same role, nothing happens.
1051
1052        :param str|User username:  Username to set as owner
1053        :param str|None role:  Role to add user with.
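
            A brief sketch, assuming `dataset` is an existing DataSet; the
            username and tag are hypothetical::

                dataset.add_owner("some_user", role="viewer")
                dataset.add_owner("tag:students")  # grants access to all users with this tag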
1054        """
1055        if type(username) is not str:
1056            if hasattr(username, "get_id"):
1057                username = username.get_id()
1058            else:
1059                raise TypeError("User must be a str or User object")
1060
1061        if username not in self.owners:
1062            self.owners[username] = {"name": username, "key": self.key, "role": role}
1063            self.db.insert("datasets_owners", data=self.owners[username], safe=True)
1064
1065        elif username in self.owners and self.owners[username]["role"] != role:
1066            self.db.update(
1067                "datasets_owners",
1068                data={"role": role},
1069                where={"name": username, "key": self.key},
1070            )
1071            self.owners[username]["role"] = role
1072
1073        if username.startswith("tag:"):
1074            # this is a bit more complicated than just adding to the list of
1075            # owners, so do a full refresh
1076            self.refresh_owners()
1077
1078        # make sure children's owners remain in sync
1079        for child in self.get_children(update=True):
1080            child.add_owner(username, role)
1081            # not recursive, since we're calling it from recursive code!
1082            child.copy_ownership_from(self, recursive=False)
1083
1084    def remove_owner(self, username):
1085        """
1086        Remove dataset owner
1087
1088        If no owner is set, the dataset is assigned to the anonymous user.
1089        If the user is not an owner, nothing happens.
1090
1091        :param str|User username:  Username to remove as owner
1092        """
1093        if type(username) is not str:
1094            if hasattr(username, "get_id"):
1095                username = username.get_id()
1096            else:
1097                raise TypeError("User must be a str or User object")
1098
1099        if username in self.owners:
1100            del self.owners[username]
1101            self.db.delete("datasets_owners", where={"name": username, "key": self.key})
1102
1103            if not self.owners:
1104                self.add_owner("anonymous")
1105
1106        if username in self.tagged_owners:
1107            del self.tagged_owners[username]
1108
1109        # make sure children's owners remain in sync
1110        for child in self.get_children(update=True):
1111            child.remove_owner(username)
1112            # not recursive, since we're calling it from recursive code!
1113            child.copy_ownership_from(self, recursive=False)
1114
1115    def refresh_owners(self):
1116        """
1117        Update internal owner cache
1118
1119        This makes sure that the list of *users* and *tags* which can access the
1120        dataset is up to date.
1121        """
1122        self.owners = {
1123            owner["name"]: owner
1124            for owner in self.db.fetchall(
1125                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
1126            )
1127        }
1128
1129        # determine which users (if any) are owners of the dataset by having a
1130        # tag that is listed as an owner
1131        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
1132        if owner_tags:
1133            tagged_owners = self.db.fetchall(
1134                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
1135            )
1136            self.tagged_owners = {
1137                owner_tag: [
1138                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
1139                ]
1140                for owner_tag in owner_tags
1141            }
1142        else:
1143            self.tagged_owners = {}
1144
1145    def copy_ownership_from(self, dataset, recursive=True):
1146        """
1147        Copy ownership
1148
1149        This is useful to e.g. make sure a dataset's ownership stays in sync
1150        with its parent
1151
1152        :param Dataset dataset:  Dataset to copy ownership from
1153        :param bool recursive:  Whether to also copy ownership to child datasets
1154        """
1155        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
1156
1157        for role in ("owner", "viewer"):
1158            owners = dataset.get_owners(role=role)
1159            for owner in owners:
1160                self.db.insert(
1161                    "datasets_owners",
1162                    data={"key": self.key, "name": owner, "role": role},
1163                    commit=False,
1164                    safe=True,
1165                )
1166
1167        self.db.commit()
1168        if recursive:
1169            for child in self.get_children(update=True):
1170                child.copy_ownership_from(self, recursive=recursive)
1171
1172    def get_parameters(self):
1173        """
1174        Get dataset parameters
1175
1176        The dataset parameters are stored as JSON in the database - parse them
1177        and return the resulting object
1178
1179        :return:  Dataset parameters as originally stored
1180        """
1181        try:
1182            return json.loads(self.data["parameters"])
1183        except json.JSONDecodeError:
1184            return {}
1185
1186    def get_columns(self):
1187        """
1188        Returns the dataset columns.
1189
1190        Useful for processor input forms. Can deal with both CSV and NDJSON
1191        files, the latter only if a `map_item` function is available in the
1192        processor that generated it. While in other cases one could use the
1193        keys of the JSON object, this is not always possible in follow-up code
1194        that uses the 'column' names, so for consistency this function acts as
1195        if no column can be parsed if no `map_item` function exists.
1196
1197        :return list:  List of dataset columns; empty list if unable to parse
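
            For example, assuming `dataset` is an existing DataSet and "body"
            a hypothetical column name::

                columns = dataset.get_columns()
                has_body_column = "body" in columns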
1198        """
1199        if not self.get_results_path().exists():
1200            # no file to get columns from
1201            return []
1202
1203        if (self.get_results_path().suffix.lower() == ".csv") or (
1204            self.get_results_path().suffix.lower() == ".ndjson"
1205            and self.get_own_processor() is not None
1206            and self.get_own_processor().map_item_method_available(dataset=self)
1207        ):
1208            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
1209            try:
1210                keys = list(next(items).keys())
1211                if self.annotation_fields:
1212                    for annotation_field in self.annotation_fields.values():
1213                        annotation_column = annotation_field["label"]
1214                        label_count = 1
1215                        while annotation_column in keys:
1216                            label_count += 1
1217                            annotation_column = (
1218                                f"{annotation_field['label']}_{label_count}"
1219                            )
1220                        keys.append(annotation_column)
1221                columns = keys
1222            except (StopIteration, NotImplementedError):
1223                # No items or otherwise unable to iterate
1224                columns = []
1225            finally:
1226                del items
1227        else:
1228            # Filetype not CSV or an NDJSON with `map_item`
1229            columns = []
1230
1231        return columns
1232
1233    def update_label(self, label):
1234        """
1235        Update label for this dataset
1236
1237        :param str label:  New label
1238        :return str:  The new label, as returned by get_label
1239        """
1240        self.parameters["label"] = label
1241
1242        self.db.update(
1243            "datasets",
1244            data={"parameters": json.dumps(self.parameters)},
1245            where={"key": self.key},
1246        )
1247        return self.get_label()
1248
1249    def get_label(self, parameters=None, default="Query"):
1250        """
1251        Generate a readable label for the dataset
1252
1253        :param dict parameters:  Parameters of the dataset
1254        :param str default:  Label to use if it cannot be inferred from the
1255        parameters
1256
1257        :return str:  Label
1258        """
1259        if not parameters:
1260            parameters = self.parameters
1261
1262        if parameters.get("label"):
1263            return parameters["label"]
1264        elif parameters.get("body_query") and parameters["body_query"] != "empty":
1265            return parameters["body_query"]
1266        elif parameters.get("body_match") and parameters["body_match"] != "empty":
1267            return parameters["body_match"]
1268        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
1269            return parameters["subject_query"]
1270        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
1271            return parameters["subject_match"]
1272        elif parameters.get("query"):
1273            label = parameters["query"]
1274            # Some legacy datasets have lists as query data
1275            if isinstance(label, list):
1276                label = ", ".join(label)
1277
1278            label = label if len(label) < 30 else label[:25] + "..."
1279            label = label.strip().replace("\n", ", ")
1280            return label
1281        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
1282            return "Flag: %s" % parameters["country_flag"]
1283        elif parameters.get("country_name") and parameters["country_name"] != "all":
1284            return "Country: %s" % parameters["country_name"]
1285        elif parameters.get("filename"):
1286            return parameters["filename"]
1287        elif parameters.get("board") and "datasource" in parameters:
1288            return parameters["datasource"] + "/" + parameters["board"]
1289        elif (
1290            "datasource" in parameters
1291            and parameters["datasource"] in self.modules.datasources
1292        ):
1293            return (
1294                self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
1295            )
1296        else:
1297            return default
1298
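        # Usage sketch (editor's addition; all parameter values are hypothetical).
        # The label falls back through the parameters in the order checked above:
        #
        #   dataset.get_label({"label": "My dataset"})    # -> "My dataset"
        #   dataset.get_label({"body_match": "climate"})  # -> "climate"
        #   dataset.get_label({"foo": "bar"})             # -> "Query" (the default)
        #
        # Long `query` values are cropped to 25 characters plus "..." before being
        # returned.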
1299    def change_datasource(self, datasource):
1300        """
1301        Change the datasource type for this dataset
1302
1303        :param str datasource:  New datasource type
1304        :return str:  The new datasource type
1305        """
1306
1307        self.parameters["datasource"] = datasource
1308
1309        self.db.update(
1310            "datasets",
1311            data={"parameters": json.dumps(self.parameters)},
1312            where={"key": self.key},
1313        )
1314        return datasource
1315
1316    def reserve_result_file(self, parameters=None, extension="csv"):
1317        """
1318        Generate a unique path to the results file for this dataset
1319
1320        This generates a file name for the data file of this dataset, and makes sure
1321        no file exists or will exist at that location other than the file we
1322        expect (i.e. the data for this particular dataset).
1323
1324        :param str extension: File extension, "csv" by default
1325        :param parameters:  Dataset parameters
1326        :return bool:  Whether the file path was successfully reserved
1327        """
1328        if self.data["is_finished"]:
1329            raise RuntimeError("Cannot reserve results file for a finished dataset")
1330
1331        # Use 'random' for random post queries
1332        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
1333            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
1334        # Use country code for country flag queries
1335        elif "country_flag" in parameters and parameters["country_flag"] != "all":
1336            file = (
1337                "countryflag-"
1338                + str(parameters["country_flag"])
1339                + "-"
1340                + self.data["key"]
1341            )
1342        # Use the query string for all other queries
1343        else:
1344            query_bit = self.data["query"].replace(" ", "-").lower()
1345            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
1346            query_bit = query_bit[:100]  # Crop to avoid OSError
1347            file = query_bit + "-" + self.data["key"]
1348            file = re.sub(r"[-]+", "-", file)
1349
1350        self.data["result_file"] = file + "." + extension.lower()
1351        index = 1
1352        while self.get_results_path().is_file():
1353            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
1354            index += 1
1355
1356        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
1357                                 data={"result_file": self.data["result_file"]})
1358        return updated > 0
1359
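        # Usage sketch (editor's addition; the key and query are hypothetical). For
        # a dataset whose `query` is "cat pictures" and whose key is "abc123", the
        # code above reserves roughly "cat-pictures-abc123.csv", appending "-1",
        # "-2", ... for as long as a file with that name already exists:
        #
        #   dataset.reserve_result_file(parameters={}, extension="csv")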
1360    def get_key(self, query, parameters, parent="", time_offset=0):
1361        """
1362        Generate a unique key for this dataset that can be used to identify it
1363
1364        The key is a hash of a combination of the query string and parameters.
1365        You never need to call this, really: it's used internally.
1366
1367        :param str query:  Query string
1368        :param parameters:  Dataset parameters
1369        :param parent: Parent dataset's key (if applicable)
1370        :param time_offset:  Offset to add to the time component of the dataset
1371        key. This can be used to ensure a unique key even if the parameters and
1372        timing are otherwise identical to an existing dataset's
1373
1374        :return str:  Dataset key
1375        """
1376        # Return a hash based on parameters
1377        # we're going to use the hash of the parameters to uniquely identify
1378        # the dataset, so make sure it's always in the same order, or we might
1379        # end up creating multiple keys for the same dataset if python
1380        # decides to return the dict in a different order
1381        param_key = collections.OrderedDict()
1382        for key in sorted(parameters):
1383            param_key[key] = parameters[key]
1384
1385        # we additionally use the current time as a salt - this should usually
1386        # ensure a unique key for the dataset. if there is a hash collision
1387        # anyway, get_key() is retried below with a random time offset
1388        param_key["_salt"] = int(time.time()) + time_offset
1389
1390        parent_key = str(parent) if parent else ""
1391        plain_key = repr(param_key) + str(query) + parent_key
1392        hashed_key = hash_to_md5(plain_key)
1393
1394        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1395            # key exists, generate a new one
1396            return self.get_key(
1397                query, parameters, parent, time_offset=random.randint(1, 10)
1398            )
1399        else:
1400            return hashed_key
1401
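        # Usage sketch (editor's addition; the inputs are hypothetical). The key is
        # an MD5 hash over the sorted parameters, a time-based salt, the query
        # string and the parent key, so identical requests made at different
        # moments still receive distinct keys:
        #
        #   key = dataset.get_key("cat pictures", {"datasource": "fourchan", "board": "v"})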
1402    def set_key(self, key):
1403        """
1404        Change dataset key
1405
1406        In principle, keys should never be changed. But there are rare cases
1407        where it is useful to do so, in particular when importing a dataset
1408        from another 4CAT instance; in that case it makes sense to try and
1409        ensure that the key is the same as it was before. This function sets
1410        the dataset key and updates any dataset references to it.
1411
1412        :param str key:  Key to set
1413        :return str:  Key that was set. If the desired key already exists, the
1414        original key is kept.
1415        """
1416        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1417        if key_exists or not key:
1418            return self.key
1419
1420        old_key = self.key
1421        self.db.update("datasets", data={"key": key}, where={"key": old_key})
1422
1423        # update references
1424        self.db.update(
1425            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
1426        )
1427        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1428        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1429        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1430
1431        # for good measure
1432        self.db.commit()
1433        self.key = key
1434
1435        return self.key
1436
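        # Usage sketch (editor's addition; the key value is hypothetical). This is
        # typically only used when importing a dataset from another 4CAT instance,
        # so that references created elsewhere keep pointing at the same key:
        #
        #   dataset.set_key("1a2b3c4d5e6f7a8b")  # returns the old key if the new one is taken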
1437    def get_status(self):
1438        """
1439        Get Dataset status
1440
1441        :return string: Dataset status
1442        """
1443        return self.data["status"]
1444
1445    def update_status(self, status, is_final=False):
1446        """
1447        Update dataset status
1448
1449        The status is a string that may be displayed to a user to keep them
1450        updated and informed about the progress of a dataset. No memory is kept
1451        of earlier dataset statuses; the current status is overwritten when
1452        updated.
1453
1454        Statuses are also written to the dataset log file.
1455
1456        :param string status:  Dataset status
1457        :param bool is_final:  If this is `True`, subsequent calls to this
1458        method while the object is instantiated will not update the dataset
1459        status.
1460        :return bool:  Status update successful?
1461        """
1462        if self.no_status_updates:
1463            return
1464
1465        # for presets, copy the updated status to the preset(s) this is part of
1466        if self.preset_parent is None:
1467            self.preset_parent = [
1468                parent
1469                for parent in self.get_genealogy()
1470                if parent.type.find("preset-") == 0 and parent.key != self.key
1471            ][:1]
1472
1473        if self.preset_parent:
1474            for preset_parent in self.preset_parent:
1475                if not preset_parent.is_finished():
1476                    preset_parent.update_status(status)
1477
1478        self.data["status"] = status
1479        updated = self.db.update(
1480            "datasets", where={"key": self.data["key"]}, data={"status": status}
1481        )
1482
1483        if is_final:
1484            self.no_status_updates = True
1485
1486        self.log(status)
1487
1488        return updated > 0
1489
1490    def update_progress(self, progress):
1491        """
1492        Update dataset progress
1493
1494        The progress can be used to indicate to a user how close the dataset
1495        is to completion.
1496
1497        :param float progress:  Between 0 and 1.
1498        :return:
1499        """
1500        progress = min(1, max(0, progress))  # clamp
1501        if type(progress) is int:
1502            progress = float(progress)
1503
1504        self.data["progress"] = progress
1505        updated = self.db.update(
1506            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
1507        )
1508        return updated > 0
1509
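        # Usage sketch (editor's addition): a processor would typically interleave
        # the two methods above while working through its input, e.g.
        #
        #   dataset.update_status("Processed 500/2000 items")
        #   dataset.update_progress(500 / 2000)
        #   ...
        #   dataset.update_status("Done", is_final=True)  # later status updates are ignored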
1510    def get_progress(self):
1511        """
1512        Get dataset progress
1513
1514        :return float:  Progress, between 0 and 1
1515        """
1516        return self.data["progress"]
1517
1518    def finish_with_error(self, error):
1519        """
1520        Set error as final status, and finish with 0 results
1521
1522        This is a convenience function to avoid having to repeat
1523        "update_status" and "finish" a lot.
1524
1525        :param str error:  Error message for final dataset status.
1526        :return:
1527        """
1528        self.update_status(error, is_final=True)
1529        self.finish(0)
1530
1531        return None
1532
1533    def update_version(self, version):
1534        """
1535        Update software version used for this dataset
1536
1537        This can be used to verify the code that was used to process this dataset.
1538
1539        :param string version:  Version identifier
1540        :return bool:  Update successful?
1541        """
1542        try:
1543            # this fails if the processor type is unknown
1544            # edge case, but let's not crash...
1545            processor_path = self.modules.processors.get(self.data["type"]).filepath
1546        except AttributeError:
1547            processor_path = ""
1548
1549        updated = self.db.update(
1550            "datasets",
1551            where={"key": self.data["key"]},
1552            data={
1553                "software_version": version[0],
1554                "software_source": version[1],
1555                "software_file": processor_path,
1556            },
1557        )
1558
1559        return updated > 0
1560
1561    def delete_parameter(self, parameter, instant=True):
1562        """
1563        Delete a parameter from the dataset metadata
1564
1565        :param string parameter:  Parameter to delete
1566        :param bool instant:  Also delete parameters in this instance object?
1567        :return bool:  Update successful?
1568        """
1569        parameters = self.parameters.copy()
1570        if parameter in parameters:
1571            del parameters[parameter]
1572        else:
1573            return False
1574
1575        updated = self.db.update(
1576            "datasets",
1577            where={"key": self.data["key"]},
1578            data={"parameters": json.dumps(parameters)},
1579        )
1580
1581        if instant:
1582            self.parameters = parameters
1583
1584        return updated > 0
1585
1586    def get_version_url(self, file):
1587        """
1588        Get a versioned github URL for the version this dataset was processed with
1589
1590        :param file:  File to link within the repository
1591        :return:  URL, or an empty string
1592        """
1593        if not self.data["software_source"]:
1594            return ""
1595
1596        filepath = self.data.get("software_file", "")
1597        if filepath.startswith("/extensions/"):
1598            # go to root of extension
1599            filepath = "/" + "/".join(filepath.split("/")[3:])
1600
1601        return (
1602            self.data["software_source"]
1603            + "/blob/"
1604            + self.data["software_version"]
1605            + filepath
1606        )
1607
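        # Usage sketch (editor's addition; the stored values are hypothetical). Note
        # that the URL is built from software_source, software_version and
        # software_file; the `file` argument is not actually used by the body above.
        # With software_source "https://github.com/digitalmethodsinitiative/4cat",
        # software_version "v1.47" and software_file "/processors/example.py":
        #
        #   dataset.get_version_url("processors/example.py")
        #   # -> "https://github.com/digitalmethodsinitiative/4cat/blob/v1.47/processors/example.py"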
1608    def top_parent(self):
1609        """
1610        Get root dataset
1611
1612        Traverses the tree of datasets this one is part of until it finds one
1613        with no source_dataset, then returns that dataset.
1614
1615        :return Dataset: Parent dataset
1616        """
1617        genealogy = self.get_genealogy()
1618        return genealogy[0]
1619
1620    def get_genealogy(self):
1621        """
1622        Get genealogy of this dataset
1623
1624        Creates a list of DataSet objects, with the first one being the
1625        'top' dataset, and each subsequent one being a child of the previous
1626        one, ending with the current dataset.
1627
1628        :return list:  Dataset genealogy, oldest dataset first
1629        """
1630        if not self._genealogy:
1631            key_parent = self.key_parent
1632            genealogy = []
1633
1634            while key_parent:
1635                try:
1636                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1637                except DataSetException:
1638                    break
1639
1640                genealogy.append(parent)
1641                if parent.key_parent:
1642                    key_parent = parent.key_parent
1643                else:
1644                    break
1645
1646            genealogy.reverse()
1647            self._genealogy = genealogy
1648
1649        # return a copy so that appending `self` does not keep growing the cache
1650        genealogy = self._genealogy.copy()
1651        genealogy.append(self)
1652        return genealogy
1653
1654    def get_children(self, update=False):
1655        """
1656        Get children of this dataset
1657
1658        :param bool update:  Update the list of children from database if True, else return cached value
1659        :return list:  List of child datasets
1660        """
1661        if self._children is not None and not update:
1662            return self._children
1663
1664        analyses = self.db.fetchall(
1665            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
1666            (self.key,),
1667        )
1668        self._children = [
1669            DataSet(data=analysis, db=self.db, modules=self.modules)
1670            for analysis in analyses
1671        ]
1672        return self._children
1673
1674    def get_all_children(self, recursive=True, update=True):
1675        """
1676        Get all children of this dataset
1677
1678        Results are returned as a non-hierarchical list, i.e. the result does
1679        not reflect the actual dataset hierarchy (but all datasets in the
1680        result will have the original dataset as an ancestor somewhere)
1681
1682        :return list:  List of DataSets
1683        """
1684        children = self.get_children(update=update)
1685        results = children.copy()
1686        if recursive:
1687            for child in children:
1688                results += child.get_all_children(recursive=recursive, update=update)
1689
1690        return results
1691
1692    def nearest(self, type_filter):
1693        """
1694        Return nearest dataset that matches the given type
1695
1696        Starting with this dataset, traverse the hierarchy upwards and return
1697        whichever dataset matches the given type.
1698
1699        :param str type_filter:  Type filter. Can contain wildcards and is matched
1700        using `fnmatch.fnmatch`.
1701        :return:  Closest matching dataset (this dataset or the nearest ancestor), or `None` if none match.
1702        """
1703        genealogy = self.get_genealogy()
1704        for dataset in reversed(genealogy):
1705            if fnmatch.fnmatch(dataset.type, type_filter):
1706                return dataset
1707
1708        return None
1709
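        # Usage sketch (editor's addition; the processor types are hypothetical).
        # For a chain such as search dataset -> tokeniser -> word count, calling
        # these methods on the word-count dataset walks back up that chain:
        #
        #   dataset.nearest("*-search")  # -> the original search/import dataset
        #   dataset.top_parent()         # -> the same dataset, via get_genealogy()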
1710    def get_breadcrumbs(self):
1711        """
1712        Get breadcrumbs navlink for use in permalinks
1713
1714        Returns a string representing this dataset's genealogy that may be used
1715        to uniquely identify it.
1716
1717        :return str: Nav link
1718        """
1719        if not self.key_parent:
1720            return self.key
1721
1722        genealogy = self.get_genealogy()
1723        return ",".join([d.key for d in genealogy])
1724
1725    def get_compatible_processors(self, config=None):
1726        """
1727        Get list of processors compatible with this dataset
1728
1729        Checks whether this dataset type is one that is listed as being accepted
1730        by the processor, for each known type: if the processor does not
1731        specify accepted types (via the `is_compatible_with` method), it is
1732        assumed it accepts any top-level dataset.
1733
1734        :param ConfigManager|None config:  Configuration reader to determine
1735        compatibility through. This may not be the same reader the dataset was
1736        instantiated with, e.g. when checking whether some other user should
1737        be able to run processors on this dataset.
1738        :return dict:  Compatible processors, `name => class` mapping
1739        """
1740        processors = self.modules.processors
1741
1742        available = {}
1743        for processor_type, processor in processors.items():
1744            if processor.is_from_collector():
1745                continue
1746
1747            own_processor = self.get_own_processor()
1748            if own_processor and own_processor.exclude_followup_processors(
1749                processor_type
1750            ):
1751                continue
1752
1753            # consider a processor compatible if its is_compatible_with
1754            # method returns True *or* if it has no explicit compatibility
1755            # check and this dataset is top-level (i.e. has no parent)
1756            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1757                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
1758                available[processor_type] = processor
1759
1760        return available
1761
1762    def get_place_in_queue(self, update=False):
1763        """
1764        Determine dataset's position in queue
1765
1766        If the dataset is already finished, the position is -1. Else, the
1767        position is the number of datasets to be completed before this one will
1768        be processed. A position of 0 would mean that the dataset is currently
1769        being executed, or that the backend is not running.
1770
1771        :param bool update:  Update the queue position from database if True, else return cached value
1772        :return int:  Queue position
1773        """
1774        if self.is_finished() or not self.data.get("job"):
1775            self._queue_position = -1
1776            return self._queue_position
1777        elif not update and self._queue_position is not None:
1778            # Use cached value
1779            return self._queue_position
1780        else:
1781            # Collect queue position from database via the job
1782            try:
1783                job = Job.get_by_ID(self.data["job"], self.db)
1784                self._queue_position = job.get_place_in_queue()
1785            except JobNotFoundException:
1786                self._queue_position = -1
1787
1788            return self._queue_position
1789
1790    def get_own_processor(self):
1791        """
1792        Get the processor class that produced this dataset
1793
1794        :return:  Processor class, or `None` if not available.
1795        """
1796        processor_type = self.parameters.get("type", self.data.get("type"))
1797
1798        return self.modules.processors.get(processor_type)
1799
1800    def get_available_processors(self, config=None, exclude_hidden=False):
1801        """
1802        Get list of processors that may be run for this dataset
1803
1804        Returns all compatible processors except for those that are already
1805        queued or finished and have no options. Processors that have been
1806        run but have options are included so they may be run again with a
1807        different configuration
1808
1809        :param ConfigManager|None config:  Configuration reader to determine
1810        compatibility through. This may not be the same reader the dataset was
1811        instantiated with, e.g. when checking whether some other user should
1812        be able to run processors on this dataset.
1813        :param bool exclude_hidden:  Exclude processors that should not be displayed
1814        in the UI? If `False`, all compatible processors are returned.
1815
1816        :return dict:  Available processors, `name => properties` mapping
1817        """
1818        if self.available_processors:
1819            # Update to reflect exclude_hidden parameter which may be different from last call
1820            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1821            return {
1822                processor_type: processor
1823                for processor_type, processor in self.available_processors.items()
1824                if not exclude_hidden or not processor.is_hidden
1825            }
1826
1827        processors = self.get_compatible_processors(config=config)
1828
1829        for analysis in self.get_children(update=True):
1830            if analysis.type not in processors:
1831                continue
1832
1833            if not processors[analysis.type].get_options(config=config):
1834                # No variable options; this processor has been run so remove
1835                del processors[analysis.type]
1836                continue
1837
1838            if exclude_hidden and processors[analysis.type].is_hidden:
1839                del processors[analysis.type]
1840
1841        self.available_processors = processors
1842        return processors
1843
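        # Usage sketch (editor's addition; `config` is assumed to be a ConfigManager
        # for the requesting user). Compatible processors that have already run and
        # offer no options to change are filtered out above:
        #
        #   available = dataset.get_available_processors(config=config, exclude_hidden=True)
        #   for processor_type in available:
        #       print(processor_type)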
1844    def link_job(self, job):
1845        """
1846        Link this dataset to a job ID
1847
1848        Updates the dataset data to include a reference to the job that will be
1849        executing (or has already executed) this dataset.
1850
1851        Note that if no job can be found for this dataset, this method silently
1852        fails.
1853
1854        :param Job job:  The job that will run this dataset
1855
1856        :todo: If the job column ever gets used, make sure it always contains
1857               a valid value, rather than silently failing this method.
1858        """
1859        if type(job) is not Job:
1860            raise TypeError("link_job requires a Job object as its argument")
1861
1862        if "id" not in job.data:
1863            try:
1864                job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1865            except JobNotFoundException:
1866                return
1867
1868        self.db.update(
1869            "datasets", where={"key": self.key}, data={"job": job.data["id"]}
1870        )
1871
1872    def link_parent(self, key_parent):
1873        """
1874        Set source_dataset key for this dataset
1875
1876        :param key_parent:  Parent key. Not checked for validity
1877        """
1878        self.db.update(
1879            "datasets", where={"key": self.key}, data={"key_parent": key_parent}
1880        )
1881
1882    def get_parent(self):
1883        """
1884        Get parent dataset
1885
1886        :return DataSet:  Parent dataset, or `None` if not applicable
1887        """
1888        return (
1889            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
1890            if self.key_parent
1891            else None
1892        )
1893
1894    def detach(self):
1895        """
1896        Makes the dataset standalone, i.e. not having any source_dataset
1897        """
1898        self.link_parent("")
1899
1900    def is_dataset(self):
1901        """
1902        Easy way to confirm this is a dataset.
1903        Used for checking processor and dataset compatibility,
1904        which needs to handle both processors and datasets.
1905        """
1906        return True
1907
1908    def is_top_dataset(self):
1909        """
1910        Easy way to confirm this is a top dataset.
1911        Used for checking processor and dataset compatibility,
1912        which needs to handle both processors and datasets.
1913        """
1914        if self.key_parent:
1915            return False
1916        return True
1917
1918    def is_expiring(self, config):
1919        """
1920        Determine if dataset is set to expire
1921
1922        Similar to `is_expired`, but checks if the dataset will be deleted in
1923        the future, not if it should be deleted right now.
1924
1925        :param ConfigManager config:  Configuration reader (context-aware)
1926        :return bool|int:  `False`, or the expiration date as a Unix timestamp.
1927        """
1928        # has someone opted out of deleting this?
1929        if self.parameters.get("keep"):
1930            return False
1931
1932        # is this dataset explicitly marked as expiring after a certain time?
1933        if self.parameters.get("expires-after"):
1934            return self.parameters.get("expires-after")
1935
1936        # is the data source configured to have its datasets expire?
1937        expiration = config.get("datasources.expiration", {})
1938        if not expiration.get(self.parameters.get("datasource")):
1939            return False
1940
1941        # is there a timeout for this data source?
1942        if expiration.get(self.parameters.get("datasource")).get("timeout"):
1943            return self.timestamp + expiration.get(
1944                self.parameters.get("datasource")
1945            ).get("timeout")
1946
1947        return False
1948
1949    def is_expired(self, config):
1950        """
1951        Determine if dataset should be deleted
1952
1953        Datasets can be set to expire, but when they should be deleted depends
1954        on a number of factors. This checks them all.
1955
1956        :param ConfigManager config:  Configuration reader (context-aware)
1957        :return bool:
1958        """
1959        # has someone opted out of deleting this?
1960        if not self.is_expiring(config):
1961            return False
1962
1963        # is this dataset explicitly marked as expiring after a certain time?
1964        future = (
1965            time.time() + 3600
1966        )  # ensure we don't delete datasets with invalid expiration times
1967        if (
1968            self.parameters.get("expires-after")
1969            and convert_to_int(self.parameters["expires-after"], future) < time.time()
1970        ):
1971            return True
1972
1973        # is the data source configured to have its datasets expire?
1974        expiration = config.get("datasources.expiration", {})
1975        if not expiration.get(self.parameters.get("datasource")):
1976            return False
1977
1978        # is the dataset older than the set timeout?
1979        if expiration.get(self.parameters.get("datasource")).get("timeout"):
1980            return (
1981                self.timestamp
1982                + expiration[self.parameters.get("datasource")]["timeout"]
1983                < time.time()
1984            )
1985
1986        return False
1987
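        # Usage sketch (editor's addition; `config` is assumed to be a context-aware
        # ConfigManager, and delete() is assumed to exist elsewhere in this class).
        # A cleanup worker could combine the two checks above like so:
        #
        #   expires_at = dataset.is_expiring(config)  # False, or a Unix timestamp
        #   if dataset.is_expired(config):
        #       dataset.delete()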
1988    def is_from_collector(self):
1989        """
1990        Check if this dataset was made by a processor that collects data, i.e.
1991        a search or import worker.
1992
1993        :return bool:
1994        """
1995        return self.type.endswith("-search") or self.type.endswith("-import")
1996
1997    def get_extension(self):
1998        """
1999        Gets the file extension this dataset produces.
2000        Also checks whether the results file exists.
2001        Used for checking processor and dataset compatibility.
2002
2003        :return str|bool:  Extension, e.g. `csv`, or False if the results file does not exist
2004        """
2005        if self.get_results_path().exists():
2006            return self.get_results_path().suffix[1:]
2007
2008        return False
2009
2010    def get_media_type(self):
2011        """
2012        Gets the media type of the dataset file.
2013
2014        :return str: media type, e.g., "text"
2015        """
2016        own_processor = self.get_own_processor()
2017        if hasattr(self, "media_type"):
2018            # media type can be defined explicitly in the dataset; this is the priority
2019            return self.media_type
2020        elif own_processor is not None:
2021            # or media type can be defined in the processor
2022            # some processors can set different media types for different datasets (e.g., import_media)
2023            if hasattr(own_processor, "media_type"):
2024                return own_processor.media_type
2025
2026        # Default to text
2027        return self.parameters.get("media_type", "text")
2028
2029    def get_metadata(self):
2030        """
2031        Get dataset metadata
2032
2033        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
2034        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
2035        update its database (and ensure compatibility with the exporting version of 4CAT).
2036        """
2037        metadata = self.db.fetchone(
2038            "SELECT * FROM datasets WHERE key = %s", (self.key,)
2039        )
2040
2041        # get 4CAT version (presumably to ensure export is compatible with import)
2042        metadata["current_4CAT_version"] = get_software_version()
2043        return metadata
2044
2045    def get_result_url(self):
2046        """
2047        Gets the 4CAT frontend URL of a dataset file.
2048
2049        Uses the FlaskConfig attributes (i.e., SERVER_NAME and
2050        SERVER_HTTPS) plus hardcoded '/result/'.
2051        TODO: create more dynamic method of obtaining url.
2052        """
2053        filename = self.get_results_path().name
2054
2055        # we cheat a little here by using the modules' config reader, but these
2056        # will never be context-dependent values anyway
2057        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
2058                        self.modules.config.get("flask.server_name") + '/result/' + filename
2059        return url_to_file
2060
2061    def warn_unmappable_item(
2062        self, item_count, processor=None, error_message=None, warn_admins=True
2063    ):
2064        """
2065        Log an item that is unable to be mapped and warn administrators.
2066
2067        :param int item_count:  Item index
2068        :param Processor processor:  Processor calling this method
2069        """
2070        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
2071
2072        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
2073        closest_dataset = (
2074            processor.dataset
2075            if processor is not None and processor.dataset is not None
2076            else self
2077        )
2078        # Log error to dataset log
2079        closest_dataset.log(dataset_error_message)
2080
2081        if warn_admins:
2082            if processor is not None:
2083                processor.log.warning(
2084                    f"Processor {processor.type} unable to map item {item_count} for dataset {closest_dataset.key}."
2085                )
2086            elif hasattr(self.db, "log"):
2087                # borrow the database's log handler
2088                self.db.log.warning(
2089                    f"Unable to map item {item_count} for dataset {closest_dataset.key}."
2090                )
2091            else:
2092                # No other log available
2093                raise DataSetException(
2094                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and could not warn administrators"
2095                )
2096
2097    # Annotation functions (most of it is handled in Annotations)
2098    def has_annotations(self) -> bool:
2099        """
2100        Whether this dataset has annotations
2101        """
2102
2103        annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,))
2104
2105        return True if annotation else False
2106
2107    def num_annotations(self) -> int:
2108        """
2109        Get the amount of annotations
2110        """
2111        return self.db.fetchone(
2112            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
2113        )["count"]
2114
2115    def get_annotation(self, data: dict):
2116        """
2117        Retrieves a specific annotation if it exists.
2118
2119        :param data:  A dictionary with which to look up the annotation.
2120                      To get a specific annotation, include either an `id` field
2121                      or both `field_id` and `item_id` fields.
2122
2123        :return Annotation:  Annotation object, or None if the required fields are missing.
2124        """
2125
2126        if "id" not in data and ("field_id" not in data or "item_id" not in data):
2127            return None
2128
2129        if "dataset" not in data:
2130            data["dataset"] = self.key
2131
2132        return Annotation(data=data, db=self.db)
2133
2134    def get_annotations(self) -> list:
2135        """
2136        Retrieves all annotations for this dataset.
2137
2138        :return list:  List of Annotation objects.
2139        """
2140
2141        return Annotation.get_annotations_for_dataset(self.db, self.key)
2142
2143    def get_annotations_for_item(self, item_id: str) -> list:
2144        """
2145        Retrieves all annotations from this dataset for a specific item (e.g. social media post).
2146        """
2147        return Annotation.get_annotations_for_dataset(
2148            self.db, self.key, item_id=item_id
2149        )
2150
2151    def has_annotation_fields(self) -> bool:
2152        """
2153        Returns True if there are annotation fields saved to the datasets table.
2154        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
2155        """
2156
2157        return True if self.annotation_fields else False
2158
2159    def get_annotation_field_labels(self) -> list:
2160        """
2161        Retrieves the saved annotation field labels for this dataset.
2162        These are stored in the `annotation_fields` column of the datasets table.
2163
2164        :return list: List of annotation field labels.
2165        """
2166
2167        annotation_fields = self.annotation_fields
2168
2169        if not annotation_fields:
2170            return []
2171
2172        labels = [v["label"] for v in annotation_fields.values()]
2173
2174        return labels
2175
2176    def save_annotations(self, annotations: list) -> int:
2177        """
2178        Takes a list of annotations and saves them to the annotations table.
2179        If a field is not yet present in the `annotation_fields` column in
2180        the datasets table, it also adds it there.
2181
2182        :param list annotations:  List of dictionaries with annotation items. Must have
2183                                  `item_id`, `field_id`, and `label`.
2184                                  `item_id` is the specific item being annotated (e.g. a social media post),
2185                                  `field_id` refers to the annotation field, and
2186                                  `label` is a human-readable description of this annotation.
2187                                  E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd",
2188                                   "value": "Yes"}]
2189
2190        :returns int:  How many annotations were saved.
2191
2192        """
2193
2194        if not annotations:
2195            return 0
2196
2197        count = 0
2198        annotation_fields = self.annotation_fields
2199
2200        # Add some dataset data to annotations, if not present
2201        for annotation_data in annotations:
2202            # Check if the required fields are present
2203            if "item_id" not in annotation_data:
2204                raise AnnotationException(
2205                    "Can't save annotations; annotation must have an `item_id` referencing "
2206                    "the item it annotated, got %s" % annotation_data
2207                )
2208            if "field_id" not in annotation_data:
2209                raise AnnotationException(
2210                    "Can't save annotations; annotation must have a `field_id` field, "
2211                    "got %s" % annotation_data
2212                )
2213            if "label" not in annotation_data or not isinstance(
2214                annotation_data["label"], str
2215            ):
2216                raise AnnotationException(
2217                    "Can't save annotations; annotation must have a `label` field, "
2218                    "got %s" % annotation_data
2219                )
2220
2221            # Set dataset key
2222            if not annotation_data.get("dataset"):
2223                annotation_data["dataset"] = self.key
2224
2225            # Set default author to this dataset owner
2226            # If this annotation is made by a processor, it will have the processor name
2227            if not annotation_data.get("author"):
2228                annotation_data["author"] = self.get_owners()[0]
2229
2230            # Create Annotation object, which also saves it to the database
2231            # If this dataset/item ID/label combination already exists, this retrieves the
2232            # existing data and updates it with new values.
2233            Annotation(data=annotation_data, db=self.db)
2234            count += 1
2235
2236        # Save annotation fields if things changed
2237        if annotation_fields != self.annotation_fields:
2238            self.save_annotation_fields(annotation_fields)
2239
2240        return count
2241
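        # Usage sketch (editor's addition; IDs and values are hypothetical). Each
        # annotation needs at least `item_id`, `field_id` and `label`; the dataset
        # key and author are filled in above when omitted:
        #
        #   dataset.save_annotations([
        #       {"item_id": "12345", "field_id": "123asd", "label": "Valid", "value": "Yes"},
        #       {"item_id": "12346", "field_id": "123asd", "label": "Valid", "value": "No"},
        #   ])  # -> 2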
2242    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2243        """
2244        Save annotation field data to the datasets table (in the `annotation_fields` column).
2245        If changes to the annotation fields affect existing annotations,
2246        this function will also call `update_annotations_via_fields()` to change them.
2247
2248        :param dict new_fields:  New annotation fields, with a field ID as key.
2249
2250        :param bool add:  Whether we're merely adding new fields or replacing the
2251                          whole batch. If add is False, `new_fields` should contain
2252                          all fields.
2253
2254        :return int:  The number of annotation fields saved.
2255
2256        """
2257
2258        # Get existing annotation fields to see if stuff changed.
2259        old_fields = self.annotation_fields
2260        changes = False
2261
2262        # Do some validation
2263        # Annotation field must be valid JSON.
2264        try:
2265            json.dumps(new_fields)
2266        except (TypeError, ValueError):
2267            raise AnnotationException(
2268                "Can't save annotation fields: not valid JSON (%s)" % new_fields
2269            )
2270
2271        # No duplicate IDs
2272        if len(new_fields) != len(set(new_fields)):
2273            raise AnnotationException(
2274                "Can't save annotation fields: field IDs must be unique"
2275            )
2276
2277        # Annotation fields must at minimum have `type` and `label` keys.
2278        for field_id, annotation_field in new_fields.items():
2279            if not isinstance(field_id, str):
2280                raise AnnotationException(
2281                    "Can't save annotation fields: field ID %s is not a valid string"
2282                    % field_id
2283                )
2284            if "label" not in annotation_field:
2285                raise AnnotationException(
2286                    "Can't save annotation fields: field %s must have a label"
2287                    % field_id
2288                )
2289            if "type" not in annotation_field:
2290                raise AnnotationException(
2291                    "Can't save annotation fields: field %s must have a type"
2292                    % field_id
2293                )
2294
2295        # Check if fields are removed
2296        if not add:
2297            for field_id in old_fields.keys():
2298                if field_id not in new_fields:
2299                    changes = True
2300
2301        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
2302        # dataset
2303        for field_id in new_fields.keys():
2304            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
2305                old_fields[field_id]["label"] = new_fields[field_id][
2306                    "label"
2307                ]  # Only labels could've been changed
2308                new_fields[field_id] = old_fields[field_id]
2309
2310        # If we're just adding fields, add them to the old fields.
2311        # If the field already exists, overwrite the old field.
2312        if add and old_fields:
2313            all_fields = old_fields
2314            for field_id, annotation_field in new_fields.items():
2315                all_fields[field_id] = annotation_field
2316            new_fields = all_fields
2317
2318        # We're saving the new annotation fields as-is.
2319        # Ordering of fields is preserved this way.
2320        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
2321        self.annotation_fields = new_fields
2322
2323        # If anything changed with the annotation fields, possibly update
2324        # existing annotations (e.g. to delete them or change their labels).
2325        if changes:
2326            Annotation.update_annotations_via_fields(
2327                self.key, old_fields, new_fields, self.db
2328            )
2329
2330        return len(new_fields)
2331
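        # Usage sketch (editor's addition; the field ID and `type` value are only
        # illustrative, as the validation above merely requires the keys to exist).
        # With add=True, existing fields are kept and the new ones are merged in:
        #
        #   dataset.save_annotation_fields(
        #       {"123asd": {"label": "Valid", "type": "text"}},
        #       add=True,
        #   )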
2332    def get_annotation_metadata(self) -> dict:
2333        """
2334        Retrieves all the data for this dataset from the annotations table.
2335        """
2336
2337        annotation_data = self.db.fetchall(
2338            "SELECT * FROM annotations WHERE dataset = %s", (self.key,)
2339        )
2340        return annotation_data
2341
2342    def __getattr__(self, attr):
2343        """
2344        Getter so we don't have to use .data all the time
2345
2346        :param attr:  Data key to get
2347        :return:  Value
2348        """
2349
2350        if attr in dir(self):
2351            # an explicitly defined attribute should always be called in favour
2352            # of this passthrough
2353            attribute = getattr(self, attr)
2354            return attribute
2355        elif attr in self.data:
2356            return self.data[attr]
2357        else:
2358            raise AttributeError("DataSet instance has no attribute %s" % attr)
2359
2360    def __setattr__(self, attr, value):
2361        """
2362        Setter so we can flexibly update the database
2363
2364        Also updates internal data stores (.data etc). If the attribute is
2365        unknown, it is stored within the 'parameters' attribute.
2366
2367        :param str attr:  Attribute to update
2368        :param value:  New value
2369        """
2370
2371        # don't override behaviour for *actual* class attributes
2372        if attr in dir(self):
2373            super().__setattr__(attr, value)
2374            return
2375
2376        if attr not in self.data:
2377            self.parameters[attr] = value
2378            attr = "parameters"
2379            value = self.parameters
2380
2381        if attr == "parameters":
2382            value = json.dumps(value)
2383
2384        self.db.update("datasets", where={"key": self.key}, data={attr: value})
2385
2386        self.data[attr] = value
2387
2388        if attr == "parameters":
2389            self.parameters = json.loads(value)
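        # Usage sketch (editor's addition; the attribute names are hypothetical).
        # Thanks to the passthroughs above, database columns read like attributes,
        # and unknown attributes are stored inside `parameters` and persisted:
        #
        #   dataset.num_rows               # reads self.data["num_rows"]
        #   dataset.my_custom_flag = True  # stored in dataset.parameters["my_custom_flag"]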
class DataSet(common.lib.fourcat_module.FourcatModule):
  23class DataSet(FourcatModule):
  24    """
  25    Provide interface to safely register and run operations on a dataset
  26
  27    A dataset is a collection of:
  28    - A unique identifier
  29    - A set of parameters that demarcate the data contained within
  30    - The data
  31
  32    The data is usually stored in a file on the disk; the parameters are stored
  33    in a database. The handling of the data, et cetera, is done by other
  34    workers; this class defines method to create and manipulate the dataset's
  35    properties.
  36    """
  37
  38    # Attributes must be created here to ensure getattr and setattr work properly
  39    data = None
  40    key = ""
  41
  42    _children = None
  43    available_processors = None
  44    _genealogy = None
  45    preset_parent = None
  46    parameters = None
  47    modules = None
  48    annotation_fields = None
  49
  50    owners = None
  51    tagged_owners = None
  52
  53    db = None
  54    folder = None
  55    is_new = True
  56
  57    no_status_updates = False
  58    staging_areas = None
  59    _queue_position = None
  60
  61    def __init__(
  62        self,
  63        parameters=None,
  64        key=None,
  65        job=None,
  66        data=None,
  67        db=None,
  68        parent="",
  69        extension=None,
  70        type=None,
  71        is_private=True,
  72        owner="anonymous",
  73        modules=None
  74    ):
  75        """
  76        Create new dataset object
  77
  78        If the dataset is not in the database yet, it is added.
  79
  80        :param dict parameters:  Only when creating a new dataset. Dataset
  81        parameters, free-form dictionary.
  82        :param str key: Dataset key. If given, dataset with this key is loaded.
  83        :param int job: Job ID. If given, dataset corresponding to job is
  84        loaded.
  85        :param dict data: Dataset data, corresponding to a row in the datasets
  86        database table. If not given, retrieved from database depending on key.
  87        :param db:  Database connection
  88        :param str parent:  Only when creating a new dataset. Parent dataset
  89        key to which the one being created is a child.
  90        :param str extension: Only when creating a new dataset. Extension of
  91        dataset result file.
  92        :param str type: Only when creating a new dataset. Type of the dataset,
  93        corresponding to the type property of a processor class.
  94        :param bool is_private: Only when creating a new dataset. Whether the
  95        dataset is private or public.
  96        :param str owner: Only when creating a new dataset. The user name of
  97        the dataset's creator.
  98        :param modules: Module cache. If not given, will be loaded when needed
  99        (expensive). Used to figure out what processors are compatible with
 100        this dataset.
 101        """
 102        self.db = db
 103
 104        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
 105        self.data = {}
 106        self.parameters = {}
 107        self.available_processors = {}
 108        self._genealogy = []
 109        self.staging_areas = []
 110        self.modules = modules
 111
 112        if key is not None:
 113            self.key = key
 114            current = self.db.fetchone(
 115                "SELECT * FROM datasets WHERE key = %s", (self.key,)
 116            )
 117            if not current:
 118                raise DataSetNotFoundException(
 119                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
 120                    % key
 121                )
 122
 123        elif job is not None:
 124            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
 125            if not current:
 126                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
 127
 128            self.key = current["key"]
 129        elif data is not None:
 130            current = data
 131            if (
 132                "query" not in data
 133                or "key" not in data
 134                or "parameters" not in data
 135                or "key_parent" not in data
 136            ):
 137                raise DataSetException(
 138                    "DataSet() requires a complete dataset record for its 'data' argument"
 139                )
 140
 141            self.key = current["key"]
 142        else:
 143            if parameters is None:
 144                raise DataSetException(
 145                    "DataSet() requires either 'key', or 'parameters' to be given"
 146                )
 147
 148            if not type:
 149                raise DataSetException("Datasets must have their type set explicitly")
 150
 151            query = self.get_label(parameters, default=type)
 152            self.key = self.get_key(query, parameters, parent)
 153            current = self.db.fetchone(
 154                "SELECT * FROM datasets WHERE key = %s AND query = %s",
 155                (self.key, query),
 156            )
 157
 158        if current:
 159            self.data = current
 160            self.parameters = json.loads(self.data["parameters"])
 161            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
 162                if self.data.get("annotation_fields") else {}
 163            self.is_new = False
 164        else:
 165            self.data = {"type": type}  # get_own_processor needs this
 166            own_processor = self.get_own_processor()
 167            version = get_software_commit(own_processor)
 168            self.data = {
 169                "key": self.key,
 170                "query": self.get_label(parameters, default=type),
 171                "parameters": json.dumps(parameters),
 172                "result_file": "",
 173                "creator": owner,
 174                "status": "",
 175                "type": type,
 176                "timestamp": int(time.time()),
 177                "is_finished": False,
 178                "is_private": is_private,
 179                "software_version": version[0],
 180                "software_source": version[1],
 181                "software_file": "",
 182                "num_rows": 0,
 183                "progress": 0.0,
 184                "key_parent": parent,
 185                "annotation_fields": "{}"
 186            }
 187            self.parameters = parameters
 188            self.annotation_fields = {}
 189
 190            self.db.insert("datasets", data=self.data)
 191            self.refresh_owners()
 192            self.add_owner(owner)
 193
 194            # Find desired extension from processor if not explicitly set
 195            if extension is None:
 196                if own_processor:
 197                    extension = own_processor.get_extension(
 198                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
 199                        if parent
 200                        else None
 201                    )
 202                # Still no extension, default to 'csv'
 203                if not extension:
 204                    extension = "csv"
 205
 206            # Reserve filename and update data['result_file']
 207            self.reserve_result_file(parameters, extension)
 208
 209        self.refresh_owners()
 210
 211    def check_dataset_finished(self):
 212        """
 213        Checks if dataset is finished. Returns path to results file is not empty,
 214        or 'empty_file' when there were not matches.
 215
 216        Only returns a path if the dataset is complete. In other words, if this
 217        method returns a path, a file with the complete results for this dataset
 218        will exist at that location.
 219
 220        :return: A path to the results file, 'empty_file', or `None`
 221        """
 222        if self.data["is_finished"] and self.data["num_rows"] > 0:
 223            return self.get_results_path()
 224        elif self.data["is_finished"] and self.data["num_rows"] == 0:
 225            return 'empty'
 226        else:
 227            return None
 228
 229    def get_results_path(self):
 230        """
 231        Get path to results file
 232
 233        Always returns a path, that will at some point contain the dataset
 234        data, but may not do so yet. Use this to get the location to write
 235        generated results to.
 236
 237        :return Path:  A path to the results file
 238        """
 239        # alas we need to instantiate a config reader here - no way around it
 240        if not self.folder:
 241            self.folder = self.modules.config.get('PATH_ROOT').joinpath(self.modules.config.get('PATH_DATA'))
 242        return self.folder.joinpath(self.data["result_file"])
 243
 244    def get_results_folder_path(self):
 245        """
 246        Get path to folder containing accompanying results
 247
 248        Returns a path that may not yet be created
 249
 250        :return Path:  A path to the results file
 251        """
 252        return self.get_results_path().parent.joinpath("folder_" + self.key)
 253
 254    def get_log_path(self):
 255        """
 256        Get path to dataset log file
 257
 258        Each dataset has a single log file that documents its creation. This
 259        method returns the path to that file. It is identical to the path of
 260        the dataset result file, with 'log' as its extension instead.
 261
 262        :return Path:  A path to the log file
 263        """
 264        return self.get_results_path().with_suffix(".log")
 265
 266    def clear_log(self):
 267        """
 268        Clears the dataset log file
 269
 270        If the log file does not exist, it is created empty. The log file will
 271        have the same file name as the dataset result file, with the 'log'
 272        extension.
 273        """
 274        log_path = self.get_log_path()
 275        with log_path.open("w"):
 276            pass
 277
 278    def log(self, log):
 279        """
 280        Write log message to file
 281
 282        Writes the log message to the log file on a new line, including a
 283        timestamp at the start of the line. Note that this assumes the log file
 284        already exists - it should have been created/cleared with clear_log()
 285        prior to calling this.
 286
 287        :param str log:  Log message to write
 288        """
 289        log_path = self.get_log_path()
 290        with log_path.open("a", encoding="utf-8") as outfile:
 291            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
 292
 293    def _iterate_items(self, processor=None):
 294        """
 295        A generator that iterates through a CSV or NDJSON file
 296
 297        This is an internal method and should not be called directly. Rather,
 298        call iterate_items() and use the generated dictionary and its properties.
 299
 300        If a reference to a processor is provided, with every iteration,
 301        the processor's 'interrupted' flag is checked, and if set a
 302        ProcessorInterruptedException is raised, which by default is caught
 303        in the worker and subsequently stops execution gracefully.
 304
 305        There are two file types that can be iterated (currently): CSV files
 306        and NDJSON (newline-delimited JSON) files. In the future, one could
 307        envision adding a pathway to retrieve items from e.g. a MongoDB
 308        collection directly instead of from a static file
 309
 310        :param BasicProcessor processor:  A reference to the processor
 311        iterating the dataset.
 312        :return generator:  A generator that yields each item as a dictionary
 313        """
 314        path = self.get_results_path()
 315
 316
 317        # Yield through items one by one
 318        if path.suffix.lower() == ".csv":
 319            with path.open("rb") as infile:
 320                wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
 321                reader = csv.DictReader(wrapped_infile)
 322
 323                if not self.get_own_processor():
 324                    # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
 325                    first_item = next(reader)
 326                    if first_item is None or any(
 327                        [True for key in first_item if type(key) is not str]
 328                    ):
 329                        raise NotImplementedError(
 330                            f"Cannot iterate through CSV file (deprecated processor {self.type})"
 331                        )
 332                    yield first_item
 333
 334                for item in reader:
 335                    if hasattr(processor, "interrupted") and processor.interrupted:
 336                        raise ProcessorInterruptedException(
 337                            "Processor interrupted while iterating through CSV file"
 338                        )
 339                    
 340                    yield item
 341
 342        elif path.suffix.lower() == ".ndjson":
 343            # In NDJSON format each line in the file is a self-contained JSON
 344            with path.open(encoding="utf-8") as infile:
 345                for line in infile:
 346                    if hasattr(processor, "interrupted") and processor.interrupted:
 347                        raise ProcessorInterruptedException(
 348                            "Processor interrupted while iterating through NDJSON file"
 349                        )
 350                    
 351                    yield json.loads(line)
 352
 353        else:
 354            raise NotImplementedError("Cannot iterate through %s file" % path.suffix)
 355
 356    def iterate_items(
 357        self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None
 358    ):
 359        """
 360        Generate mapped dataset items
 361
 362        Wrapper for _iterate_items that returns a DatasetItem, which can be
 363        accessed as a dict returning the original item or (if a mapper is
 364        available) the mapped item. Mapped or original versions of the item can
 365        also be accessed via the `original` and `mapped_object` properties of
 366        the DatasetItem.
 367
 368        Processors can define a method called `map_item` that can be used to map
 369        an item from the dataset file before it is processed any further. This is
 370        slower than storing the data file in the right format to begin with but
 371        not all data sources allow for easy 'flat' mapping of items, e.g. tweets
 372        are nested objects when retrieved from the twitter API that are easier
 373        to store as a JSON file than as a flat CSV file, and it would be a shame
 374        to throw away that data.
 375
 376        Note the two parameters warn_unmappable and map_missing. An item is
 377        'unmappable' if its structure is too different to coerce into the neat
 378        dictionary the data source expects; warn_unmappable determines what
 379        happens in this case.
 380        An item can also be of the right structure, but with some fields missing or
 381        incomplete. map_missing determines what happens in that case. The
 382        latter is for example possible when importing data via Zeeschuimer,
 383        which produces inconsistently structured data captured from social media
 384        sites.
 385
 386        :param BasicProcessor processor:  A reference to the processor
 387        iterating the dataset.
 388        :param bool warn_unmappable:  If an item is not mappable, skip the item
 389        and log a warning
 390        :param max_unmappable:  Skip at most this many unmappable items; if
 391        more are encountered, stop iterating. `None` to never stop.
 392        :param map_missing: Indicates what to do with mapped items for which
 393        some fields could not be mapped. Defaults to 'default'. Must be one of:
 394        - 'default': fill missing fields with the default passed by map_item
 395        - 'abort': raise a MappedItemIncompleteException if a field is missing
 396        - a callback: replace missing field with the return value of the
 397          callback. The MappedItem object is passed to the callback as the
 398          first argument and the name of the missing field as the second.
 399        - a dictionary with a key for each possible missing field: replace missing
 400          field with a strategy for that field ('default', 'abort', or a callback)
 401        :param get_annotations: Whether to also fetch annotations from the database.
 402          This can be disabled to help speed up iteration.
 403        :return generator:  A generator that yields DatasetItems
 404        """
 405        unmapped_items = 0
 406        # Collect item_mapper for use with filter
 407        item_mapper = False
 408        own_processor = self.get_own_processor()
 409        if own_processor and own_processor.map_item_method_available(dataset=self):
 410            item_mapper = True
 411
 412        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
 413        if get_annotations:
 414            annotation_fields = self.annotation_fields
 415            num_annotations = 0 if not annotation_fields else self.num_annotations()
 416            all_annotations = None
 417            # If this dataset has 500 or fewer annotations, just retrieve them all before iterating
 418            if 0 < num_annotations <= 500:
 419                # Convert to dict for faster lookup when iterating over items
 420                all_annotations = collections.defaultdict(list)
 421                for annotation in self.get_annotations():
 422                    all_annotations[annotation.item_id].append(annotation)
 423
 424        # missing field strategy can be for all fields at once, or per field
 425        # if it is per field, it is a dictionary with field names and their strategy
 426        # if it is for all fields, it may be a callback, 'abort', or 'default'
 427        default_strategy = "default"
 428        if type(map_missing) is not dict:
 429            default_strategy = map_missing
 430            map_missing = {}
 431
 432        # Loop through items
 433        for i, item in enumerate(self._iterate_items(processor)):
 434            # Save original to yield
 435            original_item = item.copy()
 436
 437            # Map item
 438            if item_mapper:
 439                try:
 440                    mapped_item = own_processor.get_mapped_item(item)
 441                except MapItemException as e:
 442                    if warn_unmappable:
 443                        self.warn_unmappable_item(
 444                            i, processor, e, warn_admins=unmapped_items == 0
 445                        )
 446                        
 447                    unmapped_items += 1
 448                    if max_unmappable and unmapped_items > max_unmappable:
 449                        break
 450                    else:
 451                        continue
 452
 453                # check if fields have been marked as 'missing' in the
 454                # underlying data, and treat according to the chosen strategy
 455                if mapped_item.get_missing_fields():
 456                    for missing_field in mapped_item.get_missing_fields():
 457                        strategy = map_missing.get(missing_field, default_strategy)
 458
 459                        if callable(strategy):
 460                            # delegate handling to a callback
 461                            mapped_item.data[missing_field] = strategy(
 462                                mapped_item.data, missing_field
 463                            )
 464                        elif strategy == "abort":
 465                            # raise an exception to be handled at the processor level
 466                            raise MappedItemIncompleteException(
 467                                f"Cannot process item, field {missing_field} missing in source data."
 468                            )
 469                        elif strategy == "default":
 470                            # use whatever was passed to the object constructor
 471                            mapped_item.data[missing_field] = mapped_item.data[
 472                                missing_field
 473                            ].value
 474                        else:
 475                            raise ValueError(
 476                                "map_missing must be 'abort', 'default', or a callback."
 477                            )
 478            else:
 479                mapped_item = original_item
 480
 481            # Add possible annotation values
 482            if get_annotations and annotation_fields:
 483                # We're always handling annotated data as a MappedItem object,
 484                # even if no map_item() function is available for the data source.
 485                if not isinstance(mapped_item, MappedItem):
 486                    mapped_item = MappedItem(mapped_item)
 487
 488                # Retrieve from all annotations
 489                if all_annotations:
 490                    item_annotations = all_annotations.get(mapped_item.data["id"])
 491                # Or get annotations per item for large datasets (more queries but less memory usage)
 492                else:
 493                    item_annotations = self.get_annotations_for_item(
 494                        mapped_item.data["id"]
 495                    )
 496
 497                # Loop through annotation fields
 498                for (
 499                    annotation_field_id,
 500                    annotation_field_items,
 501                ) in annotation_fields.items():
 502                    # Set default value to an empty string
 503                    value = ""
 504                    # Set value if this item has annotations
 505                    if item_annotations:
 506                        # Items can have multiple annotations
 507                        for annotation in item_annotations:
 508                            if annotation.field_id == annotation_field_id:
 509                                value = annotation.value
 510                            if isinstance(value, list):
 511                                value = ",".join(value)
 512
 513                    # Make sure labels are unique when iterating through items
 514                    column_name = annotation_field_items["label"]
 515                    label_count = 1
 516                    while column_name in mapped_item.data:
 517                        label_count += 1
 518                        column_name = f"{annotation_field_items['label']}_{label_count}"
 519
 520                    # We're always adding an annotation value
 521                    # as an empty string, even if it's absent.
 522                    mapped_item.data[column_name] = value
 523
 524            # yield a DatasetItem, which is a dict with some special properties
 525            yield DatasetItem(
 526                mapper=item_mapper,
 527                original=original_item,
 528                mapped_object=mapped_item,
 529                **(
 530                    mapped_item.get_item_data()
 531                    if type(mapped_item) is MappedItem
 532                    else mapped_item
 533                ),
 534            )
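
        # Usage sketch (illustrative, not from the original source): iterate mapped
        # items without annotation lookups, replacing missing fields with an empty
        # string via a callback. `dataset` and `processor` are assumed to be an
        # existing DataSet and BasicProcessor; "body" is a hypothetical column.
        #
        #   for item in dataset.iterate_items(
        #       processor,
        #       warn_unmappable=False,
        #       map_missing=lambda data, field: "",
        #       get_annotations=False,
        #   ):
        #       print(item.get("id"), item.get("body", ""))
        #       raw = item.original  # item as stored in the underlying file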
 535
 536    def sort_and_iterate_items(
 537        self, sort="", reverse=False, chunk_size=50000, **kwargs
 538    ):
 539        """
 540        Loop through items in a dataset, sorted by a given key.
 541
 542        This is a wrapper function for `iterate_items()` with the
 543        added functionality of sorting a dataset.
 544
 545        :param sort:  The item key that determines the sort order.
 546        :param reverse:  Whether to sort by largest values first.
 547
 548        :return generator:  Yields items as dictionaries, in sorted order
 549        """
 550
 551        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
 552            """
 553            Sort items based on the given key and order.
 554
 555            :param items_to_sort:  The items to sort
 556            :param sort_key:  The key to sort by
 557            :param reverse:  Whether to sort in reverse order
 558            :return:  Sorted items
 559            """
 560            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
 561                # Sort by dataset order
 562                yield from items_to_sort
 563            elif sort_key == "dataset-order" and reverse:
 564                # Sort by dataset order in reverse
 565                yield from reversed(list(items_to_sort))
 566            else:
 567                # Sort on the basis of a column value
 568                if not convert_sort_to_float:
 569                    yield from sorted(
 570                        items_to_sort,
 571                        key=lambda x: x.get(sort_key, ""),
 572                        reverse=reverse,
 573                    )
 574                else:
 575                    # Dataset fields can contain integers and empty strings.
 576                    # Since these cannot be compared, we will convert every
 577                    # empty string to 0.
 578                    yield from sorted(
 579                        items_to_sort,
 580                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
 581                        reverse=reverse,
 582                    )
 583
 584        if self.num_rows < chunk_size:
 585            try:
 586                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
 587                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
 588            except (TypeError, ValueError):
 589                yield from sort_items(
 590                    self.iterate_items(**kwargs),
 591                    sort,
 592                    reverse,
 593                    convert_sort_to_float=False,
 594                )
 595
 596        else:
 597            # For large datasets, we will use chunk sorting
 598            staging_area = self.get_staging_area()
 599            buffer = []
 600            chunk_files = []
 601            convert_sort_to_float = True
 602            fieldnames = self.get_columns()
 603
 604            def write_chunk(buffer, chunk_index):
 605                """
 606                Write a chunk of data to a temporary file
 607
 608                :param buffer:  The buffer containing the chunk of data
 609                :param chunk_index:  The index of the chunk
 610                :return:  The path to the temporary file
 611                """
 612                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
 613                with temp_file.open("w", encoding="utf-8") as chunk_file:
 614                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
 615                    writer.writeheader()
 616                    writer.writerows(buffer)
 617                return temp_file
 618
 619            # Divide the dataset into sorted chunks
 620            for item in self.iterate_items(**kwargs):
 621                buffer.append(item)
 622                if len(buffer) >= chunk_size:
 623                    try:
 624                        buffer = list(
 625                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 626                        )
 627                    except (TypeError, ValueError):
 628                        convert_sort_to_float = False
 629                        buffer = list(
 630                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 631                        )
 632
 633                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
 634                    buffer.clear()
 635
 636            # Sort and write any remaining items in the buffer
 637            if buffer:
 638                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
 639                chunk_files.append(write_chunk(buffer, len(chunk_files)))
 640                buffer.clear()
 641
 642            # Merge sorted chunks into the final sorted file
 643            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
 644            with sorted_file.open("w", encoding="utf-8") as outfile:
 645                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
 646                writer.writeheader()
 647
 648                # Open all chunk files for reading
 649                chunk_readers = [
 650                    csv.DictReader(chunk.open("r", encoding="utf-8"))
 651                    for chunk in chunk_files
 652                ]
 653                heap = []
 654
 655                # Initialize the heap with the first row from each chunk
 656                for i, reader in enumerate(chunk_readers):
 657                    try:
 658                        row = next(reader)
 659                        if sort == "dataset-order" and reverse:
 660                            # Use a reverse index for "dataset-order" and reverse=True
 661                            sort_key = -i
 662                        elif convert_sort_to_float:
 663                            # Negate numeric keys for reverse sorting
 664                            sort_key = (
 665                                -convert_to_float(row.get(sort, ""))
 666                                if reverse
 667                                else convert_to_float(row.get(sort, ""))
 668                            )
 669                        else:
 670                            if reverse:
 671                                # For reverse string sorting, invert string comparison by creating a tuple
 672                                # with an inverted string - this makes Python's tuple comparison work in reverse
 673                                sort_key = (
 674                                    tuple(-ord(c) for c in row.get(sort, "")),
 675                                    -i,
 676                                )
 677                            else:
 678                                sort_key = (row.get(sort, ""), i)
 679                        heap.append((sort_key, i, row))
 680                    except StopIteration:
 681                        pass
 682
 683                # Use a heap to merge sorted chunks
 684                import heapq
 685
 686                heapq.heapify(heap)
 687                while heap:
 688                    _, chunk_index, smallest_row = heapq.heappop(heap)
 689                    writer.writerow(smallest_row)
 690                    try:
 691                        next_row = next(chunk_readers[chunk_index])
 692                        if sort == "dataset-order" and reverse:
 693                            # Use a reverse index for "dataset-order" and reverse=True
 694                            sort_key = -chunk_index
 695                        elif convert_sort_to_float:
 696                            sort_key = (
 697                                -convert_to_float(next_row.get(sort, ""))
 698                                if reverse
 699                                else convert_to_float(next_row.get(sort, ""))
 700                            )
 701                        else:
 702                            # Use the same inverted comparison for string values
 703                            if reverse:
 704                                sort_key = (
 705                                    tuple(-ord(c) for c in next_row.get(sort, "")),
 706                                    -chunk_index,
 707                                )
 708                            else:
 709                                sort_key = (next_row.get(sort, ""), chunk_index)
 710                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
 711                    except StopIteration:
 712                        pass
 713
 714            # Read the sorted file and yield each item
 715            with sorted_file.open("r", encoding="utf-8") as infile:
 716                reader = csv.DictReader(infile)
 717                for item in reader:
 718                    yield item
 719
 720            # Remove the temporary files
 721            if staging_area.is_dir():
 722                shutil.rmtree(staging_area)
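
        # Usage sketch (illustrative, not from the original source): iterate a
        # dataset sorted by a hypothetical "timestamp" column, largest values
        # first; datasets bigger than chunk_size are sorted on disk in chunks
        # and merged afterwards.
        #
        #   for item in dataset.sort_and_iterate_items(sort="timestamp", reverse=True):
        #       ...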
 723
 724    def get_staging_area(self):
 725        """
 726        Get path to a temporary folder in which files can be stored before
 727        finishing
 728
 729        The folder is created by this method and is guaranteed not to exist
 730        beforehand. It may be used as a staging area for the dataset data
 731        while it is being processed.
 732
 733        :return Path:  Path to folder
 734        """
 735        results_file = self.get_results_path()
 736
 737        results_dir_base = results_file.parent
 738        results_dir = results_file.name.replace(".", "") + "-staging"
 739        results_path = results_dir_base.joinpath(results_dir)
 740        index = 1
 741        while results_path.exists():
 742            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
 743            index += 1
 744
 745        # create temporary folder
 746        results_path.mkdir()
 747
 748        # Storing the staging area with the dataset so that it can be removed later
 749        self.staging_areas.append(results_path)
 750
 751        return results_path
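
        # Usage sketch (illustrative, not from the original source): a staging area
        # is a throwaway folder next to the results file; it is registered on the
        # dataset so remove_staging_areas() can clean it up afterwards.
        #
        #   staging_area = dataset.get_staging_area()
        #   staging_area.joinpath("intermediate.json").write_text("{}")
        #   ...
        #   dataset.remove_staging_areas()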
 752
 753    def remove_staging_areas(self):
 754        """
 755        Remove any staging areas that were created and all files contained in them.
 756        """
 757        # Remove DataSet staging areas
 758        if self.staging_areas:
 759            for staging_area in self.staging_areas:
 760                if staging_area.is_dir():
 761                    shutil.rmtree(staging_area)
 762
 763    def finish(self, num_rows=0):
 764        """
 765        Declare the dataset finished
 766        """
 767        if self.data["is_finished"]:
 768            raise RuntimeError("Cannot finish a finished dataset again")
 769
 770        self.db.update(
 771            "datasets",
 772            where={"key": self.data["key"]},
 773            data={
 774                "is_finished": True,
 775                "num_rows": num_rows,
 776                "progress": 1.0,
 777                "timestamp_finished": int(time.time()),
 778            },
 779        )
 780        self.data["is_finished"] = True
 781        self.data["num_rows"] = num_rows
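
        # Usage sketch (illustrative, not from the original source): a processor
        # typically writes its output to get_results_path() and then declares the
        # dataset finished with the number of rows written. `rows` is assumed to
        # be a list of dictionaries with "id" and "value" keys.
        #
        #   with dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile:
        #       writer = csv.DictWriter(outfile, fieldnames=("id", "value"))
        #       writer.writeheader()
        #       writer.writerows(rows)
        #   dataset.finish(num_rows=len(rows))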
 782
 783    def copy(self, shallow=True):
 784        """
 785        Copies the dataset, making a new version with a unique key
 786
 787
 788        :param bool shallow:  Shallow copy: does not copy the result file, but
 789        instead refers to the same file as the original dataset did
 790        :return Dataset:  Copied dataset
 791        """
 792        parameters = self.parameters.copy()
 793
 794        # a key is partially based on the parameters. so by setting these extra
 795        # attributes, we also ensure a unique key will be generated for the
 796        # copy
 797        # possibly todo: don't use time for uniqueness (but one shouldn't be
 798        # copying a dataset multiple times per microsecond, that's not what
 799        # this is for)
 800        parameters["copied_from"] = self.key
 801        parameters["copied_at"] = time.time()
 802
 803        copy = DataSet(
 804            parameters=parameters,
 805            db=self.db,
 806            extension=self.result_file.split(".")[-1],
 807            type=self.type,
 808            modules=self.modules,
 809        )
 810        for field in self.data:
 811            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
 812                continue
 813
 814            copy.__setattr__(field, self.data[field])
 815
 816        if shallow:
 817            # use the same result file
 818            copy.result_file = self.result_file
 819        else:
 820            # copy to new file with new key
 821            shutil.copy(self.get_results_path(), copy.get_results_path())
 822
 823        if self.is_finished():
 824            copy.finish(self.num_rows)
 825
 826        # make sure ownership is also copied
 827        copy.copy_ownership_from(self)
 828
 829        return copy
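
        # Usage sketch (illustrative, not from the original source): a shallow copy
        # gets a new key but points at the same result file, while a deep copy also
        # duplicates the file on disk.
        #
        #   clone = dataset.copy(shallow=False)
        #   clone.update_label(dataset.get_label() + " (copy)")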
 830
 831    def delete(self, commit=True, queue=None):
 832        """
 833        Delete the dataset, and all its children
 834
 835        Deletes both database records and result files. Note that manipulating
 836        a dataset object after it has been deleted is undefined behaviour.
 837
 838        :param bool commit:  Commit SQL DELETE query?
 839        """
 840        # first, recursively delete children
 841        children = self.db.fetchall(
 842            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
 843        )
 844        for child in children:
 845            try:
 846                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 847                child.delete(commit=commit)
 848            except DataSetException:
 849                # dataset already deleted - race condition?
 850                pass
 851
 852        # delete any queued jobs for this dataset
 853        try:
 854            job = Job.get_by_remote_ID(self.key, self.db, self.type)
 855            if job.is_claimed:
 856                # tell API to stop any jobs running for this dataset
 857                # level 2 = cancel job
 858                # we're not interested in the result - if the API is available,
 859                # it will do its thing, if it's not the backend is probably not
 860                # running so the job also doesn't need to be interrupted
 861                call_api(
 862                    "cancel-job",
 863                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
 864                    False,
 865                )
 866
 867            # this deletes the job from the database
 868            job.finish(True)
 869
 870        except JobNotFoundException:
 871            pass
 872
 873        # delete this dataset's own annotations
 874        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
 875        # delete annotations that have been generated as part of this dataset
 876        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
 877        # delete annotation fields from other datasets that were created as part of this dataset
 878        for parent_dataset in self.get_genealogy():
 879            field_deleted = False
 880            annotation_fields = parent_dataset.annotation_fields
 881            if annotation_fields:
 882                for field_id in list(annotation_fields.keys()):
 883                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
 884                        del annotation_fields[field_id]
 885                        field_deleted = True
 886            if field_deleted:
 887                parent_dataset.save_annotation_fields(annotation_fields)
 888
 889        # delete dataset from database
 890        self.db.delete("datasets", where={"key": self.key}, commit=commit)
 891        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
 892        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
 893
 894        # delete from drive
 895        try:
 896            if self.get_results_path().exists():
 897                self.get_results_path().unlink()
 898            if self.get_results_path().with_suffix(".log").exists():
 899                self.get_results_path().with_suffix(".log").unlink()
 900            if self.get_results_folder_path().exists():
 901                shutil.rmtree(self.get_results_folder_path())
 902
 903        except FileNotFoundError:
 904            # already deleted, apparently
 905            pass
 906        except PermissionError as e:
 907            self.db.log.error(
 908                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
 909            )
 910
 911    def update_children(self, **kwargs):
 912        """
 913        Update an attribute for all child datasets
 914
 915        Can be used to e.g. change the owner, version, finished status for all
 916        datasets in a tree
 917
 918        :param kwargs:  Parameters corresponding to known dataset attributes
 919        """
 920        for child in self.get_children(update=True):
 921            for attr, value in kwargs.items():
 922                child.__setattr__(attr, value)
 923
 924            child.update_children(**kwargs)
 925
 926    def is_finished(self):
 927        """
 928        Check if dataset is finished
 929        :return bool:
 930        """
 931        return bool(self.data["is_finished"])
 932
 933    def is_rankable(self, multiple_items=True):
 934        """
 935        Determine if a dataset is rankable
 936
 937        Rankable means that it is a CSV file with 'date' and 'value' columns
 938        as well as one or more item label columns
 939
 940        :param bool multiple_items:  Consider datasets with multiple values per
 941        item (e.g. word_1, word_2, etc.)?
 942
 943        :return bool:  Whether the dataset is rankable or not
 944        """
 945        if (
 946            self.get_results_path().suffix != ".csv"
 947            or not self.get_results_path().exists()
 948        ):
 949            return False
 950
 951        column_options = {"date", "value", "item"}
 952        if multiple_items:
 953            column_options.add("word_1")
 954
 955        with self.get_results_path().open(encoding="utf-8") as infile:
 956            reader = csv.DictReader(infile)
 957            try:
 958                return len(set(reader.fieldnames) & column_options) >= 3
 959            except (TypeError, ValueError):
 960                return False
 961
 962    def is_accessible_by(self, username, role="owner"):
 963        """
 964        Check if the dataset is accessible by the given user (as owner or via a tag)
 965
 966        :param str|User username: Username to check for
 967        :return bool:
 968        """
 969        if type(username) is not str:
 970            if hasattr(username, "get_id"):
 971                username = username.get_id()
 972            else:
 973                raise TypeError("User must be a str or User object")
 974
 975        # 'normal' owners
 976        if username in [
 977            owner
 978            for owner, meta in self.owners.items()
 979            if (role is None or meta["role"] == role)
 980        ]:
 981            return True
 982
 983        # users who count as owners by virtue of having a matching tag
 984        if username in itertools.chain(
 985            *[
 986                tagged_owners
 987                for tag, tagged_owners in self.tagged_owners.items()
 988                if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
 989            ]
 990        ):
 991            return True
 992
 993        return False
 994
 995    def get_owners_users(self, role="owner"):
 996        """
 997        Get list of dataset owners
 998
 999        This returns a list of *users* that are considered owners. Tags are
1000        transparently replaced with the users with that tag.
1001
1002        :param str|None role:  Role to check for. If `None`, all owners are
1003        returned regardless of role.
1004
1005        :return set:  De-duplicated owner list
1006        """
1007        # 'normal' owners
1008        owners = [
1009            owner
1010            for owner, meta in self.owners.items()
1011            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
1012        ]
1013
1014        # users who count as owners by virtue of having a matching tag
1015        owners.extend(
1016            itertools.chain(
1017                *[
1018                    tagged_owners
1019                    for tag, tagged_owners in self.tagged_owners.items()
1020                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
1021                ]
1022            )
1023        )
1024
1025        # de-duplicate before returning
1026        return set(owners)
1027
1028    def get_owners(self, role="owner"):
1029        """
1030        Get list of dataset owners
1031
1032        This returns a list of all owners, and does not transparently resolve
1033        tags (like `get_owners_users` does).
1034
1035        :param str|None role:  Role to check for. If `None`, all owners are
1036        returned regardless of role.
1037
1038        :return list:  List of owners, with tags included as-is (`tag:<tag>`)
1039        """
1040        return [
1041            owner
1042            for owner, meta in self.owners.items()
1043            if (role is None or meta["role"] == role)
1044        ]
1045
1046    def add_owner(self, username, role="owner"):
1047        """
1048        Set dataset owner
1049
1050        If the user is already an owner, but with a different role, the role is
1051        updated. If the user is already an owner with the same role, nothing happens.
1052
1053        :param str|User username:  Username to set as owner
1054        :param str|None role:  Role to add user with.
1055        """
1056        if type(username) is not str:
1057            if hasattr(username, "get_id"):
1058                username = username.get_id()
1059            else:
1060                raise TypeError("User must be a str or User object")
1061
1062        if username not in self.owners:
1063            self.owners[username] = {"name": username, "key": self.key, "role": role}
1064            self.db.insert("datasets_owners", data=self.owners[username], safe=True)
1065
1066        elif username in self.owners and self.owners[username]["role"] != role:
1067            self.db.update(
1068                "datasets_owners",
1069                data={"role": role},
1070                where={"name": username, "key": self.key},
1071            )
1072            self.owners[username]["role"] = role
1073
1074        if username.startswith("tag:"):
1075            # this is a bit more complicated than just adding to the list of
1076            # owners, so do a full refresh
1077            self.refresh_owners()
1078
1079        # make sure children's owners remain in sync
1080        for child in self.get_children(update=True):
1081            child.add_owner(username, role)
1082            # not recursive, since we're calling it from recursive code!
1083            child.copy_ownership_from(self, recursive=False)
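
        # Usage sketch (illustrative, not from the original source): owners can be
        # individual user names or tags prefixed with "tag:"; changes propagate to
        # child datasets. "admin" is a hypothetical tag.
        #
        #   dataset.add_owner("alice", role="owner")
        #   dataset.add_owner("tag:admin", role="viewer")
        #   dataset.is_accessible_by("alice")        # True
        #   dataset.get_owners_users(role="viewer")  # resolves the tag to users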
1084
1085    def remove_owner(self, username):
1086        """
1087        Remove dataset owner
1088
1089        If no owner is set, the dataset is assigned to the anonymous user.
1090        If the user is not an owner, nothing happens.
1091
1092        :param str|User username:  Username to set as owner
1093        """
1094        if type(username) is not str:
1095            if hasattr(username, "get_id"):
1096                username = username.get_id()
1097            else:
1098                raise TypeError("User must be a str or User object")
1099
1100        if username in self.owners:
1101            del self.owners[username]
1102            self.db.delete("datasets_owners", where={"name": username, "key": self.key})
1103
1104            if not self.owners:
1105                self.add_owner("anonymous")
1106
1107        if username in self.tagged_owners:
1108            del self.tagged_owners[username]
1109
1110        # make sure children's owners remain in sync
1111        for child in self.get_children(update=True):
1112            child.remove_owner(username)
1113            # not recursive, since we're calling it from recursive code!
1114            child.copy_ownership_from(self, recursive=False)
1115
1116    def refresh_owners(self):
1117        """
1118        Update internal owner cache
1119
1120        This makes sure that the list of *users* and *tags* which can access the
1121        dataset is up to date.
1122        """
1123        self.owners = {
1124            owner["name"]: owner
1125            for owner in self.db.fetchall(
1126                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
1127            )
1128        }
1129
1130        # determine which users (if any) are owners of the dataset by having a
1131        # tag that is listed as an owner
1132        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
1133        if owner_tags:
1134            tagged_owners = self.db.fetchall(
1135                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
1136            )
1137            self.tagged_owners = {
1138                owner_tag: [
1139                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
1140                ]
1141                for owner_tag in owner_tags
1142            }
1143        else:
1144            self.tagged_owners = {}
1145
1146    def copy_ownership_from(self, dataset, recursive=True):
1147        """
1148        Copy ownership
1149
1150        This is useful to e.g. make sure a dataset's ownership stays in sync
1151        with its parent
1152
1153        :param Dataset dataset:  Dataset to copy ownership from
1154        :param bool recursive:  Also copy ownership to child datasets?
1155        """
1156        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
1157
1158        for role in ("owner", "viewer"):
1159            owners = dataset.get_owners(role=role)
1160            for owner in owners:
1161                self.db.insert(
1162                    "datasets_owners",
1163                    data={"key": self.key, "name": owner, "role": role},
1164                    commit=False,
1165                    safe=True,
1166                )
1167
1168        self.db.commit()
1169        if recursive:
1170            for child in self.get_children(update=True):
1171                child.copy_ownership_from(self, recursive=recursive)
1172
1173    def get_parameters(self):
1174        """
1175        Get dataset parameters
1176
1177        The dataset parameters are stored as JSON in the database - parse them
1178        and return the resulting object
1179
1180        :return:  Dataset parameters as originally stored
1181        """
1182        try:
1183            return json.loads(self.data["parameters"])
1184        except json.JSONDecodeError:
1185            return {}
1186
1187    def get_columns(self):
1188        """
1189        Returns the dataset columns.
1190
1191        Useful for processor input forms. Can deal with both CSV and NDJSON
1192        files, the latter only if a `map_item` function is available in the
1193        processor that generated it. While in other cases one could use the
1194        keys of the JSON object, this is not always possible in follow-up code
1195        that uses the 'column' names, so for consistency this function acts as
1196        if no column can be parsed if no `map_item` function exists.
1197
1198        :return list:  List of dataset columns; empty list if unable to parse
1199        """
1200        if not self.get_results_path().exists():
1201            # no file to get columns from
1202            return []
1203
1204        if (self.get_results_path().suffix.lower() == ".csv") or (
1205            self.get_results_path().suffix.lower() == ".ndjson"
1206            and self.get_own_processor() is not None
1207            and self.get_own_processor().map_item_method_available(dataset=self)
1208        ):
1209            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
1210            try:
1211                keys = list(next(items).keys())
1212                if self.annotation_fields:
1213                    for annotation_field in self.annotation_fields.values():
1214                        annotation_column = annotation_field["label"]
1215                        label_count = 1
1216                        while annotation_column in keys:
1217                            label_count += 1
1218                            annotation_column = (
1219                                f"{annotation_field['label']}_{label_count}"
1220                            )
1221                        keys.append(annotation_column)
1222                columns = keys
1223            except (StopIteration, NotImplementedError):
1224                # No items or otherwise unable to iterate
1225                columns = []
1226            finally:
1227                del items
1228        else:
1229            # Filetype not CSV or an NDJSON with `map_item`
1230            columns = []
1231
1232        return columns
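
        # Usage sketch (illustrative, not from the original source): column names
        # are typically used to populate processor option forms, e.g. to let a
        # user pick which column to analyse.
        #
        #   columns = dataset.get_columns()
        #   options = {column: column for column in columns}  # e.g. for a select input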
1233
1234    def update_label(self, label):
1235        """
1236        Update label for this dataset
1237
1238        :param str label:  New label
1239        :return str:  The new label, as returned by get_label
1240        """
1241        self.parameters["label"] = label
1242
1243        self.db.update(
1244            "datasets",
1245            data={"parameters": json.dumps(self.parameters)},
1246            where={"key": self.key},
1247        )
1248        return self.get_label()
1249
1250    def get_label(self, parameters=None, default="Query"):
1251        """
1252        Generate a readable label for the dataset
1253
1254        :param dict parameters:  Parameters of the dataset
1255        :param str default:  Label to use if it cannot be inferred from the
1256        parameters
1257
1258        :return str:  Label
1259        """
1260        if not parameters:
1261            parameters = self.parameters
1262
1263        if parameters.get("label"):
1264            return parameters["label"]
1265        elif parameters.get("body_query") and parameters["body_query"] != "empty":
1266            return parameters["body_query"]
1267        elif parameters.get("body_match") and parameters["body_match"] != "empty":
1268            return parameters["body_match"]
1269        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
1270            return parameters["subject_query"]
1271        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
1272            return parameters["subject_match"]
1273        elif parameters.get("query"):
1274            label = parameters["query"]
1275            # Some legacy datasets have lists as query data
1276            if isinstance(label, list):
1277                label = ", ".join(label)
1278
1279            label = label if len(label) < 30 else label[:25] + "..."
1280            label = label.strip().replace("\n", ", ")
1281            return label
1282        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
1283            return "Flag: %s" % parameters["country_flag"]
1284        elif parameters.get("country_name") and parameters["country_name"] != "all":
1285            return "Country: %s" % parameters["country_name"]
1286        elif parameters.get("filename"):
1287            return parameters["filename"]
1288        elif parameters.get("board") and "datasource" in parameters:
1289            return parameters["datasource"] + "/" + parameters["board"]
1290        elif (
1291            "datasource" in parameters
1292            and parameters["datasource"] in self.modules.datasources
1293        ):
1294            return (
1295                self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
1296            )
1297        else:
1298            return default
1299
1300    def change_datasource(self, datasource):
1301        """
1302        Change the datasource type for this dataset
1303
1304        :param str datasource:  New datasource type
1305        :return str:  The new datasource type
1306        """
1307
1308        self.parameters["datasource"] = datasource
1309
1310        self.db.update(
1311            "datasets",
1312            data={"parameters": json.dumps(self.parameters)},
1313            where={"key": self.key},
1314        )
1315        return datasource
1316
1317    def reserve_result_file(self, parameters=None, extension="csv"):
1318        """
1319        Generate a unique path to the results file for this dataset
1320
1321        This generates a file name for the data file of this dataset, and makes sure
1322        no file exists or will exist at that location other than the file we
1323        expect (i.e. the data for this particular dataset).
1324
1325        :param str extension: File extension, "csv" by default
1326        :param parameters:  Dataset parameters
1327        :return bool:  Whether the file path was successfully reserved
1328        """
1329        if self.data["is_finished"]:
1330            raise RuntimeError("Cannot reserve results file for a finished dataset")
1331
1332        # Use 'random' for random post queries
1333        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
1334            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
1335        # Use country code for country flag queries
1336        elif "country_flag" in parameters and parameters["country_flag"] != "all":
1337            file = (
1338                "countryflag-"
1339                + str(parameters["country_flag"])
1340                + "-"
1341                + self.data["key"]
1342            )
1343        # Use the query string for all other queries
1344        else:
1345            query_bit = self.data["query"].replace(" ", "-").lower()
1346            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
1347            query_bit = query_bit[:100]  # Crop to avoid OSError
1348            file = query_bit + "-" + self.data["key"]
1349            file = re.sub(r"[-]+", "-", file)
1350
1351        self.data["result_file"] = file + "." + extension.lower()
1352        index = 1
1353        while self.get_results_path().is_file():
1354            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
1355            index += 1
1356
1357        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
1358                                 data={"result_file": self.data["result_file"]})
1359        return updated > 0
1360
1361    def get_key(self, query, parameters, parent="", time_offset=0):
1362        """
1363        Generate a unique key for this dataset that can be used to identify it
1364
1365        The key is a hash of a combination of the query string and parameters.
1366        You never need to call this, really: it's used internally.
1367
1368        :param str query:  Query string
1369        :param parameters:  Dataset parameters
1370        :param parent: Parent dataset's key (if applicable)
1371        :param time_offset:  Offset to add to the time component of the dataset
1372        key. This can be used to ensure a unique key even if the parameters and
1373        timing is otherwise identical to an existing dataset's
1374
1375        :return str:  Dataset key
1376        """
1377        # Return a hash based on parameters
1378        # we're going to use the hash of the parameters to uniquely identify
1379        # the dataset, so make sure it's always in the same order, or we might
1380        # end up creating multiple keys for the same dataset if python
1381        # decides to return the dict in a different order
1382        param_key = collections.OrderedDict()
1383        for key in sorted(parameters):
1384            param_key[key] = parameters[key]
1385
1386        # we additionally use the current time as a salt - this should usually
1387        # ensure a unique key for the dataset. if there is a hash collision
1388        # anyway, a new key is generated below with a random time offset
1389        param_key["_salt"] = int(time.time()) + time_offset
1390
1391        parent_key = str(parent) if parent else ""
1392        plain_key = repr(param_key) + str(query) + parent_key
1393        hashed_key = hash_to_md5(plain_key)
1394
1395        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1396            # key exists, generate a new one
1397            return self.get_key(
1398                query, parameters, parent, time_offset=random.randint(1, 10)
1399            )
1400        else:
1401            return hashed_key
1402
1403    def set_key(self, key):
1404        """
1405        Change dataset key
1406
1407        In principle, keys should never be changed. But there are rare cases
1408        where it is useful to do so, in particular when importing a dataset
1409        from another 4CAT instance; in that case it makes sense to try and
1410        ensure that the key is the same as it was before. This function sets
1411        the dataset key and updates any dataset references to it.
1412
1413        :param str key:  Key to set
1414        :return str:  Key that was set. If the desired key already exists, the
1415        original key is kept.
1416        """
1417        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1418        if key_exists or not key:
1419            return self.key
1420
1421        old_key = self.key
1422        self.db.update("datasets", data={"key": key}, where={"key": old_key})
1423
1424        # update references
1425        self.db.update(
1426            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
1427        )
1428        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1429        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1430        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1431
1432        # for good measure
1433        self.db.commit()
1434        self.key = key
1435
1436        return self.key
1437
1438    def get_status(self):
1439        """
1440        Get Dataset status
1441
1442        :return string: Dataset status
1443        """
1444        return self.data["status"]
1445
1446    def update_status(self, status, is_final=False):
1447        """
1448        Update dataset status
1449
1450        The status is a string that may be displayed to a user to keep them
1451        updated and informed about the progress of a dataset. No memory is kept
1452        of earlier dataset statuses; the current status is overwritten when
1453        updated.
1454
1455        Statuses are also written to the dataset log file.
1456
1457        :param string status:  Dataset status
1458        :param bool is_final:  If this is `True`, subsequent calls to this
1459        method while the object is instantiated will not update the dataset
1460        status.
1461        :return bool:  Status update successful?
1462        """
1463        if self.no_status_updates:
1464            return
1465
1466        # for presets, copy the updated status to the preset(s) this is part of
1467        if self.preset_parent is None:
1468            self.preset_parent = [
1469                parent
1470                for parent in self.get_genealogy()
1471                if parent.type.find("preset-") == 0 and parent.key != self.key
1472            ][:1]
1473
1474        if self.preset_parent:
1475            for preset_parent in self.preset_parent:
1476                if not preset_parent.is_finished():
1477                    preset_parent.update_status(status)
1478
1479        self.data["status"] = status
1480        updated = self.db.update(
1481            "datasets", where={"key": self.data["key"]}, data={"status": status}
1482        )
1483
1484        if is_final:
1485            self.no_status_updates = True
1486
1487        self.log(status)
1488
1489        return updated > 0
1490
1491    def update_progress(self, progress):
1492        """
1493        Update dataset progress
1494
1495        The progress can be used to indicate to a user how close the dataset
1496        is to completion.
1497
1498        :param float progress:  Between 0 and 1.
1499        :return bool:  Update successful?
1500        """
1501        progress = min(1, max(0, progress))  # clamp
1502        if type(progress) is int:
1503            progress = float(progress)
1504
1505        self.data["progress"] = progress
1506        updated = self.db.update(
1507            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
1508        )
1509        return updated > 0
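
        # Usage sketch (illustrative, not from the original source): processors
        # typically report status and progress periodically while looping over
        # items. `i` and `total` are assumed loop variables.
        #
        #   dataset.update_status(f"Processed {i:,} of {total:,} items")
        #   dataset.update_progress(i / total)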
1510
1511    def get_progress(self):
1512        """
1513        Get dataset progress
1514
1515        :return float:  Progress, between 0 and 1
1516        """
1517        return self.data["progress"]
1518
1519    def finish_with_error(self, error):
1520        """
1521        Set error as final status, and finish with 0 results
1522
1523        This is a convenience function to avoid having to repeat
1524        "update_status" and "finish" a lot.
1525
1526        :param str error:  Error message for final dataset status.
1527        :return:
1528        """
1529        self.update_status(error, is_final=True)
1530        self.finish(0)
1531
1532        return None
1533
1534    def update_version(self, version):
1535        """
1536        Update software version used for this dataset
1537
1538        This can be used to verify the code that was used to process this dataset.
1539
1540        :param string version:  Version identifier
1541        :return bool:  Update successful?
1542        """
1543        try:
1544            # this fails if the processor type is unknown
1545            # edge case, but let's not crash...
1546            processor_path = self.modules.processors.get(self.data["type"]).filepath
1547        except AttributeError:
1548            processor_path = ""
1549
1550        updated = self.db.update(
1551            "datasets",
1552            where={"key": self.data["key"]},
1553            data={
1554                "software_version": version[0],
1555                "software_source": version[1],
1556                "software_file": processor_path,
1557            },
1558        )
1559
1560        return updated > 0
1561
1562    def delete_parameter(self, parameter, instant=True):
1563        """
1564        Delete a parameter from the dataset metadata
1565
1566        :param string parameter:  Parameter to delete
1567        :param bool instant:  Also delete parameters in this instance object?
1568        :return bool:  Update successful?
1569        """
1570        parameters = self.parameters.copy()
1571        if parameter in parameters:
1572            del parameters[parameter]
1573        else:
1574            return False
1575
1576        updated = self.db.update(
1577            "datasets",
1578            where={"key": self.data["key"]},
1579            data={"parameters": json.dumps(parameters)},
1580        )
1581
1582        if instant:
1583            self.parameters = parameters
1584
1585        return updated > 0
1586
1587    def get_version_url(self, file):
1588        """
1589        Get a versioned GitHub URL for the version this dataset was processed with
1590
1591        :param file:  File to link within the repository
1592        :return:  URL, or an empty string
1593        """
1594        if not self.data["software_source"]:
1595            return ""
1596
1597        filepath = self.data.get("software_file", "")
1598        if filepath.startswith("/extensions/"):
1599            # go to root of extension
1600            filepath = "/" + "/".join(filepath.split("/")[3:])
1601
1602        return (
1603            self.data["software_source"]
1604            + "/blob/"
1605            + self.data["software_version"]
1606            + filepath
1607        )
1608
1609    def top_parent(self):
1610        """
1611        Get root dataset
1612
1613        Traverses the tree of datasets this one is part of until it finds one
1614        without a parent dataset, then returns that dataset.
1615
1616        :return DataSet: Root dataset
1617        """
1618        genealogy = self.get_genealogy()
1619        return genealogy[0]
1620
1621    def get_genealogy(self):
1622        """
1623        Get genealogy of this dataset
1624
1625        Creates a list of DataSet objects, with the first one being the
1626        'top' dataset, and each subsequent one being a child of the previous
1627        one, ending with the current dataset.
1628
1629        :return list:  Dataset genealogy, oldest dataset first
1630        """
1631        if not self._genealogy:
1632            key_parent = self.key_parent
1633            genealogy = []
1634
1635            while key_parent:
1636                try:
1637                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1638                except DataSetException:
1639                    break
1640
1641                genealogy.append(parent)
1642                if parent.key_parent:
1643                    key_parent = parent.key_parent
1644                else:
1645                    break
1646
1647            genealogy.reverse()
1648            self._genealogy = genealogy
1649
1650        genealogy = list(self._genealogy)  # copy so repeated calls do not grow the cache
1651        genealogy.append(self)
1652
1653        return genealogy
1654
1655    def get_children(self, update=False):
1656        """
1657        Get children of this dataset
1658
1659        :param bool update:  Update the list of children from database if True, else return cached value
1660        :return list:  List of child datasets
1661        """
1662        if self._children is not None and not update:
1663            return self._children
1664
1665        analyses = self.db.fetchall(
1666            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
1667            (self.key,),
1668        )
1669        self._children = [
1670            DataSet(data=analysis, db=self.db, modules=self.modules)
1671            for analysis in analyses
1672        ]
1673        return self._children
1674
1675    def get_all_children(self, recursive=True, update=True):
1676        """
1677        Get all children of this dataset
1678
1679        Results are returned as a non-hierarchical list, i.e. the result does
1680        not reflect the actual dataset hierarchy (but all datasets in the
1681        result will have the original dataset as an ancestor somewhere)
1682
1683        :return list:  List of DataSets
1684        """
1685        children = self.get_children(update=update)
1686        results = children.copy()
1687        if recursive:
1688            for child in children:
1689                results += child.get_all_children(recursive=recursive, update=update)
1690
1691        return results
1692
1693    def nearest(self, type_filter):
1694        """
1695        Return nearest dataset that matches the given type
1696
1697        Starting with this dataset, traverse the hierarchy upwards and return
1698        whichever dataset matches the given type.
1699
1700        :param str type_filter:  Type filter. Can contain wildcards and is matched
1701        using `fnmatch.fnmatch`.
1702        :return:  Nearest matching dataset, or `None` if none match.
1703        """
1704        genealogy = self.get_genealogy()
1705        for dataset in reversed(genealogy):
1706            if fnmatch.fnmatch(dataset.type, type_filter):
1707                return dataset
1708
1709        return None
1710
1711    def get_breadcrumbs(self):
1712        """
1713        Get breadcrumbs navlink for use in permalinks
1714
1715        Returns a string representing this dataset's genealogy that may be used
1716        to uniquely identify it.
1717
1718        :return str: Nav link
1719        """
1720        if not self.key_parent:
1721            return self.key
1722
1723        genealogy = self.get_genealogy()
1724        return ",".join([d.key for d in genealogy])
1725
1726    def get_compatible_processors(self, config=None):
1727        """
1728        Get list of processors compatible with this dataset
1729
1730        Checks whether this dataset type is one that is listed as being accepted
1731        by the processor, for each known type: if the processor does not
1732        specify accepted types (via the `is_compatible_with` method), it is
1733        assumed to accept only top-level datasets
1734
1735        :param ConfigManager|None config:  Configuration reader to determine
1736        compatibility through. This may not be the same reader the dataset was
1737        instantiated with, e.g. when checking whether some other user should
1738        be able to run processors on this dataset.
1739        :return dict:  Compatible processors, `name => class` mapping
1740        """
1741        processors = self.modules.processors
1742
1743        available = {}
1744        for processor_type, processor in processors.items():
1745            if processor.is_from_collector():
1746                continue
1747
1748            own_processor = self.get_own_processor()
1749            if own_processor and own_processor.exclude_followup_processors(
1750                processor_type
1751            ):
1752                continue
1753
1754            # consider a processor compatible if its is_compatible_with
1755            # method returns True *or* if it has no explicit compatibility
1756            # check and this dataset is top-level (i.e. has no parent)
1757            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1758                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
1759                available[processor_type] = processor
1760
1761        return available
1762
1763    def get_place_in_queue(self, update=False):
1764        """
1765        Determine dataset's position in queue
1766
1767        If the dataset is already finished, the position is -1. Else, the
1768        position is the number of datasets to be completed before this one will
1769        be processed. A position of 0 would mean that the dataset is currently
1770        being executed, or that the backend is not running.
1771
1772        :param bool update:  Update the queue position from database if True, else return cached value
1773        :return int:  Queue position
1774        """
1775        if self.is_finished() or not self.data.get("job"):
1776            self._queue_position = -1
1777            return self._queue_position
1778        elif not update and self._queue_position is not None:
1779            # Use cached value
1780            return self._queue_position
1781        else:
1782            # Collect queue position from database via the job
1783            try:
1784                job = Job.get_by_ID(self.data["job"], self.db)
1785                self._queue_position = job.get_place_in_queue()
1786            except JobNotFoundException:
1787                self._queue_position = -1
1788
1789            return self._queue_position
1790
1791    def get_own_processor(self):
1792        """
1793        Get the processor class that produced this dataset
1794
1795        :return:  Processor class, or `None` if not available.
1796        """
1797        processor_type = self.parameters.get("type", self.data.get("type"))
1798
1799        return self.modules.processors.get(processor_type)
1800
1801    def get_available_processors(self, config=None, exclude_hidden=False):
1802        """
1803        Get list of processors that may be run for this dataset
1804
1805        Returns all compatible processors except for those that are already
1806        queued or finished and have no options. Processors that have been
1807        run but have options are included so they may be run again with a
1808        different configuration
1809
1810        :param ConfigManager|None config:  Configuration reader to determine
1811        compatibility through. This may not be the same reader the dataset was
1812        instantiated with, e.g. when checking whether some other user should
1813        be able to run processors on this dataset.
1814        :param bool exclude_hidden:  Exclude processors that should not be
1815        displayed in the UI? If `False`, all processors are returned.
1816
1817        :return dict:  Available processors, `name => properties` mapping
1818        """
1819        if self.available_processors:
1820            # Update to reflect exclude_hidden parameter which may be different from last call
1821            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1822            return {
1823                processor_type: processor
1824                for processor_type, processor in self.available_processors.items()
1825                if not exclude_hidden or not processor.is_hidden
1826            }
1827
1828        processors = self.get_compatible_processors(config=config)
1829
1830        for analysis in self.get_children(update=True):
1831            if analysis.type not in processors:
1832                continue
1833
1834            if not processors[analysis.type].get_options(config=config):
1835                # No variable options; this processor has been run so remove
1836                del processors[analysis.type]
1837                continue
1838
1839            if exclude_hidden and processors[analysis.type].is_hidden:
1840                del processors[analysis.type]
1841
1842        self.available_processors = processors
1843        return processors
1844
1845    def link_job(self, job):
1846        """
1847        Link this dataset to a job ID
1848
1849        Updates the dataset data to include a reference to the job that will be
1850        executing (or has already executed) this dataset.
1851
1852        Note that if no job can be found for this dataset, this method silently
1853        fails.
1854
1855        :param Job job:  The job that will run this dataset
1856
1857        :todo: If the job column ever gets used, make sure it always contains
1858               a valid value, rather than silently failing this method.
1859        """
1860        if type(job) is not Job:
1861            raise TypeError("link_job requires a Job object as its argument")
1862
1863        if "id" not in job.data:
1864            try:
1865                job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1866            except JobNotFoundException:
1867                return
1868
1869        self.db.update(
1870            "datasets", where={"key": self.key}, data={"job": job.data["id"]}
1871        )
1872
1873    def link_parent(self, key_parent):
1874        """
1875        Set source_dataset key for this dataset
1876
1877        :param key_parent:  Parent key. Not checked for validity
1878        """
1879        self.db.update(
1880            "datasets", where={"key": self.key}, data={"key_parent": key_parent}
1881        )
1882
1883    def get_parent(self):
1884        """
1885        Get parent dataset
1886
1887        :return DataSet:  Parent dataset, or `None` if not applicable
1888        """
1889        return (
1890            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
1891            if self.key_parent
1892            else None
1893        )
1894
1895    def detach(self):
1896        """
1897        Makes the dataset standalone, i.e. removes its parent (source) dataset reference
1898        """
1899        self.link_parent("")
1900
1901    def is_dataset(self):
1902        """
1903        Easy way to confirm this is a dataset.
1904        Used for checking processor and dataset compatibility,
1905        which needs to handle both processors and datasets.
1906        """
1907        return True
1908
1909    def is_top_dataset(self):
1910        """
1911        Easy way to confirm this is a top dataset.
1912        Used for checking processor and dataset compatibility,
1913        which needs to handle both processors and datasets.
1914        """
1915        if self.key_parent:
1916            return False
1917        return True
1918
1919    def is_expiring(self, config):
1920        """
1921        Determine if dataset is set to expire
1922
1923        Similar to `is_expired`, but checks if the dataset will be deleted in
1924        the future, not if it should be deleted right now.
1925
1926        :param ConfigManager config:  Configuration reader (context-aware)
1927        :return bool|int:  `False`, or the expiration date as a Unix timestamp.
1928        """
1929        # has someone opted out of deleting this?
1930        if self.parameters.get("keep"):
1931            return False
1932
1933        # is this dataset explicitly marked as expiring after a certain time?
1934        if self.parameters.get("expires-after"):
1935            return self.parameters.get("expires-after")
1936
1937        # is the data source configured to have its datasets expire?
1938        expiration = config.get("datasources.expiration", {})
1939        if not expiration.get(self.parameters.get("datasource")):
1940            return False
1941
1942        # is there a timeout for this data source?
1943        if expiration.get(self.parameters.get("datasource")).get("timeout"):
1944            return self.timestamp + expiration.get(
1945                self.parameters.get("datasource")
1946            ).get("timeout")
1947
1948        return False
1949
1950    def is_expired(self, config):
1951        """
1952        Determine if dataset should be deleted
1953
1954        Datasets can be set to expire, but when they should be deleted depends
1955        on a number of factors. This checks them all.
1956
1957        :param ConfigManager config:  Configuration reader (context-aware)
1958        :return bool:
1959        """
1960        # if the dataset is not set to expire at all, it is not expired
1961        if not self.is_expiring(config):
1962            return False
1963
1964        # is this dataset explicitly marked as expiring after a certain time?
1965        future = (
1966            time.time() + 3600
1967        )  # ensure we don't delete datasets with invalid expiration times
1968        if (
1969            self.parameters.get("expires-after")
1970            and convert_to_int(self.parameters["expires-after"], future) < time.time()
1971        ):
1972            return True
1973
1974        # is the data source configured to have its datasets expire?
1975        expiration = config.get("datasources.expiration", {})
1976        if not expiration.get(self.parameters.get("datasource")):
1977            return False
1978
1979        # is the dataset older than the set timeout?
1980        if expiration.get(self.parameters.get("datasource")).get("timeout"):
1981            return (
1982                self.timestamp
1983                + expiration[self.parameters.get("datasource")]["timeout"]
1984                < time.time()
1985            )
1986
1987        return False
1988
1989    def is_from_collector(self):
1990        """
1991        Check if this dataset was made by a processor that collects data, i.e.
1992        a search or import worker.
1993
1994        :return bool:
1995        """
1996        return self.type.endswith("-search") or self.type.endswith("-import")
1997
1998    def get_extension(self):
1999        """
2000        Gets the file extension this dataset produces.
2001        Also checks whether the results file exists.
2002        Used for checking processor and dataset compatibility.
2003
2004        :return str extension:  Extension, e.g. `csv`, or `False` if the results file does not exist
2005        """
2006        if self.get_results_path().exists():
2007            return self.get_results_path().suffix[1:]
2008
2009        return False
2010
2011    def get_media_type(self):
2012        """
2013        Gets the media type of the dataset file.
2014
2015        :return str: media type, e.g., "text"
2016        """
2017        own_processor = self.get_own_processor()
2018        if hasattr(self, "media_type"):
2019            # media type can be defined explicitly in the dataset; this is the priority
2020            return self.media_type
2021        elif own_processor is not None:
2022            # or media type can be defined in the processor
2023            # some processors can set different media types for different datasets (e.g., import_media)
2024            if hasattr(own_processor, "media_type"):
2025                return own_processor.media_type
2026
2027        # Default to text
2028        return self.parameters.get("media_type", "text")
2029
2030    def get_metadata(self):
2031        """
2032        Get dataset metadata
2033
2034        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
2035        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
2036        update its database (and ensure compatibility with the exporting version of 4CAT).
2037        """
2038        metadata = self.db.fetchone(
2039            "SELECT * FROM datasets WHERE key = %s", (self.key,)
2040        )
2041
2042        # get 4CAT version (presumably to ensure export is compatible with import)
2043        metadata["current_4CAT_version"] = get_software_version()
2044        return metadata
2045
2046    def get_result_url(self):
2047        """
2048        Gets the 4CAT frontend URL of a dataset file.
2049
2050        Uses the FlaskConfig attributes (i.e., SERVER_NAME and
2051        SERVER_HTTPS) plus hardcoded '/result/'.
2052        TODO: create more dynamic method of obtaining url.
2053        """
2054        filename = self.get_results_path().name
2055
2056        # we cheat a little here by using the modules' config reader, but these
2057        # will never be context-dependent values anyway
2058        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
2059                        self.modules.config.get("flask.server_name") + '/result/' + filename
2060        return url_to_file
2061
2062    def warn_unmappable_item(
2063        self, item_count, processor=None, error_message=None, warn_admins=True
2064    ):
2065        """
2066        Log an item that is unable to be mapped and warn administrators.
2067
2068        :param int item_count:			Item index
2069        :param Processor processor:		Processor calling this function
2070        """
2071        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
2072
2073        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
2074        closest_dataset = (
2075            processor.dataset
2076            if processor is not None and processor.dataset is not None
2077            else self
2078        )
2079        # Log error to dataset log
2080        closest_dataset.log(dataset_error_message)
2081
2082        if warn_admins:
2083            if processor is not None:
2084                processor.log.warning(
2085                    f"Processor {processor.type} was unable to map some items for dataset {closest_dataset.key}."
2086                )
2087            elif hasattr(self.db, "log"):
2088                # borrow the database's log handler
2089                self.db.log.warning(
2090                    f"Unable to map some items for dataset {closest_dataset.key}."
2091                )
2092            else:
2093                # No other log available
2094                raise DataSetException(
2095                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
2096                )
2097
2098    # Annotation functions (most of it is handled in Annotations)
2099    def has_annotations(self) -> bool:
2100        """
2101        Whether this dataset has annotations
2102        """
2103
2104        annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,))
2105
2106        return True if annotation else False
2107
2108    def num_annotations(self) -> int:
2109        """
2110        Get the number of annotations for this dataset
2111        """
2112        return self.db.fetchone(
2113            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
2114        )["count"]
2115
2116    def get_annotation(self, data: dict):
2117        """
2118        Retrieves a specific annotation if it exists.
2119
2120        :param data:		A dictionary identifying the annotation to retrieve.
2121                                                Include either an `id` field, or both
2122                                                `field_id` and `item_id` fields.
2123
2124        :return Annotation:	Annotation object, or None if the given data is insufficient.
2125        """
2126
2127        if "id" not in data and ("field_id" not in data or "item_id" not in data):
2128            return None
2129
2130        if "dataset" not in data:
2131            data["dataset"] = self.key
2132
2133        return Annotation(data=data, db=self.db)
2134
2135    def get_annotations(self) -> list:
2136        """
2137        Retrieves all annotations for this dataset.
2138
2139        return list: 	List of Annotation objects.
2140        """
2141
2142        return Annotation.get_annotations_for_dataset(self.db, self.key)
2143
2144    def get_annotations_for_item(self, item_id: str) -> list:
2145        """
2146        Retrieves all annotations from this dataset for a specific item (e.g. social media post).
2147        """
2148        return Annotation.get_annotations_for_dataset(
2149            self.db, self.key, item_id=item_id
2150        )
2151
2152    def has_annotation_fields(self) -> bool:
2153        """
2154        Returns True if there are annotation fields saved to the datasets table
2155        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
2156        """
2157
2158        return True if self.annotation_fields else False
2159
2160    def get_annotation_field_labels(self) -> list:
2161        """
2162        Retrieves the saved annotation field labels for this dataset.
2163        These are stored in the annotations table.
2164
2165        :return list: List of annotation field labels.
2166        """
2167
2168        annotation_fields = self.annotation_fields
2169
2170        if not annotation_fields:
2171            return []
2172
2173        labels = [v["label"] for v in annotation_fields.values()]
2174
2175        return labels
2176
2177    def save_annotations(self, annotations: list) -> int:
2178        """
2179        Takes a list of annotations and saves them to the annotations table.
2180        If a field is not yet present in the `annotation_fields` column in
2181        the datasets table, it also adds it there.
2182
2183        :param list annotations:		List of dictionaries with annotation items. Must have `item_id`, `field_id`,
2184                                                                        and `label`.
2185                                                                        `item_id` is for the specific item being annotated (e.g. a social media post)
2186                                                                        `field_id` refers to the annotation field.
2187                                                                        `label` is a human-readable description of this annotation.
2188                                                                        E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd",
2189                                                                         "value": "Yes"}]
2190
2191        :returns int:					How many annotations were saved.
2192
2193        """
2194
2195        if not annotations:
2196            return 0
2197
2198        count = 0
2199        annotation_fields = self.annotation_fields
2200
2201        # Add some dataset data to annotations, if not present
2202        for annotation_data in annotations:
2203            # Check if the required fields are present
2204            if "item_id" not in annotation_data:
2205                raise AnnotationException(
2206                    "Can't save annotations; annotation must have an `item_id` referencing "
2207                    "the item it annotated, got %s" % annotation_data
2208                )
2209            if "field_id" not in annotation_data:
2210                raise AnnotationException(
2211                    "Can't save annotations; annotation must have a `field_id` field, "
2212                    "got %s" % annotation_data
2213                )
2214            if "label" not in annotation_data or not isinstance(
2215                annotation_data["label"], str
2216            ):
2217                raise AnnotationException(
2218                    "Can't save annotations; annotation must have a `label` field, "
2219                    "got %s" % annotation_data
2220                )
2221
2222            # Set dataset key
2223            if not annotation_data.get("dataset"):
2224                annotation_data["dataset"] = self.key
2225
2226            # Set default author to this dataset owner
2227            # If this annotation is made by a processor, it will have the processor name
2228            if not annotation_data.get("author"):
2229                annotation_data["author"] = self.get_owners()[0]
2230
2231            # Create Annotation object, which also saves it to the database
2232            # If this dataset/item ID/label combination already exists, this retrieves the
2233            # existing data and updates it with new values.
2234            Annotation(data=annotation_data, db=self.db)
2235            count += 1
2236
2237        # Save annotation fields if things changed
2238        if annotation_fields != self.annotation_fields:
2239            self.save_annotation_fields(annotation_fields)
2240
2241        return count
2242
2243    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2244        """
2245        Save annotation field data to the datasets table (in the `annotation_fields` column).
2246        If changes to the annotation fields affect existing annotations,
2247        this function will also call `update_annotations_via_fields()` to change them.
2248
2249        :param dict new_fields:  		New annotation fields, with a field ID as key.
2250
2251        :param bool add:				Whether we're merely adding new fields
2252                                                                        or replacing the whole batch. If add is False,
2253                                                                        `new_fields` should contain all fields.
2254
2255        :return int:					The number of annotation fields saved.
2256
2257        """
2258
2259        # Get existing annotation fields to see if stuff changed.
2260        old_fields = self.annotation_fields
2261        changes = False
2262
2263        # Do some validation
2264        # Annotation field must be valid JSON.
2265        try:
2266            json.dumps(new_fields)
2267        except ValueError:
2268            raise AnnotationException(
2269                "Can't save annotation fields: not valid JSON (%s)" % new_fields
2270            )
2271
2272        # No duplicate IDs
2273        if len(new_fields) != len(set(new_fields)):
2274            raise AnnotationException(
2275                "Can't save annotation fields: field IDs must be unique"
2276            )
2277
2278        # Annotation fields must at minimum have `type` and `label` keys.
2279        for field_id, annotation_field in new_fields.items():
2280            if not isinstance(field_id, str):
2281                raise AnnotationException(
2282                    "Can't save annotation fields: field ID %s is not a valid string"
2283                    % field_id
2284                )
2285            if "label" not in annotation_field:
2286                raise AnnotationException(
2287                    "Can't save annotation fields: field %s must have a label"
2288                    % field_id
2289                )
2290            if "type" not in annotation_field:
2291                raise AnnotationException(
2292                    "Can't save annotation fields: field %s must have a type"
2293                    % field_id
2294                )
2295
2296        # Check if fields are removed
2297        if not add:
2298            for field_id in old_fields.keys():
2299                if field_id not in new_fields:
2300                    changes = True
2301
2302        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
2303        # dataset
2304        for field_id in new_fields.keys():
2305            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
2306                old_fields[field_id]["label"] = new_fields[field_id][
2307                    "label"
2308                ]  # Only labels could've been changed
2309                new_fields[field_id] = old_fields[field_id]
2310
2311        # If we're just adding fields, add them to the old fields.
2312        # If the field already exists, overwrite the old field.
2313        if add and old_fields:
2314            all_fields = old_fields
2315            for field_id, annotation_field in new_fields.items():
2316                all_fields[field_id] = annotation_field
2317            new_fields = all_fields
2318
2319        # We're saving the new annotation fields as-is.
2320        # Ordering of fields is preserved this way.
2321        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
2322        self.annotation_fields = new_fields
2323
2324        # If anything changed with the annotation fields, possibly update
2325        # existing annotations (e.g. to delete them or change their labels).
2326        if changes:
2327            Annotation.update_annotations_via_fields(
2328                self.key, old_fields, new_fields, self.db
2329            )
2330
2331        return len(new_fields)
2332
2333    def get_annotation_metadata(self) -> dict:
2334        """
2335        Retrieves all the data for this dataset from the annotations table.
2336        """
2337
2338        annotation_data = self.db.fetchall(
2339            "SELECT * FROM annotations WHERE dataset = %s", (self.key,)
2340        )
2341        return annotation_data
2342
2343    def __getattr__(self, attr):
2344        """
2345        Getter so we don't have to use .data all the time
2346
2347        :param attr:  Data key to get
2348        :return:  Value
2349        """
2350
2351        if attr in dir(self):
2352            # an explicitly defined attribute should always be called in favour
2353            # of this passthrough
2354            attribute = getattr(self, attr)
2355            return attribute
2356        elif attr in self.data:
2357            return self.data[attr]
2358        else:
2359            raise AttributeError("DataSet instance has no attribute %s" % attr)
2360
2361    def __setattr__(self, attr, value):
2362        """
2363        Setter so we can flexibly update the database
2364
2365        Also updates internal data stores (.data etc). If the attribute is
2366        unknown, it is stored within the 'parameters' attribute.
2367
2368        :param str attr:  Attribute to update
2369        :param value:  New value
2370        """
2371
2372        # don't override behaviour for *actual* class attributes
2373        if attr in dir(self):
2374            super().__setattr__(attr, value)
2375            return
2376
2377        if attr not in self.data:
2378            self.parameters[attr] = value
2379            attr = "parameters"
2380            value = self.parameters
2381
2382        if attr == "parameters":
2383            value = json.dumps(value)
2384
2385        self.db.update("datasets", where={"key": self.key}, data={attr: value})
2386
2387        self.data[attr] = value
2388
2389        if attr == "parameters":
2390            self.parameters = json.loads(value)

Provide interface to safely register and run operations on a dataset

A dataset is a collection of:

  • A unique identifier
  • A set of parameters that demarcate the data contained within
  • The data

The data is usually stored in a file on the disk; the parameters are stored in a database. The handling of the data, et cetera, is done by other workers; this class defines methods to create and manipulate the dataset's properties.

DataSet( parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None, type=None, is_private=True, owner='anonymous', modules=None)
 61    def __init__(
 62        self,
 63        parameters=None,
 64        key=None,
 65        job=None,
 66        data=None,
 67        db=None,
 68        parent="",
 69        extension=None,
 70        type=None,
 71        is_private=True,
 72        owner="anonymous",
 73        modules=None
 74    ):
 75        """
 76        Create new dataset object
 77
 78        If the dataset is not in the database yet, it is added.
 79
 80        :param dict parameters:  Only when creating a new dataset. Dataset
 81        parameters, free-form dictionary.
 82        :param str key: Dataset key. If given, dataset with this key is loaded.
 83        :param int job: Job ID. If given, dataset corresponding to job is
 84        loaded.
 85        :param dict data: Dataset data, corresponding to a row in the datasets
 86        database table. If not given, retrieved from database depending on key.
 87        :param db:  Database connection
 88        :param str parent:  Only when creating a new dataset. Parent dataset
 89        key to which the one being created is a child.
 90        :param str extension: Only when creating a new dataset. Extension of
 91        dataset result file.
 92        :param str type: Only when creating a new dataset. Type of the dataset,
 93        corresponding to the type property of a processor class.
 94        :param bool is_private: Only when creating a new dataset. Whether the
 95        dataset is private or public.
 96        :param str owner: Only when creating a new dataset. The user name of
 97        the dataset's creator.
 98        :param modules: Module cache. If not given, will be loaded when needed
 99        (expensive). Used to figure out what processors are compatible with
100        this dataset.
101        """
102        self.db = db
103
104        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
105        self.data = {}
106        self.parameters = {}
107        self.available_processors = {}
108        self._genealogy = []
109        self.staging_areas = []
110        self.modules = modules
111
112        if key is not None:
113            self.key = key
114            current = self.db.fetchone(
115                "SELECT * FROM datasets WHERE key = %s", (self.key,)
116            )
117            if not current:
118                raise DataSetNotFoundException(
119                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
120                    % key
121                )
122
123        elif job is not None:
124            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
125            if not current:
126                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
127
128            self.key = current["key"]
129        elif data is not None:
130            current = data
131            if (
132                "query" not in data
133                or "key" not in data
134                or "parameters" not in data
135                or "key_parent" not in data
136            ):
137                raise DataSetException(
138                    "DataSet() requires a complete dataset record for its 'data' argument"
139                )
140
141            self.key = current["key"]
142        else:
143            if parameters is None:
144                raise DataSetException(
145                    "DataSet() requires either 'key', or 'parameters' to be given"
146                )
147
148            if not type:
149                raise DataSetException("Datasets must have their type set explicitly")
150
151            query = self.get_label(parameters, default=type)
152            self.key = self.get_key(query, parameters, parent)
153            current = self.db.fetchone(
154                "SELECT * FROM datasets WHERE key = %s AND query = %s",
155                (self.key, query),
156            )
157
158        if current:
159            self.data = current
160            self.parameters = json.loads(self.data["parameters"])
161            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
162                if self.data.get("annotation_fields") else {}
163            self.is_new = False
164        else:
165            self.data = {"type": type}  # get_own_processor needs this
166            own_processor = self.get_own_processor()
167            version = get_software_commit(own_processor)
168            self.data = {
169                "key": self.key,
170                "query": self.get_label(parameters, default=type),
171                "parameters": json.dumps(parameters),
172                "result_file": "",
173                "creator": owner,
174                "status": "",
175                "type": type,
176                "timestamp": int(time.time()),
177                "is_finished": False,
178                "is_private": is_private,
179                "software_version": version[0],
180                "software_source": version[1],
181                "software_file": "",
182                "num_rows": 0,
183                "progress": 0.0,
184                "key_parent": parent,
185                "annotation_fields": "{}"
186            }
187            self.parameters = parameters
188            self.annotation_fields = {}
189
190            self.db.insert("datasets", data=self.data)
191            self.refresh_owners()
192            self.add_owner(owner)
193
194            # Find desired extension from processor if not explicitly set
195            if extension is None:
196                if own_processor:
197                    extension = own_processor.get_extension(
198                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
199                        if parent
200                        else None
201                    )
202                # Still no extension, default to 'csv'
203                if not extension:
204                    extension = "csv"
205
206            # Reserve filename and update data['result_file']
207            self.reserve_result_file(parameters, extension)
208
209        self.refresh_owners()

Create new dataset object

If the dataset is not in the database yet, it is added.

Parameters
  • dict parameters: Only when creating a new dataset. Dataset parameters, free-form dictionary.
  • str key: Dataset key. If given, dataset with this key is loaded.
  • int job: Job ID. If given, dataset corresponding to job is loaded.
  • dict data: Dataset data, corresponding to a row in the datasets database table. If not given, retrieved from database depending on key.
  • db: Database connection
  • str parent: Only when creating a new dataset. Parent dataset key to which the one being created is a child.
  • str extension: Only when creating a new dataset. Extension of dataset result file.
  • str type: Only when creating a new dataset. Type of the dataset, corresponding to the type property of a processor class.
  • bool is_private: Only when creating a new dataset. Whether the dataset is private or public.
  • str owner: Only when creating a new dataset. The user name of the dataset's creator.
  • modules: Module cache. If not given, will be loaded when needed (expensive). Used to figure out what processors are compatible with this dataset.
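A minimal usage sketch of the two main ways to obtain a DataSet object: loading an existing dataset by key, or registering a new one. The key, parameters and processor type below are hypothetical, and a database connection (db) and module cache (modules) are assumed to already exist, as elsewhere in 4CAT.

    from common.lib.dataset import DataSet

    # load an existing dataset; raises DataSetNotFoundException for unknown keys
    existing = DataSet(key="abcdef1234567890", db=db, modules=modules)

    # register a new dataset for a (hypothetical) processor type
    new_dataset = DataSet(
        parameters={"datasource": "example", "query": "test query"},
        type="example-search",
        owner="anonymous",
        db=db,
        modules=modules,
    )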
data = None
key = ''
available_processors = None
preset_parent = None
parameters = None
modules = None
annotation_fields = None
owners = None
tagged_owners = None
db = None
folder = None
is_new = True
no_status_updates = False
staging_areas = None
def check_dataset_finished(self):
211    def check_dataset_finished(self):
212        """
213        Checks if dataset is finished. Returns the path to the results file if it
214        is not empty, or 'empty' when there were no matches.
215
216        Only returns a path if the dataset is complete. In other words, if this
217        method returns a path, a file with the complete results for this dataset
218        will exist at that location.
219
220        :return: A path to the results file, 'empty', or `None`
221        """
222        if self.data["is_finished"] and self.data["num_rows"] > 0:
223            return self.get_results_path()
224        elif self.data["is_finished"] and self.data["num_rows"] == 0:
225            return 'empty'
226        else:
227            return None

Checks if dataset is finished. Returns the path to the results file if it is not empty, or 'empty' when there were no matches.

Only returns a path if the dataset is complete. In other words, if this method returns a path, a file with the complete results for this dataset will exist at that location.

Returns

A path to the results file, 'empty', or None
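A caller can branch on the three possible return values; a short sketch, assuming dataset is an existing DataSet instance:

    result = dataset.check_dataset_finished()
    if result == "empty":
        print("Finished, but no items matched")
    elif result is None:
        print("Dataset is still being processed")
    else:
        print(f"Results available at {result}")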

def get_results_path(self):
229    def get_results_path(self):
230        """
231        Get path to results file
232
233        Always returns a path, that will at some point contain the dataset
234        data, but may not do so yet. Use this to get the location to write
235        generated results to.
236
237        :return Path:  A path to the results file
238        """
239        # alas we need to instantiate a config reader here - no way around it
240        if not self.folder:
241            self.folder = self.modules.config.get('PATH_ROOT').joinpath(self.modules.config.get('PATH_DATA'))
242        return self.folder.joinpath(self.data["result_file"])

Get path to results file

Always returns a path, that will at some point contain the dataset data, but may not do so yet. Use this to get the location to write generated results to.

Returns

A path to the results file
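A processor would typically write its output to this path; a minimal sketch with made-up CSV columns, assuming dataset is an existing DataSet instance:

    import csv

    with dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=["id", "body"])
        writer.writeheader()
        writer.writerow({"id": "1", "body": "example item"})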

def get_results_folder_path(self):
244    def get_results_folder_path(self):
245        """
246        Get path to folder containing accompanying results
247
248        Returns a path that may not yet be created
249
250        :return Path:  A path to the results folder
251        """
252        return self.get_results_path().parent.joinpath("folder_" + self.key)

Get path to folder containing accompanying results

Returns a path that may not yet be created

Returns

A path to the results folder
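The folder is not created automatically; code that wants to stage accompanying files could do something like the sketch below (file name and contents are illustrative):

    staging = dataset.get_results_folder_path()
    staging.mkdir(exist_ok=True)
    (staging / "notes.txt").write_text("files accompanying the main result file")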

def get_log_path(self):
254    def get_log_path(self):
255        """
256        Get path to dataset log file
257
258        Each dataset has a single log file that documents its creation. This
259        method returns the path to that file. It is identical to the path of
260        the dataset result file, with 'log' as its extension instead.
261
262        :return Path:  A path to the log file
263        """
264        return self.get_results_path().with_suffix(".log")

Get path to dataset log file

Each dataset has a single log file that documents its creation. This method returns the path to that file. It is identical to the path of the dataset result file, with 'log' as its extension instead.

Returns

A path to the log file

def clear_log(self):
266    def clear_log(self):
267        """
268        Clears the dataset log file
269
270        If the log file does not exist, it is created empty. The log file will
271        have the same file name as the dataset result file, with the 'log'
272        extension.
273        """
274        log_path = self.get_log_path()
275        with log_path.open("w"):
276            pass

Clears the dataset log file

If the log file does not exist, it is created empty. The log file will have the same file name as the dataset result file, with the 'log' extension.

def log(self, log):
278    def log(self, log):
279        """
280        Write log message to file
281
282        Writes the log message to the log file on a new line, including a
283        timestamp at the start of the line. Note that this assumes the log file
284        already exists - it should have been created/cleared with clear_log()
285        prior to calling this.
286
287        :param str log:  Log message to write
288        """
289        log_path = self.get_log_path()
290        with log_path.open("a", encoding="utf-8") as outfile:
291            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))

Write log message to file

Writes the log message to the log file on a new line, including a timestamp at the start of the line. Note that this assumes the log file already exists - it should have been created/cleared with clear_log() prior to calling this.

Parameters
  • str log: Log message to write
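Taken together with clear_log() and get_log_path(), a typical sequence looks like this sketch (the message text is illustrative):

    dataset.clear_log()                 # create or truncate the .log file
    dataset.log("Starting processing")  # appends a timestamped line
    print(dataset.get_log_path().read_text())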
def iterate_items( self, processor=None, warn_unmappable=True, map_missing='default', get_annotations=True, max_unmappable=None):
356    def iterate_items(
357        self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None
358    ):
359        """
360        Generate mapped dataset items
361
362        Wrapper for _iterate_items that returns a DatasetItem, which can be
363        accessed as a dict returning the original item or (if a mapper is
364        available) the mapped item. Mapped or original versions of the item can
365        also be accessed via the `original` and `mapped_object` properties of
366        the DatasetItem.
367
368        Processors can define a method called `map_item` that can be used to map
369        an item from the dataset file before it is processed any further. This is
370        slower than storing the data file in the right format to begin with but
371        not all data sources allow for easy 'flat' mapping of items, e.g. tweets
372        are nested objects when retrieved from the twitter API that are easier
373        to store as a JSON file than as a flat CSV file, and it would be a shame
374        to throw away that data.
375
376        Note the two parameters warn_unmappable and map_missing. Items can be
377        unmappable in that their structure is too different to coerce into a
378        neat dictionary of the structure the data source expects. This makes it
379        'unmappable' and warn_unmappable determines what happens in this case.
380        It can also be of the right structure, but with some fields missing or
381        incomplete. map_missing determines what happens in that case. The
382        latter is for example possible when importing data via Zeeschuimer,
383        which produces unstably-structured data captured from social media
384        sites.
385
386        :param BasicProcessor processor:  A reference to the processor
387        iterating the dataset.
388        :param bool warn_unmappable:  If an item is not mappable, skip the item
389        and log a warning
390        :param max_unmappable:  Skip at most this many unmappable items; if
391        more are encountered, stop iterating. `None` to never stop.
392        :param map_missing: Indicates what to do with mapped items for which
393        some fields could not be mapped. Defaults to 'default'. Must be one of:
394        - 'default': fill missing fields with the default passed by map_item
395        - 'abort': raise a MappedItemIncompleteException if a field is missing
396        - a callback: replace missing field with the return value of the
397          callback. The MappedItem object is passed to the callback as the
398          first argument and the name of the missing field as the second.
399        - a dictionary with a key for each possible missing field: replace missing
400          field with a strategy for that field ('default', 'abort', or a callback)
401        :param get_annotations: Whether to also fetch annotations from the database.
402          This can be disabled to help speed up iteration.
403        :return generator:  A generator that yields DatasetItems
404        """
405        unmapped_items = 0
406        # Collect item_mapper for use with filter
407        item_mapper = False
408        own_processor = self.get_own_processor()
409        if own_processor and own_processor.map_item_method_available(dataset=self):
410            item_mapper = True
411
412        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
413        if get_annotations:
414            annotation_fields = self.annotation_fields
415            num_annotations = 0 if not annotation_fields else self.num_annotations()
416            all_annotations = None
417            # If this dataset has less than n annotations, just retrieve them all before iterating
418            if 0 < num_annotations <= 500:
419                # Convert to dict for faster lookup when iterating over items
420                all_annotations = collections.defaultdict(list)
421                for annotation in self.get_annotations():
422                    all_annotations[annotation.item_id].append(annotation)
423
424        # missing field strategy can be for all fields at once, or per field
425        # if it is per field, it is a dictionary with field names and their strategy
426        # if it is for all fields, it may be a callback, 'abort', or 'default'
427        default_strategy = "default"
428        if type(map_missing) is not dict:
429            default_strategy = map_missing
430            map_missing = {}
431
432        # Loop through items
433        for i, item in enumerate(self._iterate_items(processor)):
434            # Save original to yield
435            original_item = item.copy()
436
437            # Map item
438            if item_mapper:
439                try:
440                    mapped_item = own_processor.get_mapped_item(item)
441                except MapItemException as e:
442                    if warn_unmappable:
443                        self.warn_unmappable_item(
444                            i, processor, e, warn_admins=unmapped_items == 0
445                        )
446                        
447                    unmapped_items += 1
448                    if max_unmappable and unmapped_items > max_unmappable:
449                        break
450                    else:
451                        continue
452
453                # check if fields have been marked as 'missing' in the
454                # underlying data, and treat according to the chosen strategy
455                if mapped_item.get_missing_fields():
456                    for missing_field in mapped_item.get_missing_fields():
457                        strategy = map_missing.get(missing_field, default_strategy)
458
459                        if callable(strategy):
460                            # delegate handling to a callback
461                            mapped_item.data[missing_field] = strategy(
462                                mapped_item.data, missing_field
463                            )
464                        elif strategy == "abort":
465                            # raise an exception to be handled at the processor level
466                            raise MappedItemIncompleteException(
467                                f"Cannot process item, field {missing_field} missing in source data."
468                            )
469                        elif strategy == "default":
470                            # use whatever was passed to the object constructor
471                            mapped_item.data[missing_field] = mapped_item.data[
472                                missing_field
473                            ].value
474                        else:
475                            raise ValueError(
476                                "map_missing must be 'abort', 'default', or a callback."
477                            )
478            else:
479                mapped_item = original_item
480
481            # Add possible annotation values
482            if get_annotations and annotation_fields:
483                # We're always handling annotated data as a MappedItem object,
484                # even if no map_item() function is available for the data source.
485                if not isinstance(mapped_item, MappedItem):
486                    mapped_item = MappedItem(mapped_item)
487
488                # Retrieve from all annotations
489                if all_annotations:
490                    item_annotations = all_annotations.get(mapped_item.data["id"])
491                # Or get annotations per item for large datasets (more queries but less memory usage)
492                else:
493                    item_annotations = self.get_annotations_for_item(
494                        mapped_item.data["id"]
495                    )
496
497                # Loop through annotation fields
498                for (
499                    annotation_field_id,
500                    annotation_field_items,
501                ) in annotation_fields.items():
502                    # Set default value to an empty string
503                    value = ""
504                    # Set value if this item has annotations
505                    if item_annotations:
506                        # Items can have multiple annotations
507                        for annotation in item_annotations:
508                            if annotation.field_id == annotation_field_id:
509                                value = annotation.value
510                            if isinstance(value, list):
511                                value = ",".join(value)
512
513                    # Make sure labels are unique when iterating through items
514                    column_name = annotation_field_items["label"]
515                    label_count = 1
516                    while column_name in mapped_item.data:
517                        label_count += 1
518                        column_name = f"{annotation_field_items['label']}_{label_count}"
519
520                    # We're always adding an annotation value
521                    # as an empty string, even if it's absent.
522                    mapped_item.data[column_name] = value
523
524            # yield a DatasetItem, which is a dict with some special properties
525            yield DatasetItem(
526                mapper=item_mapper,
527                original=original_item,
528                mapped_object=mapped_item,
529                **(
530                    mapped_item.get_item_data()
531                    if type(mapped_item) is MappedItem
532                    else mapped_item
533                ),
534            )

Generate mapped dataset items

Wrapper for _iterate_items that returns a DatasetItem, which can be accessed as a dict returning the original item or (if a mapper is available) the mapped item. Mapped or original versions of the item can also be accessed via the original and mapped_object properties of the DatasetItem.

Processors can define a method called map_item that can be used to map an item from the dataset file before it is processed any further. This is slower than storing the data file in the right format to begin with but not all data sources allow for easy 'flat' mapping of items, e.g. tweets are nested objects when retrieved from the twitter API that are easier to store as a JSON file than as a flat CSV file, and it would be a shame to throw away that data.

Note the two parameters warn_unmappable and map_missing. Items can be unmappable in that their structure is too different to coerce into a neat dictionary of the structure the data source expects. This makes it 'unmappable' and warn_unmappable determines what happens in this case. It can also be of the right structure, but with some fields missing or incomplete. map_missing determines what happens in that case. The latter is for example possible when importing data via Zeeschuimer, which produces unstably-structured data captured from social media sites.

Parameters
  • BasicProcessor processor: A reference to the processor iterating the dataset.
  • bool warn_unmappable: If an item is not mappable, skip the item and log a warning
  • max_unmappable: Skip at most this many unmappable items; if more are encountered, stop iterating. None to never stop.
  • map_missing: Indicates what to do with mapped items for which some fields could not be mapped. Defaults to 'empty_str'. Must be one of:
    • 'default': fill missing fields with the default passed by map_item
    • 'abort': raise a MappedItemIncompleteException if a field is missing
    • a callback: replace missing field with the return value of the callback. The MappedItem object is passed to the callback as the first argument and the name of the missing field as the second.
    • a dictionary with a key for each possible missing field: replace missing field with a strategy for that field ('default', 'abort', or a callback)
  • get_annotations: Whether to also fetch annotations from the database. This can be disabled to help speed up iteration.
Returns

A generator that yields DatasetItems
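
As an illustration of these parameters, here is a minimal, hedged sketch of how a processor might call this method with a callback for missing fields. The processor context (self, self.source_dataset, self.dataset) and the field names follow common 4CAT processor conventions but are assumptions for illustration, not part of this module.

    def process(self):
        """Hypothetical processor method illustrating iterate_items() parameters."""

        def fill_missing(mapped_item, missing_field):
            # replace any field that could not be mapped with an explicit marker
            return f"<missing:{missing_field}>"

        for item in self.source_dataset.iterate_items(
            self,                      # the processor doing the iterating
            warn_unmappable=True,      # log a warning for unmappable items
            max_unmappable=100,        # stop after 100 unmappable items
            map_missing=fill_missing,  # callback for mappable items with missing fields
            get_annotations=False,     # skip annotation lookups to speed things up
        ):
            self.dataset.update_status(f"Processing item {item.get('id', '')}")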

def sort_and_iterate_items(self, sort='', reverse=False, chunk_size=50000, **kwargs) -> dict:
536    def sort_and_iterate_items(
537        self, sort="", reverse=False, chunk_size=50000, **kwargs
538    ) -> dict:
539        """
540        Loop through items in a dataset, sorted by a given key.
541
542        This is a wrapper function for `iterate_items()` with the
543        added functionality of sorting a dataset.
544
545        :param sort:				The item key that determines the sort order.
546        :param reverse:				Whether to sort by largest values first.
547
548        :returns dict:				Yields iterated post
549        """
550
551        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
552            """
553            Sort items based on the given key and order.
554
555            :param items_to_sort:  The items to sort
556            :param sort_key:  The key to sort by
557            :param reverse:  Whether to sort in reverse order
558            :return:  Sorted items
559            """
560            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
561                # Sort by dataset order
562                yield from items_to_sort
563            elif sort_key == "dataset-order" and reverse:
564                # Sort by dataset order in reverse
565                yield from reversed(list(items_to_sort))
566            else:
567                # Sort on the basis of a column value
568                if not convert_sort_to_float:
569                    yield from sorted(
570                        items_to_sort,
571                        key=lambda x: x.get(sort_key, ""),
572                        reverse=reverse,
573                    )
574                else:
575                    # Dataset fields can contain integers and empty strings.
576                    # Since these cannot be compared, we will convert every
577                    # empty string to 0.
578                    yield from sorted(
579                        items_to_sort,
580                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
581                        reverse=reverse,
582                    )
583
584        if self.num_rows < chunk_size:
585            try:
586                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
587                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
588            except (TypeError, ValueError):
589                yield from sort_items(
590                    self.iterate_items(**kwargs),
591                    sort,
592                    reverse,
593                    convert_sort_to_float=False,
594                )
595
596        else:
597            # For large datasets, we will use chunk sorting
598            staging_area = self.get_staging_area()
599            buffer = []
600            chunk_files = []
601            convert_sort_to_float = True
602            fieldnames = self.get_columns()
603
604            def write_chunk(buffer, chunk_index):
605                """
606                Write a chunk of data to a temporary file
607
608                :param buffer:  The buffer containing the chunk of data
609                :param chunk_index:  The index of the chunk
610                :return:  The path to the temporary file
611                """
612                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
613                with temp_file.open("w", encoding="utf-8") as chunk_file:
614                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
615                    writer.writeheader()
616                    writer.writerows(buffer)
617                return temp_file
618
619            # Divide the dataset into sorted chunks
620            for item in self.iterate_items(**kwargs):
621                buffer.append(item)
622                if len(buffer) >= chunk_size:
623                    try:
624                        buffer = list(
625                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
626                        )
627                    except (TypeError, ValueError):
628                        convert_sort_to_float = False
629                        buffer = list(
630                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
631                        )
632
633                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
634                    buffer.clear()
635
636            # Sort and write any remaining items in the buffer
637            if buffer:
638                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
639                chunk_files.append(write_chunk(buffer, len(chunk_files)))
640                buffer.clear()
641
642            # Merge sorted chunks into the final sorted file
643            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
644            with sorted_file.open("w", encoding="utf-8") as outfile:
645                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
646                writer.writeheader()
647
648                # Open all chunk files for reading
649                chunk_readers = [
650                    csv.DictReader(chunk.open("r", encoding="utf-8"))
651                    for chunk in chunk_files
652                ]
653                heap = []
654
655                # Initialize the heap with the first row from each chunk
656                for i, reader in enumerate(chunk_readers):
657                    try:
658                        row = next(reader)
659                        if sort == "dataset-order" and reverse:
660                            # Use a reverse index for "dataset-order" and reverse=True
661                            sort_key = -i
662                        elif convert_sort_to_float:
663                            # Negate numeric keys for reverse sorting
664                            sort_key = (
665                                -convert_to_float(row.get(sort, ""))
666                                if reverse
667                                else convert_to_float(row.get(sort, ""))
668                            )
669                        else:
670                            if reverse:
671                                # For reverse string sorting, invert string comparison by creating a tuple
672                                # with an inverted string - this makes Python's tuple comparison work in reverse
673                                sort_key = (
674                                    tuple(-ord(c) for c in row.get(sort, "")),
675                                    -i,
676                                )
677                            else:
678                                sort_key = (row.get(sort, ""), i)
679                        heap.append((sort_key, i, row))
680                    except StopIteration:
681                        pass
682
683                # Use a heap to merge sorted chunks
684                import heapq
685
686                heapq.heapify(heap)
687                while heap:
688                    _, chunk_index, smallest_row = heapq.heappop(heap)
689                    writer.writerow(smallest_row)
690                    try:
691                        next_row = next(chunk_readers[chunk_index])
692                        if sort == "dataset-order" and reverse:
693                            # Use a reverse index for "dataset-order" and reverse=True
694                            sort_key = -chunk_index
695                        elif convert_sort_to_float:
696                            sort_key = (
697                                -convert_to_float(next_row.get(sort, ""))
698                                if reverse
699                                else convert_to_float(next_row.get(sort, ""))
700                            )
701                        else:
702                            # Use the same inverted comparison for string values
703                            if reverse:
704                                sort_key = (
705                                    tuple(-ord(c) for c in next_row.get(sort, "")),
706                                    -chunk_index,
707                                )
708                            else:
709                                sort_key = (next_row.get(sort, ""), chunk_index)
710                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
711                    except StopIteration:
712                        pass
713
714            # Read the sorted file and yield each item
715            with sorted_file.open("r", encoding="utf-8") as infile:
716                reader = csv.DictReader(infile)
717                for item in reader:
718                    yield item
719
720            # Remove the temporary files
721            if staging_area.is_dir():
722                shutil.rmtree(staging_area)

Loop through items in a dataset, sorted by a given key.

This is a wrapper function for iterate_items() with the added functionality of sorting a dataset.

Parameters
  • sort: The item key that determines the sort order.
  • reverse: Whether to sort by largest values first.

Returns

Yields the dataset's items one by one as dicts, sorted by the given key
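
The chunked approach above is a standard external merge sort: sort manageable chunks, write each to disk, then merge the pre-sorted chunks with a heap. Below is a self-contained sketch of the same pattern, independent of 4CAT's helpers (chunk_size is kept tiny only to exercise the chunking).

    import csv
    import heapq
    import tempfile
    from pathlib import Path

    def external_sort(rows, sort_key, chunk_size=3):
        """Sort an iterable of dicts by sort_key using sorted on-disk CSV chunks."""
        staging = Path(tempfile.mkdtemp())
        chunk_files = []
        fieldnames = None
        buffer = []

        def flush(chunk):
            path = staging.joinpath(f"chunk_{len(chunk_files)}.csv")
            with path.open("w", newline="", encoding="utf-8") as outfile:
                writer = csv.DictWriter(outfile, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(sorted(chunk, key=lambda row: row[sort_key]))
            chunk_files.append(path)

        for row in rows:
            fieldnames = fieldnames or list(row.keys())
            buffer.append(row)
            if len(buffer) >= chunk_size:
                flush(buffer)
                buffer = []
        if buffer:
            flush(buffer)

        # merge the pre-sorted chunks; heapq.merge keeps only one row per chunk in memory
        readers = [csv.DictReader(path.open("r", encoding="utf-8")) for path in chunk_files]
        yield from heapq.merge(*readers, key=lambda row: row[sort_key])

    items = [{"id": str(i), "value": str(i % 5)} for i in range(10)]
    for row in external_sort(items, "value"):
        print(row["id"], row["value"])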

def get_staging_area(self):
724    def get_staging_area(self):
725        """
726        Get path to a temporary folder in which files can be stored before
727        finishing
728
729        This folder is created for the caller and is guaranteed not to have
730        existed yet. The folder may be used as a staging area for the dataset
731        data while it is being processed.
732
733        :return Path:  Path to folder
734        """
735        results_file = self.get_results_path()
736
737        results_dir_base = results_file.parent
738        results_dir = results_file.name.replace(".", "") + "-staging"
739        results_path = results_dir_base.joinpath(results_dir)
740        index = 1
741        while results_path.exists():
742            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
743            index += 1
744
745        # create temporary folder
746        results_path.mkdir()
747
748        # Storing the staging area with the dataset so that it can be removed later
749        self.staging_areas.append(results_path)
750
751        return results_path

Get path to a temporary folder in which files can be stored before finishing

This folder is created for the caller and is guaranteed not to have existed yet. It may be used as a staging area for the dataset data while it is being processed.

Returns

Path to folder
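
For instance, a processor might stage intermediate files before producing its final result. A hedged sketch follows; self.dataset and the file name follow 4CAT processor conventions but are assumptions here.

    # Hypothetical processor snippet: write intermediate output to a staging area.
    staging_area = self.dataset.get_staging_area()
    temp_file = staging_area.joinpath("intermediate.ndjson")

    with temp_file.open("w", encoding="utf-8") as outfile:
        outfile.write("{}\n")

    # ... work with the staged files ...

    # staging areas registered on the dataset can be cleaned up in one go later
    self.dataset.remove_staging_areas()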

def remove_staging_areas(self):
753    def remove_staging_areas(self):
754        """
755        Remove any staging areas that were created and all files contained in them.
756        """
757        # Remove DataSet staging areas
758        if self.staging_areas:
759            for staging_area in self.staging_areas:
760                if staging_area.is_dir():
761                    shutil.rmtree(staging_area)

Remove any staging areas that were created and all files contained in them.

def finish(self, num_rows=0):
763    def finish(self, num_rows=0):
764        """
765        Declare the dataset finished
766        """
767        if self.data["is_finished"]:
768            raise RuntimeError("Cannot finish a finished dataset again")
769
770        self.db.update(
771            "datasets",
772            where={"key": self.data["key"]},
773            data={
774                "is_finished": True,
775                "num_rows": num_rows,
776                "progress": 1.0,
777                "timestamp_finished": int(time.time()),
778            },
779        )
780        self.data["is_finished"] = True
781        self.data["num_rows"] = num_rows

Declare the dataset finished

def copy(self, shallow=True):
783    def copy(self, shallow=True):
784        """
785        Copies the dataset, making a new version with a unique key
786
787
788        :param bool shallow:  Shallow copy: does not copy the result file, but
789        instead refers to the same file as the original dataset did
790        :return Dataset:  Copied dataset
791        """
792        parameters = self.parameters.copy()
793
794        # a key is partially based on the parameters. so by setting these extra
795        # attributes, we also ensure a unique key will be generated for the
796        # copy
797        # possibly todo: don't use time for uniqueness (but one shouldn't be
798        # copying a dataset multiple times per microsecond, that's not what
799        # this is for)
800        parameters["copied_from"] = self.key
801        parameters["copied_at"] = time.time()
802
803        copy = DataSet(
804            parameters=parameters,
805            db=self.db,
806            extension=self.result_file.split(".")[-1],
807            type=self.type,
808            modules=self.modules,
809        )
810        for field in self.data:
811            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
812                continue
813
814            copy.__setattr__(field, self.data[field])
815
816        if shallow:
817            # use the same result file
818            copy.result_file = self.result_file
819        else:
820            # copy to new file with new key
821            shutil.copy(self.get_results_path(), copy.get_results_path())
822
823        if self.is_finished():
824            copy.finish(self.num_rows)
825
826        # make sure ownership is also copied
827        copy.copy_ownership_from(self)
828
829        return copy

Copies the dataset, making a new version with a unique key

Parameters
  • bool shallow: Shallow copy: does not copy the result file, but instead refers to the same file as the original dataset did
Returns

Copied dataset
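
For example, to duplicate a finished dataset together with its result file, one might do the following (a sketch; dataset is assumed to be an existing, finished DataSet instance):

    # A full copy gets its own result file; a shallow copy (the default) reuses the original's
    duplicate = dataset.copy(shallow=False)

    print(duplicate.key != dataset.key)         # True: the copy is created with a new key
    print(duplicate.parameters["copied_from"])  # the key of the dataset it was copied from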

def delete(self, commit=True, queue=None):
831    def delete(self, commit=True, queue=None):
832        """
833        Delete the dataset, and all its children
834
835        Deletes both database records and result files. Note that manipulating
836        a dataset object after it has been deleted is undefined behaviour.
837
838        :param bool commit:  Commit SQL DELETE query?
839        """
840        # first, recursively delete children
841        children = self.db.fetchall(
842            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
843        )
844        for child in children:
845            try:
846                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
847                child.delete(commit=commit)
848            except DataSetException:
849                # dataset already deleted - race condition?
850                pass
851
852        # delete any queued jobs for this dataset
853        try:
854            job = Job.get_by_remote_ID(self.key, self.db, self.type)
855            if job.is_claimed:
856                # tell API to stop any jobs running for this dataset
857                # level 2 = cancel job
858                # we're not interested in the result - if the API is available,
859                # it will do its thing, if it's not the backend is probably not
860                # running so the job also doesn't need to be interrupted
861                call_api(
862                    "cancel-job",
863                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
864                    False,
865                )
866
867            # this deletes the job from the database
868            job.finish(True)
869
870        except JobNotFoundException:
871            pass
872
873        # delete this dataset's own annotations
874        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
875        # delete annotations that have been generated as part of this dataset
876        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
877        # delete annotation fields from other datasets that were created as part of this dataset
878        for parent_dataset in self.get_genealogy():
879            field_deleted = False
880            annotation_fields = parent_dataset.annotation_fields
881            if annotation_fields:
882                for field_id in list(annotation_fields.keys()):
883                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
884                        del annotation_fields[field_id]
885                        field_deleted = True
886            if field_deleted:
887                parent_dataset.save_annotation_fields(annotation_fields)
888
889        # delete dataset from database
890        self.db.delete("datasets", where={"key": self.key}, commit=commit)
891        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
892        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
893
894        # delete from drive
895        try:
896            if self.get_results_path().exists():
897                self.get_results_path().unlink()
898            if self.get_results_path().with_suffix(".log").exists():
899                self.get_results_path().with_suffix(".log").unlink()
900            if self.get_results_folder_path().exists():
901                shutil.rmtree(self.get_results_folder_path())
902
903        except FileNotFoundError:
904            # already deleted, apparently
905            pass
906        except PermissionError as e:
907            self.db.log.error(
908                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
909            )

Delete the dataset, and all its children

Deletes both database records and result files. Note that manipulating a dataset object after it has been deleted is undefined behaviour.

Parameters
  • bool commit: Commit SQL DELETE query?
def update_children(self, **kwargs):
911    def update_children(self, **kwargs):
912        """
913        Update an attribute for all child datasets
914
915        Can be used to e.g. change the owner, version, finished status for all
916        datasets in a tree
917
918        :param kwargs:  Parameters corresponding to known dataset attributes
919        """
920        for child in self.get_children(update=True):
921            for attr, value in kwargs.items():
922                child.__setattr__(attr, value)
923
924            child.update_children(**kwargs)

Update an attribute for all child datasets

Can be used to e.g. change the owner, version, or finished status for all datasets in a tree

Parameters
  • kwargs: Parameters corresponding to known dataset attributes
def is_finished(self):
926    def is_finished(self):
927        """
928        Check if dataset is finished
929        :return bool:
930        """
931        return bool(self.data["is_finished"])

Check if dataset is finished

Returns

Whether the dataset is finished (bool)

def is_rankable(self, multiple_items=True):
933    def is_rankable(self, multiple_items=True):
934        """
935        Determine if a dataset is rankable
936
937        Rankable means that it is a CSV file with 'date' and 'value' columns
938        as well as one or more item label columns
939
940        :param bool multiple_items:  Consider datasets with multiple items per
941        item (e.g. word_1, word_2, etc)?
942
943        :return bool:  Whether the dataset is rankable or not
944        """
945        if (
946            self.get_results_path().suffix != ".csv"
947            or not self.get_results_path().exists()
948        ):
949            return False
950
951        column_options = {"date", "value", "item"}
952        if multiple_items:
953            column_options.add("word_1")
954
955        with self.get_results_path().open(encoding="utf-8") as infile:
956            reader = csv.DictReader(infile)
957            try:
958                return len(set(reader.fieldnames) & column_options) >= 3
959            except (TypeError, ValueError):
960                return False

Determine if a dataset is rankable

Rankable means that it is a CSV file with 'date' and 'value' columns as well as one or more item label columns

Parameters
  • bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)?
Returns

Whether the dataset is rankable or not
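
The check boils down to a set intersection on the CSV header: at least three of the expected columns must be present. A self-contained illustration:

    import csv
    import io

    # a toy CSV with the columns a rankable dataset needs
    toy_csv = "date,item,value\n2024-01,cats,10\n2024-01,dogs,7\n"

    reader = csv.DictReader(io.StringIO(toy_csv))
    column_options = {"date", "value", "item", "word_1"}  # word_1 covers multi-item datasets

    print(len(set(reader.fieldnames) & column_options) >= 3)  # True: rankable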

def is_accessible_by(self, username, role='owner'):
962    def is_accessible_by(self, username, role="owner"):
963        """
964        Check if dataset has given user as owner
965
966        :param str|User username: Username to check for
967        :return bool:
968        """
969        if type(username) is not str:
970            if hasattr(username, "get_id"):
971                username = username.get_id()
972            else:
973                raise TypeError("User must be a str or User object")
974
975        # 'normal' owners
976        if username in [
977            owner
978            for owner, meta in self.owners.items()
979            if (role is None or meta["role"] == role)
980        ]:
981            return True
982
983        # users who are owners by virtue of having an owner tag
984        if username in itertools.chain(
985            *[
986                tagged_owners
987                for tag, tagged_owners in self.tagged_owners.items()
988                if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
989            ]
990        ):
991            return True
992
993        return False

Check if the dataset is accessible by the given user, i.e. whether that user is an owner (with the given role)

Parameters
  • str|User username: Username to check for
  • str|None role: Role to check for. If None, any role counts.
Returns

True if the user is an owner of the dataset in the given role (directly or via a tag), False otherwise

def get_owners_users(self, role='owner'):
 995    def get_owners_users(self, role="owner"):
 996        """
 997        Get list of dataset owners
 998
 999        This returns a list of *users* that are considered owners. Tags are
1000        transparently replaced with the users with that tag.
1001
1002        :param str|None role:  Role to check for. If `None`, all owners are
1003        returned regardless of role.
1004
1005        :return set:  De-duplicated owner list
1006        """
1007        # 'normal' owners
1008        owners = [
1009            owner
1010            for owner, meta in self.owners.items()
1011            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
1012        ]
1013
1014        # users who are owners by virtue of having an owner tag
1015        owners.extend(
1016            itertools.chain(
1017                *[
1018                    tagged_owners
1019                    for tag, tagged_owners in self.tagged_owners.items()
1020                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
1021                ]
1022            )
1023        )
1024
1025        # de-duplicate before returning
1026        return set(owners)

Get list of dataset owners

This returns a list of users that are considered owners. Tags are transparently replaced with the users with that tag.

Parameters
  • str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns

De-duplicated owner list

def get_owners(self, role='owner'):
1028    def get_owners(self, role="owner"):
1029        """
1030        Get list of dataset owners
1031
1032        This returns a list of all owners, and does not transparently resolve
1033        tags (like `get_owners_users` does).
1034
1035        :param str|None role:  Role to check for. If `None`, all owners are
1036        returned regardless of role.
1037
1038        :return set:  De-duplicated owner list
1039        """
1040        return [
1041            owner
1042            for owner, meta in self.owners.items()
1043            if (role is None or meta["role"] == role)
1044        ]

Get list of dataset owners

This returns a list of all owners, and does not transparently resolve tags (like get_owners_users does).

Parameters
  • str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns

De-duplicated owner list

def add_owner(self, username, role='owner'):
1046    def add_owner(self, username, role="owner"):
1047        """
1048        Set dataset owner
1049
1050        If the user is already an owner, but with a different role, the role is
1051        updated. If the user is already an owner with the same role, nothing happens.
1052
1053        :param str|User username:  Username to set as owner
1054        :param str|None role:  Role to add user with.
1055        """
1056        if type(username) is not str:
1057            if hasattr(username, "get_id"):
1058                username = username.get_id()
1059            else:
1060                raise TypeError("User must be a str or User object")
1061
1062        if username not in self.owners:
1063            self.owners[username] = {"name": username, "key": self.key, "role": role}
1064            self.db.insert("datasets_owners", data=self.owners[username], safe=True)
1065
1066        elif username in self.owners and self.owners[username]["role"] != role:
1067            self.db.update(
1068                "datasets_owners",
1069                data={"role": role},
1070                where={"name": username, "key": self.key},
1071            )
1072            self.owners[username]["role"] = role
1073
1074        if username.startswith("tag:"):
1075            # this is a bit more complicated than just adding to the list of
1076            # owners, so do a full refresh
1077            self.refresh_owners()
1078
1079        # make sure children's owners remain in sync
1080        for child in self.get_children(update=True):
1081            child.add_owner(username, role)
1082            # not recursive, since we're calling it from recursive code!
1083            child.copy_ownership_from(self, recursive=False)

Set dataset owner

If the user is already an owner, but with a different role, the role is updated. If the user is already an owner with the same role, nothing happens.

Parameters
  • str|User username: Username to set as owner
  • str|None role: Role to add user with.
def remove_owner(self, username):
1085    def remove_owner(self, username):
1086        """
1087        Remove dataset owner
1088
1089        If no owner is set, the dataset is assigned to the anonymous user.
1090        If the user is not an owner, nothing happens.
1091
1092        :param str|User username:  Username to remove as owner
1093        """
1094        if type(username) is not str:
1095            if hasattr(username, "get_id"):
1096                username = username.get_id()
1097            else:
1098                raise TypeError("User must be a str or User object")
1099
1100        if username in self.owners:
1101            del self.owners[username]
1102            self.db.delete("datasets_owners", where={"name": username, "key": self.key})
1103
1104            if not self.owners:
1105                self.add_owner("anonymous")
1106
1107        if username in self.tagged_owners:
1108            del self.tagged_owners[username]
1109
1110        # make sure children's owners remain in sync
1111        for child in self.get_children(update=True):
1112            child.remove_owner(username)
1113            # not recursive, since we're calling it from recursive code!
1114            child.copy_ownership_from(self, recursive=False)

Remove dataset owner

If no owner is set, the dataset is assigned to the anonymous user. If the user is not an owner, nothing happens.

Parameters
  • str|User username: Username to remove as owner
def refresh_owners(self):
1116    def refresh_owners(self):
1117        """
1118        Update internal owner cache
1119
1120        This makes sure that the list of *users* and *tags* which can access the
1121        dataset is up to date.
1122        """
1123        self.owners = {
1124            owner["name"]: owner
1125            for owner in self.db.fetchall(
1126                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
1127            )
1128        }
1129
1130        # determine which users (if any) are owners of the dataset by having a
1131        # tag that is listed as an owner
1132        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
1133        if owner_tags:
1134            tagged_owners = self.db.fetchall(
1135                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
1136            )
1137            self.tagged_owners = {
1138                owner_tag: [
1139                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
1140                ]
1141                for owner_tag in owner_tags
1142            }
1143        else:
1144            self.tagged_owners = {}

Update internal owner cache

This makes sure that the list of users and tags which can access the dataset is up to date.

def copy_ownership_from(self, dataset, recursive=True):
1146    def copy_ownership_from(self, dataset, recursive=True):
1147        """
1148        Copy ownership
1149
1150        This is useful to e.g. make sure a dataset's ownership stays in sync
1151        with its parent
1152
1153        :param Dataset dataset:  Parent to copy from
1154        :return:
1155        """
1156        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
1157
1158        for role in ("owner", "viewer"):
1159            owners = dataset.get_owners(role=role)
1160            for owner in owners:
1161                self.db.insert(
1162                    "datasets_owners",
1163                    data={"key": self.key, "name": owner, "role": role},
1164                    commit=False,
1165                    safe=True,
1166                )
1167
1168        self.db.commit()
1169        if recursive:
1170            for child in self.get_children(update=True):
1171                child.copy_ownership_from(self, recursive=recursive)

Copy ownership

This is useful to e.g. make sure a dataset's ownership stays in sync with its parent

Parameters
  • Dataset dataset: Parent to copy from
Returns
def get_parameters(self):
1173    def get_parameters(self):
1174        """
1175        Get dataset parameters
1176
1177        The dataset parameters are stored as JSON in the database - parse them
1178        and return the resulting object
1179
1180        :return:  Dataset parameters as originally stored
1181        """
1182        try:
1183            return json.loads(self.data["parameters"])
1184        except json.JSONDecodeError:
1185            return {}

Get dataset parameters

The dataset parameters are stored as JSON in the database - parse them and return the resulting object

Returns

Dataset parameters as originally stored

def get_columns(self):
1187    def get_columns(self):
1188        """
1189        Returns the dataset columns.
1190
1191        Useful for processor input forms. Can deal with both CSV and NDJSON
1192        files, the latter only if a `map_item` function is available in the
1193        processor that generated it. While in other cases one could use the
1194        keys of the JSON object, this is not always possible in follow-up code
1195        that uses the 'column' names, so for consistency this function acts as
1196        if no column can be parsed if no `map_item` function exists.
1197
1198        :return list:  List of dataset columns; empty list if unable to parse
1199        """
1200        if not self.get_results_path().exists():
1201            # no file to get columns from
1202            return []
1203
1204        if (self.get_results_path().suffix.lower() == ".csv") or (
1205            self.get_results_path().suffix.lower() == ".ndjson"
1206            and self.get_own_processor() is not None
1207            and self.get_own_processor().map_item_method_available(dataset=self)
1208        ):
1209            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
1210            try:
1211                keys = list(next(items).keys())
1212                if self.annotation_fields:
1213                    for annotation_field in self.annotation_fields.values():
1214                        annotation_column = annotation_field["label"]
1215                        label_count = 1
1216                        while annotation_column in keys:
1217                            label_count += 1
1218                            annotation_column = (
1219                                f"{annotation_field['label']}_{label_count}"
1220                            )
1221                        keys.append(annotation_column)
1222                columns = keys
1223            except (StopIteration, NotImplementedError):
1224                # No items or otherwise unable to iterate
1225                columns = []
1226            finally:
1227                del items
1228        else:
1229            # Filetype not CSV or an NDJSON with `map_item`
1230            columns = []
1231
1232        return columns

Returns the dataset columns.

Useful for processor input forms. Works for both CSV and NDJSON files, the latter only if a map_item function is available in the processor that generated the dataset. In other cases one could in principle use the keys of the raw JSON objects, but follow-up code that relies on the 'column' names cannot always do so; for consistency, this function therefore reports no columns when no map_item function exists.

Returns

List of dataset columns; empty list if unable to parse
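
Annotation labels are made unique against the existing column names by appending a numeric suffix, the same pattern used when writing annotation values in the item iterator above. A standalone sketch of that pattern:

    def unique_column_name(label, existing_columns):
        """Return label, or label_2, label_3, ... if the label is already taken."""
        column_name = label
        label_count = 1
        while column_name in existing_columns:
            label_count += 1
            column_name = f"{label}_{label_count}"
        return column_name

    columns = ["id", "body", "sentiment"]
    print(unique_column_name("sentiment", columns))  # sentiment_2
    print(unique_column_name("topic", columns))      # topic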

def update_label(self, label):
1234    def update_label(self, label):
1235        """
1236        Update label for this dataset
1237
1238        :param str label:  	New label
1239        :return str: 		The new label, as returned by get_label
1240        """
1241        self.parameters["label"] = label
1242
1243        self.db.update(
1244            "datasets",
1245            data={"parameters": json.dumps(self.parameters)},
1246            where={"key": self.key},
1247        )
1248        return self.get_label()

Update label for this dataset

Parameters
  • str label: New label
Returns

The new label, as returned by get_label
def get_label(self, parameters=None, default='Query'):
1250    def get_label(self, parameters=None, default="Query"):
1251        """
1252        Generate a readable label for the dataset
1253
1254        :param dict parameters:  Parameters of the dataset
1255        :param str default:  Label to use if it cannot be inferred from the
1256        parameters
1257
1258        :return str:  Label
1259        """
1260        if not parameters:
1261            parameters = self.parameters
1262
1263        if parameters.get("label"):
1264            return parameters["label"]
1265        elif parameters.get("body_query") and parameters["body_query"] != "empty":
1266            return parameters["body_query"]
1267        elif parameters.get("body_match") and parameters["body_match"] != "empty":
1268            return parameters["body_match"]
1269        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
1270            return parameters["subject_query"]
1271        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
1272            return parameters["subject_match"]
1273        elif parameters.get("query"):
1274            label = parameters["query"]
1275            # Some legacy datasets have lists as query data
1276            if isinstance(label, list):
1277                label = ", ".join(label)
1278
1279            label = label if len(label) < 30 else label[:25] + "..."
1280            label = label.strip().replace("\n", ", ")
1281            return label
1282        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
1283            return "Flag: %s" % parameters["country_flag"]
1284        elif parameters.get("country_name") and parameters["country_name"] != "all":
1285            return "Country: %s" % parameters["country_name"]
1286        elif parameters.get("filename"):
1287            return parameters["filename"]
1288        elif parameters.get("board") and "datasource" in parameters:
1289            return parameters["datasource"] + "/" + parameters["board"]
1290        elif (
1291            "datasource" in parameters
1292            and parameters["datasource"] in self.modules.datasources
1293        ):
1294            return (
1295                self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
1296            )
1297        else:
1298            return default

Generate a readable label for the dataset

Parameters
  • dict parameters: Parameters of the dataset
  • str default: Label to use if it cannot be inferred from the parameters
Returns

Label

def change_datasource(self, datasource):
1300    def change_datasource(self, datasource):
1301        """
1302        Change the datasource type for this dataset
1303
1304        :param str datasource:  New datasource type
1305        :return str:  The new datasource type
1306        """
1307
1308        self.parameters["datasource"] = datasource
1309
1310        self.db.update(
1311            "datasets",
1312            data={"parameters": json.dumps(self.parameters)},
1313            where={"key": self.key},
1314        )
1315        return datasource

Change the datasource type for this dataset

Parameters
  • str datasource: New datasource type
Returns

The new datasource type

def reserve_result_file(self, parameters=None, extension='csv'):
1317    def reserve_result_file(self, parameters=None, extension="csv"):
1318        """
1319        Generate a unique path to the results file for this dataset
1320
1321        This generates a file name for the data file of this dataset, and makes sure
1322        no file exists or will exist at that location other than the file we
1323        expect (i.e. the data for this particular dataset).
1324
1325        :param str extension: File extension, "csv" by default
1326        :param parameters:  Dataset parameters
1327        :return bool:  Whether the file path was successfully reserved
1328        """
1329        if self.data["is_finished"]:
1330            raise RuntimeError("Cannot reserve results file for a finished dataset")
1331
1332        # Use 'random' for random post queries
1333        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
1334            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
1335        # Use country code for country flag queries
1336        elif "country_flag" in parameters and parameters["country_flag"] != "all":
1337            file = (
1338                "countryflag-"
1339                + str(parameters["country_flag"])
1340                + "-"
1341                + self.data["key"]
1342            )
1343        # Use the query string for all other queries
1344        else:
1345            query_bit = self.data["query"].replace(" ", "-").lower()
1346            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
1347            query_bit = query_bit[:100]  # Crop to avoid OSError
1348            file = query_bit + "-" + self.data["key"]
1349            file = re.sub(r"[-]+", "-", file)
1350
1351        self.data["result_file"] = file + "." + extension.lower()
1352        index = 1
1353        while self.get_results_path().is_file():
1354            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
1355            index += 1
1356
1357        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
1358                                 data={"result_file": self.data["result_file"]})
1359        return updated > 0

Generate a unique path to the results file for this dataset

This generates a file name for the data file of this dataset, and makes sure no file exists or will exist at that location other than the file we expect (i.e. the data for this particular dataset).

Parameters
  • str extension: File extension, "csv" by default
  • parameters: Dataset parameters
Returns

Whether the file path was successfully reserved
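
For query-based datasets the file name is essentially a sanitised slug of the query plus the dataset key. A self-contained sketch of that sanitisation (the example query and key are made up):

    import re

    def query_to_filename(query, key, extension="csv", max_length=100):
        """Build a file name the way the query-based branch above does."""
        query_bit = query.replace(" ", "-").lower()
        query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)  # keep only safe characters
        query_bit = query_bit[:max_length]                 # crop to avoid OSError on long names
        file = re.sub(r"[-]+", "-", query_bit + "-" + key) # collapse repeated dashes
        return file + "." + extension.lower()

    print(query_to_filename("Climate change & 4CAT!", "abc123"))  # climate-change-4cat-abc123.csv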

def get_key(self, query, parameters, parent='', time_offset=0):
1361    def get_key(self, query, parameters, parent="", time_offset=0):
1362        """
1363        Generate a unique key for this dataset that can be used to identify it
1364
1365        The key is a hash of a combination of the query string and parameters.
1366        You never need to call this, really: it's used internally.
1367
1368        :param str query:  Query string
1369        :param parameters:  Dataset parameters
1370        :param parent: Parent dataset's key (if applicable)
1371        :param time_offset:  Offset to add to the time component of the dataset
1372        key. This can be used to ensure a unique key even if the parameters and
1373        timing is otherwise identical to an existing dataset's
1374
1375        :return str:  Dataset key
1376        """
1377        # Return a hash based on parameters
1378        # we're going to use the hash of the parameters to uniquely identify
1379        # the dataset, so make sure it's always in the same order, or we might
1380        # end up creating multiple keys for the same dataset if python
1381        # decides to return the dict in a different order
1382        param_key = collections.OrderedDict()
1383        for key in sorted(parameters):
1384            param_key[key] = parameters[key]
1385
1386        # we additionally use the current time as a salt - this should usually
1387        # ensure a unique key for the dataset. if there is nevertheless a hash
1388        # collision, a new key is generated below with a random time offset
1389        param_key["_salt"] = int(time.time()) + time_offset
1390
1391        parent_key = str(parent) if parent else ""
1392        plain_key = repr(param_key) + str(query) + parent_key
1393        hashed_key = hash_to_md5(plain_key)
1394
1395        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1396            # key exists, generate a new one
1397            return self.get_key(
1398                query, parameters, parent, time_offset=random.randint(1, 10)
1399            )
1400        else:
1401            return hashed_key

Generate a unique key for this dataset that can be used to identify it

The key is a hash of a combination of the query string and parameters. You never need to call this, really: it's used internally.

Parameters
  • str query: Query string
  • parameters: Dataset parameters
  • parent: Parent dataset's key (if applicable)
  • time_offset: Offset to add to the time component of the dataset key. This can be used to ensure a unique key even if the parameters and timing is otherwise identical to an existing dataset's
Returns

Dataset key
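
The derivation can be summarised as an MD5 hash of the sorted parameters (plus a time-based salt), the query, and the parent key. Below is a self-contained approximation using hashlib directly rather than 4CAT's hash_to_md5 helper, whose exact encoding may differ.

    import collections
    import hashlib
    import time

    def approximate_key(query, parameters, parent="", time_offset=0):
        """Approximate the key derivation: md5 over sorted params, salt, query and parent."""
        param_key = collections.OrderedDict()
        for key in sorted(parameters):
            param_key[key] = parameters[key]

        # the current time acts as a salt, so identical queries still get distinct keys
        param_key["_salt"] = int(time.time()) + time_offset

        plain_key = repr(param_key) + str(query) + (str(parent) if parent else "")
        return hashlib.md5(plain_key.encode("utf-8")).hexdigest()

    print(approximate_key("cats", {"datasource": "example", "board": "b"}))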

def set_key(self, key):
1403    def set_key(self, key):
1404        """
1405        Change dataset key
1406
1407        In principle, keys should never be changed. But there are rare cases
1408        where it is useful to do so, in particular when importing a dataset
1409        from another 4CAT instance; in that case it makes sense to try and
1410        ensure that the key is the same as it was before. This function sets
1411        the dataset key and updates any dataset references to it.
1412
1413        :param str key:  Key to set
1414        :return str:  Key that was set. If the desired key already exists, the
1415        original key is kept.
1416        """
1417        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1418        if key_exists or not key:
1419            return self.key
1420
1421        old_key = self.key
1422        self.db.update("datasets", data={"key": key}, where={"key": old_key})
1423
1424        # update references
1425        self.db.update(
1426            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
1427        )
1428        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1429        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1430        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1431
1432        # for good measure
1433        self.db.commit()
1434        self.key = key
1435
1436        return self.key

Change dataset key

In principle, keys should never be changed. But there are rare cases where it is useful to do so, in particular when importing a dataset from another 4CAT instance; in that case it makes sense to try and ensure that the key is the same as it was before. This function sets the dataset key and updates any dataset references to it.

Parameters
  • str key: Key to set
Returns

Key that was set. If the desired key already exists, the original key is kept.

def get_status(self):
1438    def get_status(self):
1439        """
1440        Get Dataset status
1441
1442        :return string: Dataset status
1443        """
1444        return self.data["status"]

Get Dataset status

Returns

Dataset status

def update_status(self, status, is_final=False):
1446    def update_status(self, status, is_final=False):
1447        """
1448        Update dataset status
1449
1450        The status is a string that may be displayed to a user to keep them
1451        updated and informed about the progress of a dataset. No memory is kept
1452        of earlier dataset statuses; the current status is overwritten when
1453        updated.
1454
1455        Statuses are also written to the dataset log file.
1456
1457        :param string status:  Dataset status
1458        :param bool is_final:  If this is `True`, subsequent calls to this
1459        method while the object is instantiated will not update the dataset
1460        status.
1461        :return bool:  Status update successful?
1462        """
1463        if self.no_status_updates:
1464            return
1465
1466        # for presets, copy the updated status to the preset(s) this is part of
1467        if self.preset_parent is None:
1468            self.preset_parent = [
1469                parent
1470                for parent in self.get_genealogy()
1471                if parent.type.find("preset-") == 0 and parent.key != self.key
1472            ][:1]
1473
1474        if self.preset_parent:
1475            for preset_parent in self.preset_parent:
1476                if not preset_parent.is_finished():
1477                    preset_parent.update_status(status)
1478
1479        self.data["status"] = status
1480        updated = self.db.update(
1481            "datasets", where={"key": self.data["key"]}, data={"status": status}
1482        )
1483
1484        if is_final:
1485            self.no_status_updates = True
1486
1487        self.log(status)
1488
1489        return updated > 0

Update dataset status

The status is a string that may be displayed to a user to keep them updated and informed about the progress of a dataset. No memory is kept of earlier dataset statuses; the current status is overwritten when updated.

Statuses are also written to the dataset log file.

Parameters
  • string status: Dataset status
  • bool is_final: If this is True, subsequent calls to this method while the object is instantiated will not update the dataset status.
Returns

Status update successful?
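
A hedged sketch of how a processor might report status and progress (documented below) while looping over items; the loop, the items variable, and self.dataset follow common 4CAT processor conventions but are assumptions here.

    # Hypothetical processor loop reporting status and progress to the user
    for index, item in enumerate(items):
        if index % 100 == 0:
            self.dataset.update_status(f"Processed {index:,} of {len(items):,} items")
            self.dataset.update_progress(index / len(items))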

def update_progress(self, progress):
1491    def update_progress(self, progress):
1492        """
1493        Update dataset progress
1494
1495        The progress can be used to indicate to a user how close the dataset
1496        is to completion.
1497
1498        :param float progress:  Between 0 and 1.
1499        :return:
1500        """
1501        progress = min(1, max(0, progress))  # clamp
1502        if type(progress) is int:
1503            progress = float(progress)
1504
1505        self.data["progress"] = progress
1506        updated = self.db.update(
1507            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
1508        )
1509        return updated > 0

Update dataset progress

The progress can be used to indicate to a user how close the dataset is to completion.

Parameters
  • float progress: Between 0 and 1.
Returns
def get_progress(self):
1511    def get_progress(self):
1512        """
1513        Get dataset progress
1514
1515        :return float:  Progress, between 0 and 1
1516        """
1517        return self.data["progress"]

Get dataset progress

Returns

Progress, between 0 and 1

def finish_with_error(self, error):
1519    def finish_with_error(self, error):
1520        """
1521        Set error as final status, and finish with 0 results
1522
1523        This is a convenience function to avoid having to repeat
1524        "update_status" and "finish" a lot.
1525
1526        :param str error:  Error message for final dataset status.
1527        :return:
1528        """
1529        self.update_status(error, is_final=True)
1530        self.finish(0)
1531
1532        return None

Set error as final status, and finish with 0 results

This is a convenience function to avoid having to repeat "update_status" and "finish" a lot.

Parameters
  • str error: Error message for final dataset status.
Returns
def update_version(self, version):
1534    def update_version(self, version):
1535        """
1536        Update software version used for this dataset
1537
1538        This can be used to verify the code that was used to process this dataset.
1539
1540        :param string version:  Version identifier
1541        :return bool:  Update successful?
1542        """
1543        try:
1544            # this fails if the processor type is unknown
1545            # edge case, but let's not crash...
1546            processor_path = self.modules.processors.get(self.data["type"]).filepath
1547        except AttributeError:
1548            processor_path = ""
1549
1550        updated = self.db.update(
1551            "datasets",
1552            where={"key": self.data["key"]},
1553            data={
1554                "software_version": version[0],
1555                "software_source": version[1],
1556                "software_file": processor_path,
1557            },
1558        )
1559
1560        return updated > 0

Update software version used for this dataset

This can be used to verify the code that was used to process this dataset.

Parameters
  • string version: Version identifier
Returns

Update successful?

def delete_parameter(self, parameter, instant=True):
1562    def delete_parameter(self, parameter, instant=True):
1563        """
1564        Delete a parameter from the dataset metadata
1565
1566        :param string parameter:  Parameter to delete
1567        :param bool instant:  Also delete parameters in this instance object?
1568        :return bool:  Update successful?
1569        """
1570        parameters = self.parameters.copy()
1571        if parameter in parameters:
1572            del parameters[parameter]
1573        else:
1574            return False
1575
1576        updated = self.db.update(
1577            "datasets",
1578            where={"key": self.data["key"]},
1579            data={"parameters": json.dumps(parameters)},
1580        )
1581
1582        if instant:
1583            self.parameters = parameters
1584
1585        return updated > 0

Delete a parameter from the dataset metadata

Parameters
  • string parameter: Parameter to delete
  • bool instant: Also delete parameters in this instance object?
Returns

Update successful?

def get_version_url(self, file):
1587    def get_version_url(self, file):
1588        """
1589        Get a versioned github URL for the version this dataset was processed with
1590
1591        :param file:  File to link within the repository
1592        :return:  URL, or an empty string
1593        """
1594        if not self.data["software_source"]:
1595            return ""
1596
1597        filepath = self.data.get("software_file", "")
1598        if filepath.startswith("/extensions/"):
1599            # go to root of extension
1600            filepath = "/" + "/".join(filepath.split("/")[3:])
1601
1602        return (
1603            self.data["software_source"]
1604            + "/blob/"
1605            + self.data["software_version"]
1606            + filepath
1607        )

Get a versioned GitHub URL for the version this dataset was processed with

Parameters
  • file: File to link within the repository
Returns

URL, or an empty string

def top_parent(self):
1609    def top_parent(self):
1610        """
1611        Get root dataset
1612
1613        Traverses the tree of datasets this one is part of until it finds one
1614        with no source_dataset dataset, then returns that dataset.
1615
1616        :return Dataset: Parent dataset
1617        """
1618        genealogy = self.get_genealogy()
1619        return genealogy[0]

Get root dataset

Traverses the tree of datasets this one is part of until it finds one without a parent dataset, then returns that dataset.

Returns

Parent dataset

def get_genealogy(self):
1621    def get_genealogy(self):
1622        """
1623        Get genealogy of this dataset
1624
1625        Creates a list of DataSet objects, with the first one being the
1626        'top' dataset, and each subsequent one being a child of the previous
1627        one, ending with the current dataset.
1628
1629        :return list:  Dataset genealogy, oldest dataset first
1630        """
1631        if not self._genealogy:
1632            key_parent = self.key_parent
1633            genealogy = []
1634
1635            while key_parent:
1636                try:
1637                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1638                except DataSetException:
1639                    break
1640
1641                genealogy.append(parent)
1642                if parent.key_parent:
1643                    key_parent = parent.key_parent
1644                else:
1645                    break
1646
1647            genealogy.reverse()
1648            self._genealogy = genealogy
1649
1650        genealogy = self._genealogy
1651        genealogy.append(self)
1652
1653        return genealogy

Get genealogy of this dataset

Creates a list of DataSet objects, with the first one being the 'top' dataset, and each subsequent one being a child of the previous one, ending with the current dataset.

Returns

Dataset genealogy, oldest dataset first

def get_children(self, update=False):
1655    def get_children(self, update=False):
1656        """
1657        Get children of this dataset
1658
1659        :param bool update:  Update the list of children from database if True, else return cached value
1660        :return list:  List of child datasets
1661        """
1662        if self._children is not None and not update:
1663            return self._children
1664
1665        analyses = self.db.fetchall(
1666            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
1667            (self.key,),
1668        )
1669        self._children = [
1670            DataSet(data=analysis, db=self.db, modules=self.modules)
1671            for analysis in analyses
1672        ]
1673        return self._children

Get children of this dataset

Parameters
  • bool update: Update the list of children from database if True, else return cached value
Returns

List of child datasets

def get_all_children(self, recursive=True, update=True):
1675    def get_all_children(self, recursive=True, update=True):
1676        """
1677        Get all children of this dataset
1678
1679        Results are returned as a non-hierarchical list, i.e. the result does
1680        not reflect the actual dataset hierarchy (but all datasets in the
1681        result will have the original dataset as an ancestor somewhere)
1682
1683        :return list:  List of DataSets
1684        """
1685        children = self.get_children(update=update)
1686        results = children.copy()
1687        if recursive:
1688            for child in children:
1689                results += child.get_all_children(recursive=recursive, update=update)
1690
1691        return results

Get all children of this dataset

Results are returned as a non-hierarchical list, i.e. the result does not reflect the actual dataset hierarchy (but all datasets in the result will have the original dataset as an ancestor somewhere)

Returns

List of DataSets
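
A short sketch of the difference between get_children() and get_all_children(), again assuming `dataset` is an existing DataSet instance:

    # direct children only (cached unless update=True)
    for child in dataset.get_children():
        print(child.key, child.type)

    # every descendant, flattened into a single list
    descendants = dataset.get_all_children(recursive=True)
    print(f"{len(descendants)} analyses derive from {dataset.key}")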

def nearest(self, type_filter):
1693    def nearest(self, type_filter):
1694        """
1695        Return nearest dataset that matches the given type
1696
1697        Starting with this dataset, traverse the hierarchy upwards and return
1698        whichever dataset matches the given type.
1699
1700        :param str type_filter:  Type filter. Can contain wildcards and is matched
1701        using `fnmatch.fnmatch`.
1702        :return:  Nearest matching dataset, or `None` if none match.
1703        """
1704        genealogy = self.get_genealogy()
1705        for dataset in reversed(genealogy):
1706            if fnmatch.fnmatch(dataset.type, type_filter):
1707                return dataset
1708
1709        return None

Return nearest dataset that matches the given type

Starting with this dataset, traverse the hierarchy upwards and return whichever dataset matches the given type.

Parameters
  • str type_filter: Type filter. Can contain wildcards and is matched using fnmatch.fnmatch.
Returns

Nearest matching dataset, or None if none match.
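
For instance, to find the closest dataset in the hierarchy (including this one) produced by a search worker; the wildcard pattern below is purely illustrative:

    # matched with fnmatch.fnmatch, so shell-style wildcards are allowed
    source = dataset.nearest("*-search")
    if source:
        print(f"Nearest matching dataset: {source.key} ({source.type})")
    else:
        print("No dataset in the hierarchy matches the filter")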

def get_breadcrumbs(self):
1711    def get_breadcrumbs(self):
1712        """
1713        Get breadcrumbs navlink for use in permalinks
1714
1715        Returns a string representing this dataset's genealogy that may be used
1716        to uniquely identify it.
1717
1718        :return str: Nav link
1719        """
1720        if not self.key_parent:
1721            return self.key
1722
1723        genealogy = self.get_genealogy()
1724        return ",".join([d.key for d in genealogy])

Get breadcrumbs navlink for use in permalinks

Returns a string representing this dataset's genealogy that may be used to uniquely identify it.

Returns

Nav link

def get_compatible_processors(self, config=None):
1726    def get_compatible_processors(self, config=None):
1727        """
1728        Get list of processors compatible with this dataset
1729
1730        Checks whether this dataset type is one that is listed as being accepted
1731        by the processor, for each known type: if the processor does not
1732        specify accepted types (via the `is_compatible_with` method), it is
1733        assumed it accepts any top-level dataset.
1734
1735        :param ConfigManager|None config:  Configuration reader to determine
1736        compatibility through. This may not be the same reader the dataset was
1737        instantiated with, e.g. when checking whether some other user should
1738        be able to run processors on this dataset.
1739        :return dict:  Compatible processors, `name => class` mapping
1740        """
1741        processors = self.modules.processors
1742
1743        available = {}
1744        for processor_type, processor in processors.items():
1745            if processor.is_from_collector():
1746                continue
1747
1748            own_processor = self.get_own_processor()
1749            if own_processor and own_processor.exclude_followup_processors(
1750                processor_type
1751            ):
1752                continue
1753
1754            # consider a processor compatible if its is_compatible_with
1755            # method returns True *or* if it has no explicit compatibility
1756            # check and this dataset is top-level (i.e. has no parent)
1757            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1758                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
1759                available[processor_type] = processor
1760
1761        return available

Get list of processors compatible with this dataset

Checks whether this dataset type is one that is listed as being accepted by the processor, for each known type: if the processor does not specify accepted types (via the is_compatible_with method), it is assumed it accepts any top-level dataset.

Parameters
  • ConfigManager|None config: Configuration reader to determine compatibility through. This may not be the same reader the dataset was instantiated with, e.g. when checking whether some other user should be able to run processors on this dataset.
Returns

Compatible processors, name => class mapping
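
A hedged sketch of listing compatible processors; `config` is assumed to be a ConfigManager instance appropriate for the user on whose behalf compatibility is checked:

    compatible = dataset.get_compatible_processors(config=config)
    for processor_type, processor in compatible.items():
        # processors are classes; fall back to the type name if no title is set
        print(processor_type, "-", getattr(processor, "title", processor_type))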

def get_place_in_queue(self, update=False):
1763    def get_place_in_queue(self, update=False):
1764        """
1765        Determine dataset's position in queue
1766
1767        If the dataset is already finished, the position is -1. Else, the
1768        position is the number of datasets to be completed before this one will
1769        be processed. A position of 0 would mean that the dataset is currently
1770        being executed, or that the backend is not running.
1771
1772        :param bool update:  Update the queue position from database if True, else return cached value
1773        :return int:  Queue position
1774        """
1775        if self.is_finished() or not self.data.get("job"):
1776            self._queue_position = -1
1777            return self._queue_position
1778        elif not update and self._queue_position is not None:
1779            # Use cached value
1780            return self._queue_position
1781        else:
1782            # Collect queue position from database via the job
1783            try:
1784                job = Job.get_by_ID(self.data["job"], self.db)
1785                self._queue_position = job.get_place_in_queue()
1786            except JobNotFoundException:
1787                self._queue_position = -1
1788
1789            return self._queue_position

Determine dataset's position in queue

If the dataset is already finished, the position is -1. Else, the position is the number of datasets to be completed before this one will be processed. A position of 0 would mean that the dataset is currently being executed, or that the backend is not running.

Parameters
  • bool update: Update the queue position from database if True, else return cached value
Returns

Queue position
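
For example, a small sketch for reporting progress to a user, assuming `dataset` is a queued DataSet:

    position = dataset.get_place_in_queue(update=True)
    if position == -1:
        print("Dataset is finished or has no associated job")
    elif position == 0:
        print("Dataset is being processed (or the backend is not running)")
    else:
        print(f"{position} dataset(s) queued ahead of this one")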

def get_own_processor(self):
1791    def get_own_processor(self):
1792        """
1793        Get the processor class that produced this dataset
1794
1795        :return:  Processor class, or `None` if not available.
1796        """
1797        processor_type = self.parameters.get("type", self.data.get("type"))
1798
1799        return self.modules.processors.get(processor_type)

Get the processor class that produced this dataset

Returns

Processor class, or None if not available.

def get_available_processors(self, config=None, exclude_hidden=False):
1801    def get_available_processors(self, config=None, exclude_hidden=False):
1802        """
1803        Get list of processors that may be run for this dataset
1804
1805        Returns all compatible processors except for those that are already
1806        queued or finished and have no options. Processors that have been
1807        run but have options are included so they may be run again with a
1808        different configuration
1809
1810        :param ConfigManager|None config:  Configuration reader to determine
1811        compatibility through. This may not be the same reader the dataset was
1812        instantiated with, e.g. when checking whether some other user should
1813        be able to run processors on this dataset.
1814        :param bool exclude_hidden:  Exclude processors that should not be
1815        shown in the UI? If `False`, all processors are returned.
1816
1817        :return dict:  Available processors, `name => properties` mapping
1818        """
1819        if self.available_processors:
1820            # Update to reflect exclude_hidden parameter which may be different from last call
1821            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1822            return {
1823                processor_type: processor
1824                for processor_type, processor in self.available_processors.items()
1825                if not exclude_hidden or not processor.is_hidden
1826            }
1827
1828        processors = self.get_compatible_processors(config=config)
1829
1830        for analysis in self.get_children(update=True):
1831            if analysis.type not in processors:
1832                continue
1833
1834            if not processors[analysis.type].get_options(config=config):
1835                # No variable options; this processor has been run so remove
1836                del processors[analysis.type]
1837                continue
1838
1839            if exclude_hidden and processors[analysis.type].is_hidden:
1840                del processors[analysis.type]
1841
1842        self.available_processors = processors
1843        return processors

Get list of processors that may be run for this dataset

Returns all compatible processors except for those that are already queued or finished and have no options. Processors that have been run but have options are included so they may be run again with a different configuration

Parameters
  • ConfigManager|None config: Configuration reader to determine compatibility through. This may not be the same reader the dataset was instantiated with, e.g. when checking whether some other user should be able to run processors on this dataset.
  • bool exclude_hidden: Exclude processors that should not be shown in the UI? If False, all processors are returned.
Returns

Available processors, name => properties mapping
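
A sketch of how this differs from get_compatible_processors(): processors that already ran and expose no options are filtered out. `config` is again an assumed ConfigManager instance:

    available = dataset.get_available_processors(config=config)
    compatible = dataset.get_compatible_processors(config=config)

    # anything compatible but no longer available has already been run and
    # offers no options to run it again with
    already_run = set(compatible) - set(available)
    print("Can still be queued:", sorted(available))
    print("Already run, no further options:", sorted(already_run))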

def get_parent(self):
1883    def get_parent(self):
1884        """
1885        Get parent dataset
1886
1887        :return DataSet:  Parent dataset, or `None` if not applicable
1888        """
1889        return (
1890            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
1891            if self.key_parent
1892            else None
1893        )

Get parent dataset

Returns

Parent dataset, or None if not applicable

def detach(self):
1895    def detach(self):
1896        """
1897        Makes the dataset standalone, i.e. not having any source_dataset dataset
1898        """
1899        self.link_parent("")

Makes the dataset standalone, i.e. not having any source_dataset dataset

def is_dataset(self):
1901    def is_dataset(self):
1902        """
1903        Easy way to confirm this is a dataset.
1904        Used for checking processor and dataset compatibility,
1905        which needs to handle both processors and datasets.
1906        """
1907        return True

Easy way to confirm this is a dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.

def is_top_dataset(self):
1909    def is_top_dataset(self):
1910        """
1911        Easy way to confirm this is a top dataset.
1912        Used for checking processor and dataset compatibility,
1913        which needs to handle both processors and datasets.
1914        """
1915        if self.key_parent:
1916            return False
1917        return True

Easy way to confirm this is a top dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.

def is_expiring(self, config):
1919    def is_expiring(self, config):
1920        """
1921        Determine if dataset is set to expire
1922
1923        Similar to `is_expired`, but checks if the dataset will be deleted in
1924        the future, not if it should be deleted right now.
1925
1926        :param ConfigManager config:  Configuration reader (context-aware)
1927        :return bool|int:  `False`, or the expiration date as a Unix timestamp.
1928        """
1929        # has someone opted out of deleting this?
1930        if self.parameters.get("keep"):
1931            return False
1932
1933        # is this dataset explicitly marked as expiring after a certain time?
1934        if self.parameters.get("expires-after"):
1935            return self.parameters.get("expires-after")
1936
1937        # is the data source configured to have its datasets expire?
1938        expiration = config.get("datasources.expiration", {})
1939        if not expiration.get(self.parameters.get("datasource")):
1940            return False
1941
1942        # is there a timeout for this data source?
1943        if expiration.get(self.parameters.get("datasource")).get("timeout"):
1944            return self.timestamp + expiration.get(
1945                self.parameters.get("datasource")
1946            ).get("timeout")
1947
1948        return False

Determine if dataset is set to expire

Similar to is_expired, but checks if the dataset will be deleted in the future, not if it should be deleted right now.

Parameters
  • ConfigManager config: Configuration reader (context-aware)
Returns

False, or the expiration date as a Unix timestamp.

def is_expired(self, config):
1950    def is_expired(self, config):
1951        """
1952        Determine if dataset should be deleted
1953
1954        Datasets can be set to expire, but when they should be deleted depends
1955        on a number of factors. This checks them all.
1956
1957        :param ConfigManager config:  Configuration reader (context-aware)
1958        :return bool:
1959        """
1960        # has someone opted out of deleting this?
1961        if not self.is_expiring(config):
1962            return False
1963
1964        # is this dataset explicitly marked as expiring after a certain time?
1965        future = (
1966            time.time() + 3600
1967        )  # ensure we don't delete datasets with invalid expiration times
1968        if (
1969            self.parameters.get("expires-after")
1970            and convert_to_int(self.parameters["expires-after"], future) < time.time()
1971        ):
1972            return True
1973
1974        # is the data source configured to have its datasets expire?
1975        expiration = config.get("datasources.expiration", {})
1976        if not expiration.get(self.parameters.get("datasource")):
1977            return False
1978
1979        # is the dataset older than the set timeout?
1980        if expiration.get(self.parameters.get("datasource")).get("timeout"):
1981            return (
1982                self.timestamp
1983                + expiration[self.parameters.get("datasource")]["timeout"]
1984                < time.time()
1985            )
1986
1987        return False

Determine if dataset should be deleted

Datasets can be set to expire, but when they should be deleted depends on a number of factors. This checks them all.

Parameters
  • ConfigManager config: Configuration reader (context-aware)
Returns

True if the dataset should be deleted now, False otherwise.
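
A sketch of how the two expiration checks might be combined in a cleanup routine; `dataset` and a context-aware `config` reader are assumed, and dataset.delete() refers to the deletion method defined elsewhere in this class:

    if dataset.is_expired(config):
        dataset.delete()  # remove the dataset now
    else:
        expires_at = dataset.is_expiring(config)
        if expires_at:
            print(f"Dataset will expire at Unix timestamp {expires_at}")
        else:
            print("Dataset is not set to expire")
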
def is_from_collector(self):
1989    def is_from_collector(self):
1990        """
1991        Check if this dataset was made by a processor that collects data, i.e.
1992        a search or import worker.
1993
1994        :return bool:
1995        """
1996        return self.type.endswith("-search") or self.type.endswith("-import")

Check if this dataset was made by a processor that collects data, i.e. a search or import worker.

Returns

True if the dataset was created by a search or import worker, False otherwise.
def get_extension(self):
1998    def get_extension(self):
1999        """
2000        Gets the file extension this dataset produces.
2001        Also checks whether the results file exists.
2002        Used for checking processor and dataset compatibility.
2003
2004        :return str extension:  Extension, e.g. `csv`, or `False` if the results file does not exist
2005        """
2006        if self.get_results_path().exists():
2007            return self.get_results_path().suffix[1:]
2008
2009        return False

Gets the file extension this dataset produces. Also checks whether the results file exists. Used for checking processor and dataset compatibility.

Returns

Extension, e.g. csv, or False if the results file does not exist

def get_media_type(self):
2011    def get_media_type(self):
2012        """
2013        Gets the media type of the dataset file.
2014
2015        :return str: media type, e.g., "text"
2016        """
2017        own_processor = self.get_own_processor()
2018        if hasattr(self, "media_type"):
2019            # media type can be defined explicitly in the dataset; this is the priority
2020            return self.media_type
2021        elif own_processor is not None:
2022            # or media type can be defined in the processor
2023            # some processors can set different media types for different datasets (e.g., import_media)
2024            if hasattr(own_processor, "media_type"):
2025                return own_processor.media_type
2026
2027        # Default to text
2028        return self.parameters.get("media_type", "text")

Gets the media type of the dataset file.

Returns

media type, e.g., "text"

def get_metadata(self):
2030    def get_metadata(self):
2031        """
2032        Get dataset metadata
2033
2034        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
2035        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
2036        update its database (and ensure compatibility with the exporting version of 4CAT).
2037        """
2038        metadata = self.db.fetchone(
2039            "SELECT * FROM datasets WHERE key = %s", (self.key,)
2040        )
2041
2042        # get 4CAT version (presumably to ensure export is compatible with import)
2043        metadata["current_4CAT_version"] = get_software_version()
2044        return metadata

Get dataset metadata

This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to update its database (and ensure compatibility with the exporting version of 4CAT).
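
For instance, a minimal export sketch (the output filename is illustrative):

    import json

    metadata = dataset.get_metadata()
    with open(f"{dataset.key}-metadata.json", "w") as outfile:
        # default=str guards against values the JSON encoder cannot handle
        json.dump(metadata, outfile, default=str)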

def get_result_url(self):
2046    def get_result_url(self):
2047        """
2048        Gets the 4CAT frontend URL of a dataset file.
2049
2050        Uses the FlaskConfig attributes (i.e., SERVER_NAME and
2051        SERVER_HTTPS) plus hardcoded '/result/'.
2052        TODO: create more dynamic method of obtaining url.
2053        """
2054        filename = self.get_results_path().name
2055
2056        # we cheat a little here by using the modules' config reader, but these
2057        # will never be context-dependent values anyway
2058        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
2059                        self.modules.config.get("flask.server_name") + '/result/' + filename
2060        return url_to_file

Gets the 4CAT frontend URL of a dataset file.

Uses the FlaskConfig attributes (i.e., SERVER_NAME and SERVER_HTTPS) plus hardcoded '/result/'. TODO: create more dynamic method of obtaining url.

def warn_unmappable_item(self, item_count, processor=None, error_message=None, warn_admins=True):
2062    def warn_unmappable_item(
2063        self, item_count, processor=None, error_message=None, warn_admins=True
2064    ):
2065        """
2066        Log an item that is unable to be mapped and warn administrators.
2067
2068        :param int item_count:  Item index
2069        :param Processor processor:  Processor calling this function
2070        """
2071        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
2072
2073        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
2074        closest_dataset = (
2075            processor.dataset
2076            if processor is not None and processor.dataset is not None
2077            else self
2078        )
2079        # Log error to dataset log
2080        closest_dataset.log(dataset_error_message)
2081
2082        if warn_admins:
2083            if processor is not None:
2084                processor.log.warning(
2085                    f"Processor {processor.type} was unable to map one or more items for dataset {closest_dataset.key}."
2086                )
2087            elif hasattr(self.db, "log"):
2088                # borrow the database's log handler
2089                self.db.log.warning(
2090                    f"Unable to map one or more items for dataset {closest_dataset.key}."
2091                )
2092            else:
2093                # No other log available
2094                raise DataSetException(
2095                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
2096                )

Log an item that is unable to be mapped and warn administrators.

Parameters
  • int item_count: Item index
  • Processor processor: Processor calling this function
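
A hedged sketch of a typical call site inside a processor loop; the surrounding names (`processor`, `raw_items`, `map_item`) are stand-ins for whatever the calling processor actually uses, not part of this class:

    from common.lib.exceptions import MapItemException

    for index, raw_item in enumerate(raw_items):
        try:
            item = map_item(raw_item)  # stand-in for the real mapping call
        except MapItemException as e:
            dataset.warn_unmappable_item(index, processor=processor, error_message=str(e))
            continue
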
def has_annotations(self) -> bool:
2099    def has_annotations(self) -> bool:
2100        """
2101        Whether this dataset has annotations
2102        """
2103
2104        annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,))
2105
2106        return True if annotation else False

Whether this dataset has annotations

def num_annotations(self) -> int:
2108    def num_annotations(self) -> int:
2109        """
2110        Get the amount of annotations
2111        """
2112        return self.db.fetchone(
2113            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
2114        )["count"]

Get the amount of annotations

def get_annotation(self, data: dict):
2116    def get_annotation(self, data: dict):
2117        """
2118        Retrieves a specific annotation if it exists.
2119
2120        :param data:  A dictionary identifying the annotation to retrieve.
2121            Include either an `id` field, or both `field_id` and `item_id`
2122            fields.
2123
2124        :return Annotation:  Annotation object, or `None` if the required fields are missing.
2125        """
2126
2127        if "id" not in data and ("field_id" not in data or "item_id" not in data):
2128            return None
2129
2130        if "dataset" not in data:
2131            data["dataset"] = self.key
2132
2133        return Annotation(data=data, db=self.db)

Retrieves a specific annotation if it exists.

Parameters
  • data: A dictionary identifying the annotation to retrieve. Include either an id field, or both field_id and item_id fields.

Returns

Annotation object, or None if the required fields are missing.
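
For example, reusing the illustrative field and item IDs from the save_annotations() docstring further down:

    annotation = dataset.get_annotation({"field_id": "123asd", "item_id": "12345"})
    if annotation:
        print("Found an annotation for item 12345")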

def get_annotations(self) -> list:
2135    def get_annotations(self) -> list:
2136        """
2137        Retrieves all annotations for this dataset.
2138
2139        :return list:  List of Annotation objects.
2140        """
2141
2142        return Annotation.get_annotations_for_dataset(self.db, self.key)

Retrieves all annotations for this dataset.

Returns

List of Annotation objects.

def get_annotations_for_item(self, item_id: str) -> list:
2144    def get_annotations_for_item(self, item_id: str) -> list:
2145        """
2146        Retrieves all annotations from this dataset for a specific item (e.g. social media post).
2147        """
2148        return Annotation.get_annotations_for_dataset(
2149            self.db, self.key, item_id=item_id
2150        )

Retrieves all annotations from this dataset for a specific item (e.g. social media post).

def has_annotation_fields(self) -> bool:
2152    def has_annotation_fields(self) -> bool:
2153        """
2154        Returns True if there are annotation fields saved to the datasets table.
2155        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
2156        """
2157
2158        return True if self.annotation_fields else False

Returns True if there are annotation fields saved to the datasets table. Annotation fields are metadata that describe a type of annotation (with info on id, type, etc.).

def get_annotation_field_labels(self) -> list:
2160    def get_annotation_field_labels(self) -> list:
2161        """
2162        Retrieves the saved annotation field labels for this dataset.
2163        These are stored in the dataset's `annotation_fields`.
2164
2165        :return list: List of annotation field labels.
2166        """
2167
2168        annotation_fields = self.annotation_fields
2169
2170        if not annotation_fields:
2171            return []
2172
2173        labels = [v["label"] for v in annotation_fields.values()]
2174
2175        return labels

Retrieves the saved annotation field labels for this dataset. These are stored in the dataset's annotation_fields.

Returns

List of annotation field labels.

def save_annotations(self, annotations: list) -> int:
2177    def save_annotations(self, annotations: list) -> int:
2178        """
2179        Takes a list of annotations and saves them to the annotations table.
2180        If a field is not yet present in the `annotation_fields` column in
2181        the datasets table, it also adds it there.
2182
2183        :param list annotations:  List of dictionaries with annotation items.
2184            Must have `item_id`, `field_id`, and `label`. `item_id` is for the
2185            specific item being annotated (e.g. a social media post), `field_id`
2186            refers to the annotation field, and `label` is a human-readable
2187            description of this annotation. E.g.: [{"item_id": "12345",
2188            "label": "Valid", "field_id": "123asd", "value": "Yes"}]
2189
2190        :return int:  How many annotations were saved.
2192
2193        """
2194
2195        if not annotations:
2196            return 0
2197
2198        count = 0
2199        annotation_fields = self.annotation_fields
2200
2201        # Add some dataset data to annotations, if not present
2202        for annotation_data in annotations:
2203            # Check if the required fields are present
2204            if "item_id" not in annotation_data:
2205                raise AnnotationException(
2206                    "Can't save annotations; annotation must have an `item_id` referencing "
2207                    "the item it annotated, got %s" % annotation_data
2208                )
2209            if "field_id" not in annotation_data:
2210                raise AnnotationException(
2211                    "Can't save annotations; annotation must have a `field_id` field, "
2212                    "got %s" % annotation_data
2213                )
2214            if "label" not in annotation_data or not isinstance(
2215                annotation_data["label"], str
2216            ):
2217                raise AnnotationException(
2218                    "Can't save annotations; annotation must have a `label` field, "
2219                    "got %s" % annotation_data
2220                )
2221
2222            # Set dataset key
2223            if not annotation_data.get("dataset"):
2224                annotation_data["dataset"] = self.key
2225
2226            # Set default author to this dataset owner
2227            # If this annotation is made by a processor, it will have the processor name
2228            if not annotation_data.get("author"):
2229                annotation_data["author"] = self.get_owners()[0]
2230
2231            # Create Annotation object, which also saves it to the database
2232            # If this dataset/item ID/label combination already exists, this retrieves the
2233            # existing data and updates it with new values.
2234            Annotation(data=annotation_data, db=self.db)
2235            count += 1
2236
2237        # Save annotation fields if things changed
2238        if annotation_fields != self.annotation_fields:
2239            self.save_annotation_fields(annotation_fields)
2240
2241        return count

Takes a list of annotations and saves them to the annotations table. If a field is not yet present in the annotation_fields column in the datasets table, it also adds it there.

Parameters
  • list annotations: List of dictionaries with annotation items. Must have item_id, field_id, and label. item_id is for the specific item being annotated (e.g. a social media post), field_id refers to the annotation field, and label is a human-readable description of this annotation. E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", "value": "Yes"}]

Returns

How many annotations were saved.
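
A minimal sketch, reusing the illustrative IDs from the docstring above:

    saved = dataset.save_annotations([
        {"item_id": "12345", "field_id": "123asd", "label": "Valid", "value": "Yes"},
    ])
    print(f"Saved {saved} annotation(s)")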

def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2243    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2244        """
2245        Save annotation field data to the datasets table (in the `annotation_fields` column).
2246        If changes to the annotation fields affect existing annotations,
2247        this function will also call `update_annotations_via_fields()` to change them.
2248
2249        :param dict new_fields:  New annotation fields, with a field ID as key.
2250        :param bool add:  Whether we're merely adding new fields or replacing
2251            the whole batch. If `add` is False, `new_fields` should contain
2252            all fields.
2253
2254        :return int:  The number of annotation fields saved.
2255
2257        """
2258
2259        # Get existing annotation fields to see if stuff changed.
2260        old_fields = self.annotation_fields
2261        changes = False
2262
2263        # Do some validation
2264        # Annotation field must be valid JSON.
2265        try:
2266            json.dumps(new_fields)
2267        except (TypeError, ValueError):
2268            raise AnnotationException(
2269                "Can't save annotation fields: not valid JSON (%s)" % new_fields
2270            )
2271
2272        # No duplicate IDs
2273        if len(new_fields) != len(set(new_fields)):
2274            raise AnnotationException(
2275                "Can't save annotation fields: field IDs must be unique"
2276            )
2277
2278        # Annotation fields must at minimum have `type` and `label` keys.
2279        for field_id, annotation_field in new_fields.items():
2280            if not isinstance(field_id, str):
2281                raise AnnotationException(
2282                    "Can't save annotation fields: field ID %s is not a valid string"
2283                    % field_id
2284                )
2285            if "label" not in annotation_field:
2286                raise AnnotationException(
2287                    "Can't save annotation fields: field %s must have a label"
2288                    % field_id
2289                )
2290            if "type" not in annotation_field:
2291                raise AnnotationException(
2292                    "Can't save annotation fields: field %s must have a type"
2293                    % field_id
2294                )
2295
2296        # Check if fields are removed
2297        if not add:
2298            for field_id in old_fields.keys():
2299                if field_id not in new_fields:
2300                    changes = True
2301
2302        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
2303        # dataset
2304        for field_id in new_fields.keys():
2305            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
2306                old_fields[field_id]["label"] = new_fields[field_id][
2307                    "label"
2308                ]  # Only labels could've been changed
2309                new_fields[field_id] = old_fields[field_id]
2310
2311        # If we're just adding fields, add them to the old fields.
2312        # If the field already exists, overwrite the old field.
2313        if add and old_fields:
2314            all_fields = old_fields
2315            for field_id, annotation_field in new_fields.items():
2316                all_fields[field_id] = annotation_field
2317            new_fields = all_fields
2318
2319        # We're saving the new annotation fields as-is.
2320        # Ordering of fields is preserved this way.
2321        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
2322        self.annotation_fields = new_fields
2323
2324        # If anything changed with the annotation fields, possibly update
2325        # existing annotations (e.g. to delete them or change their labels).
2326        if changes:
2327            Annotation.update_annotations_via_fields(
2328                self.key, old_fields, new_fields, self.db
2329            )
2330
2331        return len(new_fields)

Save annotation field data to the datasets table (in the annotation_fields column). If changes to the annotation fields affect existing annotations, this function will also call update_annotations_via_fields() to change them.

Parameters
  • dict new_fields: New annotation fields, with a field ID as key.

  • bool add: Whether we're merely adding new fields or replacing the whole batch. If add is False, new_fields should contain all fields.

Returns

The number of annotation fields saved.
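
A sketch of adding a single field; the field ID and the `type` value are illustrative, and `add=True` merges the new field with any existing ones:

    count = dataset.save_annotation_fields({
        "123asd": {"label": "Valid", "type": "text"},
    }, add=True)
    print(f"{count} annotation field(s) now defined")
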
def get_annotation_metadata(self) -> dict:
2333    def get_annotation_metadata(self) -> dict:
2334        """
2335        Retrieves all the data for this dataset from the annotations table.
2336        """
2337
2338        annotation_data = self.db.fetchall(
2339            "SELECT * FROM annotations WHERE dataset = %s", (self.key,)
2340        )
2341        return annotation_data

Retrieves all the data for this dataset from the annotations table.