
common.lib.dataset

   1import collections
   2import itertools
   3import datetime
   4import zipfile
   5import fnmatch
   6import random
   7import shutil
   8import json
   9import time
  10import csv
  11import re
  12import os
  13
  14from common.lib.annotation import Annotation
  15from common.lib.job import Job, JobNotFoundException
  16
  17from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version, call_api, hash_to_md5, convert_to_float
  18from common.lib.item_mapping import MappedItem, DatasetItem
  19from common.lib.fourcat_module import FourcatModule
  20from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
  21                                   MapItemException, MappedItemIncompleteException, AnnotationException)
  22
  23
  24class DataSet(FourcatModule):
  25    """
  26    Provide interface to safely register and run operations on a dataset
  27
  28    A dataset is a collection of:
  29    - A unique identifier
  30    - A set of parameters that demarcate the data contained within
  31    - The data
  32
  33    The data is usually stored in a file on the disk; the parameters are stored
  34    in a database. The handling of the data, et cetera, is done by other
  35    workers; this class defines methods to create and manipulate the dataset's
  36    properties.
  37    """
  38
  39    # Attributes must be created here to ensure getattr and setattr work properly
  40    data = None
  41    key = ""
  42
  43    _children = None
  44    available_processors = None
  45    _genealogy = None
  46    preset_parent = None
  47    parameters = None
  48    modules = None
  49    annotation_fields = None
  50
  51    owners = None
  52    tagged_owners = None
  53
  54    db = None
  55    folder = None
  56    is_new = True
  57
  58    no_status_updates = False
  59    disposable_files = None
  60    _queue_position = None
  61
  62    def __init__(
  63            self,
  64            parameters=None,
  65            key=None,
  66            job=None,
  67            data=None,
  68            db=None,
  69            parent="",
  70            extension=None,
  71            type=None,
  72            is_private=True,
  73            owner="anonymous",
  74            modules=None
  75    ):
  76        """
  77        Create new dataset object
  78
  79        If the dataset is not in the database yet, it is added.
  80
  81        :param dict parameters:  Only when creating a new dataset. Dataset
  82        parameters, free-form dictionary.
  83        :param str key: Dataset key. If given, dataset with this key is loaded.
  84        :param int job: Job ID. If given, dataset corresponding to job is
  85        loaded.
  86        :param dict data: Dataset data, corresponding to a row in the datasets
  87        database table. If not given, retrieved from database depending on key.
  88        :param db:  Database connection
  89        :param str parent:  Only when creating a new dataset. Parent dataset
  90        key to which the one being created is a child.
  91        :param str extension: Only when creating a new dataset. Extension of
  92        dataset result file.
  93        :param str type: Only when creating a new dataset. Type of the dataset,
  94        corresponding to the type property of a processor class.
  95        :param bool is_private: Only when creating a new dataset. Whether the
  96        dataset is private or public.
  97        :param str owner: Only when creating a new dataset. The user name of
  98        the dataset's creator.
  99        :param modules: Module cache. If not given, will be loaded when needed
 100        (expensive). Used to figure out what processors are compatible with
 101        this dataset.
 102        """
 103        self.db = db
 104
 105        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
 106        self.data = {}
 107        self.parameters = {}
 108        self.available_processors = {}
 109        self._genealogy = None
 110        self.disposable_files = []
 111        self.modules = modules
 112
 113        if key is not None:
 114            self.key = key
 115            current = self.db.fetchone(
 116                "SELECT * FROM datasets WHERE key = %s", (self.key,)
 117            )
 118            if not current:
 119                raise DataSetNotFoundException(
 120                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
 121                    % key
 122                )
 123
 124        elif job is not None:
 125            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
 126            if not current:
 127                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
 128
 129            self.key = current["key"]
 130        elif data is not None:
 131            current = data
 132            if (
 133                    "query" not in data
 134                    or "key" not in data
 135                    or "parameters" not in data
 136                    or "key_parent" not in data
 137            ):
 138                raise DataSetException(
 139                    "DataSet() requires a complete dataset record for its 'data' argument"
 140                )
 141
 142            self.key = current["key"]
 143        else:
 144            if parameters is None:
 145                raise DataSetException(
 146                    "DataSet() requires one of 'key', 'job', 'data' or 'parameters' to be given"
 147                )
 148
 149            if not type:
 150                raise DataSetException("Datasets must have their type set explicitly")
 151
 152            query = self.get_label(parameters, default=type)
 153            self.key = self.get_key(query, parameters, parent)
 154            current = self.db.fetchone(
 155                "SELECT * FROM datasets WHERE key = %s AND query = %s",
 156                (self.key, query),
 157            )
 158
 159        if current:
 160            self.data = current
 161            self.parameters = json.loads(self.data["parameters"])
 162            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
 163                if self.data.get("annotation_fields") else {}
 164            self.is_new = False
 165        else:
 166            self.data = {"type": type}  # get_own_processor needs this
 167            own_processor = self.get_own_processor()
 168            version = get_software_commit(own_processor)
 169            self.data = {
 170                "key": self.key,
 171                "query": self.get_label(parameters, default=type),
 172                "parameters": json.dumps(parameters),
 173                "result_file": "",
 174                "creator": owner,
 175                "status": "",
 176                "type": type,
 177                "timestamp": int(time.time()),
 178                "is_finished": False,
 179                "is_private": is_private,
 180                "software_version": version[0],
 181                "software_source": version[1],
 182                "software_file": "",
 183                "num_rows": 0,
 184                "progress": 0.0,
 185                "key_parent": parent,
 186                "annotation_fields": "{}"
 187            }
 188            self.parameters = parameters
 189            self.annotation_fields = {}
 190
 191            self.db.insert("datasets", data=self.data)
 192            self.refresh_owners()
 193            self.add_owner(owner)
 194
 195            # Find desired extension from processor if not explicitly set
 196            if extension is None:
 197                if own_processor:
 198                    extension = own_processor.get_extension(
 199                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
 200                        if parent
 201                        else None
 202                    )
 203                # Still no extension, default to 'csv'
 204                if not extension:
 205                    extension = "csv"
 206
 207            # Reserve filename and update data['result_file']
 208            self.reserve_result_file(parameters, extension)
 209
 210        self.refresh_owners()
 211
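        # Usage sketch (illustrative only): loading an existing dataset by key,
        # or creating a new one. `db`, `modules`, the key and the processor type
        # below are placeholders assumed to come from the calling context.
        #
        #   dataset = DataSet(key="1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d", db=db, modules=modules)
        #
        #   new_dataset = DataSet(
        #       parameters={"query": "example search"},
        #       type="example-search",   # hypothetical processor type
        #       extension="csv",
        #       owner="admin",
        #       db=db,
        #       modules=modules,
        #   )
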
 212    def check_dataset_finished(self):
 213        """
 214        Checks if the dataset is finished. Returns the path to the results file
 215        if it is not empty, or 'empty' when the dataset finished without matches.
 216
 217        Only returns a path if the dataset is complete. In other words, if this
 218        method returns a path, a file with the complete results for this dataset
 219        will exist at that location.
 220
 221        :return: A path to the results file, 'empty', or `None`
 222        """
 223        if self.data["is_finished"] and self.data["num_rows"] > 0:
 224            return self.get_results_path()
 225        elif self.data["is_finished"] and self.data["num_rows"] == 0:
 226            return 'empty'
 227        else:
 228            return None
 229
 230    def get_results_path(self):
 231        """
 232        Get path to results file
 233
 234        Always returns a path that will at some point contain the dataset
 235        data, but may not do so yet. Use this to get the location to write
 236        generated results to.
 237
 238        :return Path:  A path to the results file
 239        """
 240        # alas we need to instantiate a config reader here - no way around it
 241        if not self.folder:
 242            self.folder = self.modules.config.get('PATH_DATA')
 243        return self.folder.joinpath(self.data["result_file"])
 244
 245    def get_results_folder_path(self):
 246        """
 247        Get path to folder containing accompanying results
 248
 249        Returns a path to a folder that may not have been created yet
 250
 251        :return Path:  A path to the results folder
 252        """
 253        return self.get_results_path().parent.joinpath("folder_" + self.key)
 254
 255    def get_log_path(self):
 256        """
 257        Get path to dataset log file
 258
 259        Each dataset has a single log file that documents its creation. This
 260        method returns the path to that file. It is identical to the path of
 261        the dataset result file, with 'log' as its extension instead.
 262
 263        :return Path:  A path to the log file
 264        """
 265        return self.get_results_path().with_suffix(".log")
 266
 267    def clear_log(self):
 268        """
 269        Clears the dataset log file
 270
 271        If the log file does not exist, it is created empty. The log file will
 272        have the same file name as the dataset result file, with the 'log'
 273        extension.
 274        """
 275        log_path = self.get_log_path()
 276        with log_path.open("w"):
 277            pass
 278
 279    def log(self, log):
 280        """
 281        Write log message to file
 282
 283        Writes the log message to the log file on a new line, including a
 284        timestamp at the start of the line. Note that this assumes the log file
 285        already exists - it should have been created/cleared with clear_log()
 286        prior to calling this.
 287
 288        :param str log:  Log message to write
 289        """
 290        log_path = self.get_log_path()
 291        with log_path.open("a", encoding="utf-8") as outfile:
 292            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
 293
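        # Usage sketch (illustrative only): a worker would typically clear the log,
        # write progress messages while generating results, and write output to the
        # dataset's result path. `dataset` is assumed to be a DataSet instance.
        #
        #   dataset.clear_log()
        #   dataset.log("Starting to collect items")
        #   with dataset.get_results_path().open("w", encoding="utf-8") as outfile:
        #       outfile.write("id,body\n")
        #   dataset.log(f"Log file lives at {dataset.get_log_path()}")
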
 294    def _iterate_items(self, processor=None, offset=0, *args, **kwargs):
 295        """
 296        A generator that iterates through a CSV or NDJSON file
 297
 298        This is an internal method and should not be called directly. Rather,
 299        call iterate_items() and use the generated dictionary and its properties.
 300
 301        If a reference to a processor is provided, with every iteration,
 302        the processor's 'interrupted' flag is checked, and if set a
 303        ProcessorInterruptedException is raised, which by default is caught
 304        in the worker and subsequently stops execution gracefully.
 305
 306        There are two file types that can be iterated (currently): CSV files
 307        and NDJSON (newline-delimited JSON) files. In the future, one could
 308        envision adding a pathway to retrieve items from e.g. a MongoDB
 309        collection directly instead of from a static file
 310
 311        :param BasicProcessor processor:  A reference to the processor
 312        iterating the dataset.
 313        :param int offset:  How many items to skip.
 314        :return generator:  A generator that yields each item as a dictionary
 315        """
 316        path = self.get_results_path()
 317
 318        # Yield through items one by one
 319        if path.suffix.lower() == ".csv":
 320            with path.open("rb") as infile:
 321                wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
 322                reader = csv.DictReader(wrapped_infile)
 323
 324                if not self.get_own_processor():
 325                    # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
 326                    first_item = next(reader)
 327                    if first_item is None or any(
 328                            [True for key in first_item if type(key) is not str]
 329                    ):
 330                        raise NotImplementedError(
 331                            f"Cannot iterate through CSV file (deprecated processor {self.type})"
 332                        )
 333                    yield first_item
 334
 335                for i, item in enumerate(reader):
 336                    if hasattr(processor, "interrupted") and processor.interrupted:
 337                        raise ProcessorInterruptedException(
 338                            "Processor interrupted while iterating through CSV file"
 339                        )
 340
 341                    if i < offset:
 342                        continue
 343
 344                    yield item
 345
 346        elif path.suffix.lower() == ".ndjson":
 347            # In NDJSON format, each line in the file is a self-contained JSON object
 348            with path.open(encoding="utf-8") as infile:
 349                for i, line in enumerate(infile):
 350                    if hasattr(processor, "interrupted") and processor.interrupted:
 351                        raise ProcessorInterruptedException(
 352                            "Processor interrupted while iterating through NDJSON file"
 353                        )
 354
 355                    if i < offset:
 356                        continue
 357
 358                    yield json.loads(line)
 359
 360        else:
 361            raise NotImplementedError(f"Cannot iterate through {path.suffix} file")
 362
 363    def _iterate_archive_contents(
 364            self,
 365            staging_area=None,
 366            immediately_delete=True,
 367            filename_filter=None,
 368            processor=None,
 369            offset=0,
 370            *args, **kwargs
 371        ):
 372        """
 373        A generator that iterates through files in an archive
 374
 375        With every iteration, the processor's 'interrupted' flag is checked,
 376        and if set a ProcessorInterruptedException is raised, which by default
 377        is caught and subsequently stops execution gracefully.
 378
 379        Files are temporarily unzipped and deleted after use.
 380
 381        :param Path staging_area:  Where to store the files while they're
 382          being worked with. If omitted, a temporary folder is created and
 383          marked for deletion after all files have been yielded
 384        :param bool immediately_delete:  Temporary files are removed after
 385          yielding; False keeps files until the staging_area is removed
 386        :param list filename_filter:  Whitelist of filenames to iterate. If
 387          empty, do not filter
 388        :param BasicProcessor processor:  A reference to the processor
 389          iterating the dataset.
 390        :param int offset:  Skip this many files before yielding (warning: may
 391          skip a metadata file too!)
 392        :return:  An iterator with a dictionary for each file, containing an
 393          `id`, a `path`, and all attributes of the `ZipInfo` object as keys
 394        """
 395        path = self.get_results_path()
 396        if not path.exists():
 397            return
 398
 399        if not staging_area:
 400            staging_area = self.get_staging_area()
 401
 402        if not staging_area.exists() or not staging_area.is_dir():
 403            raise RuntimeError(f"Staging area {staging_area} is not a valid folder")
 404
 405        iterations = 0
 406        with zipfile.ZipFile(path, "r") as archive_file:
 407            # sorting is important because it ensures .metadata.json is read
 408            # first
 409            archive_contents = sorted(archive_file.infolist(), key=lambda x: x.filename)
 410            for archived_file in archive_contents:
 411
 412                if filename_filter and archived_file.filename not in filename_filter:
 413                    continue
 414
 415                if archived_file.is_dir():
 416                    # do not yield folders - we'll get to the files in them
 417                    continue
 418
 419                if iterations < offset:
 420                    iterations += 1
 421                    continue
 422
 423                if hasattr(processor, "interrupted") and processor.interrupted:
 424                    raise ProcessorInterruptedException(
 425                        "Processor interrupted while iterating through Zip archive"
 426                    )
 427
 428                iterations += 1
 429                temp_file = staging_area.joinpath(archived_file.filename)
 430                archive_file.extract(archived_file.filename, staging_area)
 431
 432                # iterated items are expected as a dictionary
 433                # we thus make a dictionary from the ZipInfo object
 434                # and use the path (inside the archive) as a unique ID
 435                yield {
 436                    "id": archived_file.filename,
 437                    "path": temp_file,
 438                    **{
 439                        attribute: getattr(archived_file, attribute) for attribute in dir(archived_file) if not attribute.startswith("_")
 440                    }
 441                }
 442
 443                if immediately_delete:
 444                    # this, effectively, triggers when the *next* item is
 445                    # asked for, or if it is the last file
 446                    temp_file.unlink()
 447
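        # Usage sketch (illustrative only): for zip datasets, iterate_items() (below)
        # passes keyword arguments such as `staging_area`, `filename_filter` and
        # `immediately_delete` on to this generator. `dataset` and `processor` are
        # assumed to exist in the calling context.
        #
        #   staging_area = dataset.get_staging_area()
        #   for file_item in dataset.iterate_items(
        #       processor=processor,
        #       staging_area=staging_area,
        #       filename_filter=[".metadata.json"],
        #       immediately_delete=False,
        #   ):
        #       print(file_item["id"], file_item["path"])
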
 448    def iterate_items(
 449            self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None,
 450            offset=0, *args, **kwargs
 451    ):
 452        """
 453        Generate mapped dataset items
 454
 455        Wrapper for _iterate_items that returns a DatasetItem, which can be
 456        accessed as a dict returning the original item or (if a mapper is
 457        available) the mapped item. Mapped or original versions of the item can
 458        also be accessed via the `original` and `mapped_object` properties of
 459        the DatasetItem.
 460
 461        Processors can define a method called `map_item` that can be used to map
 462        an item from the dataset file before it is processed any further. This is
 463        slower than storing the data file in the right format to begin with but
 464        not all data sources allow for easy 'flat' mapping of items. Tweets, for
 465        example, are nested objects when retrieved from the Twitter API; they are
 466        easier to store as a JSON file than as a flat CSV file, and it would be a
 467        shame to throw away that data.
 468
 469        Note the two parameters warn_unmappable and map_missing. An item is
 470        'unmappable' when its structure is too different to coerce into the
 471        neat dictionary the data source expects; warn_unmappable determines
 472        what happens in that case. An item can also have the right structure,
 473        but with some fields missing or incomplete; map_missing determines
 474        what happens in that case. The latter is for example possible when
 475        importing data via Zeeschuimer, which produces unstably-structured
 476        data captured from social media sites.
 477
 478
 479        :param BasicProcessor processor:  A reference to the processor
 480        iterating the dataset.
 481        :param bool warn_unmappable:  If an item is not mappable, skip the item
 482        and log a warning
 483        :param max_unmappable:  Skip at most this many unmappable items; if
 484        more are encountered, stop iterating. `None` to never stop.
 485        :param map_missing: Indicates what to do with mapped items for which
 486        some fields could not be mapped. Defaults to 'default'. Must be one of:
 487        - 'default': fill missing fields with the default passed by map_item
 488        - 'abort': raise a MappedItemIncompleteException if a field is missing
 489        - a callback: replace missing field with the return value of the
 490          callback. The mapped item's data is passed to the callback as the
 491          first argument and the name of the missing field as the second.
 492        - a dictionary with a key for each possible missing field: replace missing
 493          field with a strategy for that field ('default', 'abort', or a callback)
 494        :param get_annotations: Whether to also fetch annotations from the database.
 495          This can be disabled to help speed up iteration.
 496        :param offset: How many items to skip before yielding.
 497        :param bool immediately_delete:  Only used when iterating a file
 498          archive. Defaults to `True`; if set to `False`, files are not deleted
 499          from the staging area after the iteration, so they can be re-used.
 500        :param staging_area:  Only used when iterating a file archive. Where to
 501          store the files while they're being worked with. If omitted, a
 502          temporary folder is created and marked for deletion after all files
 503          have been yielded.
 504        :param list filename_filter:  Only used when iterating a file archive.
 505          Whitelist of filenames to iterate, others are skipped. If empty, do
 506          not filter.
 507        :return generator:  A generator that yields DatasetItems
 508        """
 509        unmapped_items = 0
 510
 511        # Collect item_mapper for use with filter
 512        item_mapper = False
 513        own_processor = self.get_own_processor()
 514        if own_processor and own_processor.map_item_method_available(dataset=self):
 515            item_mapper = True
 516
 517        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
 518        # If we're getting annotations, we're caching items so we don't need to retrieve annotations one-by-one.
 519        get_annotations = True if self.annotation_fields and get_annotations else False
 520        if get_annotations:
 521            annotation_fields = self.annotation_fields.copy()
 522            item_batch_size = 500
 523            dataset_item_cache = []
 524            annotations_before = int(time.time())
 525
 526            # Append a number to annotation labels if there are duplicate ones
 527            annotation_labels = {}
 528            for (annotation_field_id, annotation_field_items,) in annotation_fields.items():
 529                unique_label = annotation_field_items["label"]
 530                counter = 1
 531                while unique_label in annotation_labels.values():
 532                    counter += 1
 533                    unique_label = f"{annotation_field_items['label']}_{counter}"
 534                annotation_labels[annotation_field_id] = unique_label
 535
 536        # missing field strategy can be for all fields at once, or per field
 537        # if it is per field, it is a dictionary with field names and their strategy
 538        # if it is for all fields, it may be a callback, 'abort', or 'default'
 539        default_strategy = "default"
 540        if type(map_missing) is not dict:
 541            default_strategy = map_missing
 542            map_missing = {}
 543
 544        iterator = self._iterate_items if self.get_extension() != "zip" else self._iterate_archive_contents
 545
 546        # Loop through items
 547        for i, item in enumerate(iterator(processor=processor, offset=offset, *args, **kwargs)):
 548            # Save original to yield
 549            original_item = item.copy()
 550
 551            # Map item
 552            if item_mapper:
 553                try:
 554                    mapped_item = own_processor.get_mapped_item(item)
 555                except MapItemException as e:
 556                    if warn_unmappable:
 557                        self.warn_unmappable_item(
 558                            i, processor, e, warn_admins=unmapped_items == 0
 559                        )
 560
 561                    unmapped_items += 1
 562                    if max_unmappable and unmapped_items > max_unmappable:
 563                        break
 564                    else:
 565                        continue
 566
 567                # check if fields have been marked as 'missing' in the
 568                # underlying data, and treat according to the chosen strategy
 569                if mapped_item.get_missing_fields():
 570                    for missing_field in mapped_item.get_missing_fields():
 571                        strategy = map_missing.get(missing_field, default_strategy)
 572
 573                        if callable(strategy):
 574                            # delegate handling to a callback
 575                            mapped_item.data[missing_field] = strategy(
 576                                mapped_item.data, missing_field
 577                            )
 578                        elif strategy == "abort":
 579                            # raise an exception to be handled at the processor level
 580                            raise MappedItemIncompleteException(
 581                                f"Cannot process item, field {missing_field} missing in source data."
 582                            )
 583                        elif strategy == "default":
 584                            # use whatever was passed to the object constructor
 585                            mapped_item.data[missing_field] = mapped_item.data[
 586                                missing_field
 587                            ].value
 588                        else:
 589                            raise ValueError(
 590                                "map_missing must be 'abort', 'default', or a callback."
 591                            )
 592            else:
 593                mapped_item = original_item
 594
 595            # yield a DatasetItem, which is a dict with some special properties
 596            dataset_item = DatasetItem(
 597                mapper=item_mapper,
 598                original=original_item,
 599                mapped_object=mapped_item,
 600                data_file=original_item["path"] if "path" in original_item and issubclass(type(original_item["path"]), os.PathLike) else None,
 601                **(
 602                    mapped_item.get_item_data()
 603                    if type(mapped_item) is MappedItem
 604                    else mapped_item
 605                ),
 606            )
 607
 608            # If we're getting annotations, yield in item batches so we don't need to get annotations per item.
 609            if get_annotations:
 610                dataset_item_cache.append(dataset_item)
 611
 612                # When we reach the batch limit or the end of the dataset,
 613                # get the annotations for cached items and yield the entire thing.
 614                if len(dataset_item_cache) >= item_batch_size or i == (self.num_rows - 1):
 615
 616                    item_ids = [dataset_item.get("id") for dataset_item in dataset_item_cache]
 617
 618                    # Dict with item ids for fast lookup
 619                    annotations_dict = collections.defaultdict(dict)
 620                    annotations = self.get_annotations_for_item(item_ids, before=annotations_before)
 621                    for item_annotation in annotations:
 622                        item_id = item_annotation.item_id
 623                        if item_annotation:
 624                            annotations_dict[item_id][item_annotation.field_id] = item_annotation.value
 625
 626                    # Process each dataset item
 627                    for dataset_item in dataset_item_cache:
 628                        item_id = dataset_item.get("id")
 629                        item_annotations = annotations_dict.get(item_id, {})
 630
 631                        for annotation_field_id, annotation_field_items in annotation_fields.items():
 632                            # Get annotation value
 633                            value = item_annotations.get(annotation_field_id, "")
 634
 635                            # Convert list to string if needed
 636                            if isinstance(value, list):
 637                                value = ",".join(value)
 638                            elif value != "":
 639                                value = str(value)  # Ensure string type
 640                            else:
 641                                value = ""
 642
 643                            dataset_item[annotation_labels[annotation_field_id]] = value
 644
 645                        yield dataset_item
 646
 647                    dataset_item_cache = []
 648
 649            else:
 650                yield dataset_item
 651
 652
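        # Usage sketch (illustrative only): iterating mapped items while controlling
        # what happens to items with missing fields. `dataset`, `processor` and
        # `do_something` are assumed to exist; "body" and "author" are hypothetical
        # field names.
        #
        #   for item in dataset.iterate_items(
        #       processor=processor,
        #       warn_unmappable=True,
        #       max_unmappable=100,
        #       map_missing={"body": "default", "author": lambda data, field: ""},
        #   ):
        #       do_something(item)           # behaves like a dict of mapped fields
        #       do_something(item.original)  # the raw, unmapped item is also available
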
 653    def sort_and_iterate_items(
 654            self, sort="", reverse=False, chunk_size=50000, **kwargs
 655    ) -> dict:
 656        """
 657        Loop through items in a dataset, sorted by a given key.
 658
 659        This is a wrapper function for `iterate_items()` with the
 660        added functionality of sorting a dataset.
 661
 662        :param sort:				The item key that determines the sort order.
 663        :param reverse:				Whether to sort by largest values first.
 664        :param chunk_size:          How many items to sort and write per chunk
 665
 666        :returns dict:              Yields each sorted item as a dictionary
 667        """
 668
 669        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
 670            """
 671            Sort items based on the given key and order.
 672
 673            :param items_to_sort:  The items to sort
 674            :param sort_key:  The key to sort by
 675            :param reverse:  Whether to sort in reverse order
 676            :return:  Sorted items
 677            """
 678            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
 679                # Sort by dataset order
 680                yield from items_to_sort
 681            elif sort_key == "dataset-order" and reverse:
 682                # Sort by dataset order in reverse
 683                yield from reversed(list(items_to_sort))
 684            else:
 685                # Sort on the basis of a column value
 686                if not convert_sort_to_float:
 687                    yield from sorted(
 688                        items_to_sort,
 689                        key=lambda x: x.get(sort_key, ""),
 690                        reverse=reverse,
 691                    )
 692                else:
 693                    # Dataset fields can contain integers and empty strings.
 694                    # Since these cannot be compared, we will convert every
 695                    # empty string to 0.
 696                    yield from sorted(
 697                        items_to_sort,
 698                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
 699                        reverse=reverse,
 700                    )
 701
 702        if self.num_rows < chunk_size:
 703            try:
 704                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
 705                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
 706            except (TypeError, ValueError):
 707                yield from sort_items(
 708                    self.iterate_items(**kwargs),
 709                    sort,
 710                    reverse,
 711                    convert_sort_to_float=False
 712                )
 713
 714        else:
 715            # For large datasets, we will use chunk sorting
 716            staging_area = self.get_staging_area()
 717            buffer = []
 718            chunk_files = []
 719            convert_sort_to_float = True
 720            fieldnames = self.get_columns()
 721
 722            def write_chunk(buffer, chunk_index):
 723                """
 724                Write a chunk of data to a temporary file
 725
 726                :param buffer:  The buffer containing the chunk of data
 727                :param chunk_index:  The index of the chunk
 728                :return:  The path to the temporary file
 729                """
 730                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
 731                with temp_file.open("w", encoding="utf-8") as chunk_file:
 732                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
 733                    writer.writeheader()
 734                    writer.writerows(buffer)
 735                return temp_file
 736
 737            # Divide the dataset into sorted chunks
 738            for item in self.iterate_items(**kwargs):
 739                buffer.append(item)
 740                if len(buffer) >= chunk_size:
 741                    try:
 742                        buffer = list(
 743                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 744                        )
 745                    except (TypeError, ValueError):
 746                        convert_sort_to_float = False
 747                        buffer = list(
 748                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 749                        )
 750
 751                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
 752                    buffer.clear()
 753
 754            # Sort and write any remaining items in the buffer
 755            if buffer:
 756                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
 757                chunk_files.append(write_chunk(buffer, len(chunk_files)))
 758                buffer.clear()
 759
 760            # Merge sorted chunks into the final sorted file
 761            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
 762            with sorted_file.open("w", encoding="utf-8") as outfile:
 763                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
 764                writer.writeheader()
 765
 766                # Open all chunk files for reading
 767                chunk_readers = [
 768                    csv.DictReader(chunk.open("r", encoding="utf-8"))
 769                    for chunk in chunk_files
 770                ]
 771                heap = []
 772
 773                # Initialize the heap with the first row from each chunk
 774                for i, reader in enumerate(chunk_readers):
 775                    try:
 776                        row = next(reader)
 777                        if sort == "dataset-order" and reverse:
 778                            # Use a reverse index for "dataset-order" and reverse=True
 779                            sort_key = -i
 780                        elif convert_sort_to_float:
 781                            # Negate numeric keys for reverse sorting
 782                            sort_key = (
 783                                -convert_to_float(row.get(sort, ""))
 784                                if reverse
 785                                else convert_to_float(row.get(sort, ""))
 786                            )
 787                        else:
 788                            if reverse:
 789                                # For reverse string sorting, invert string comparison by creating a tuple
 790                                # with an inverted string - this makes Python's tuple comparison work in reverse
 791                                sort_key = (
 792                                    tuple(-ord(c) for c in row.get(sort, "")),
 793                                    -i,
 794                                )
 795                            else:
 796                                sort_key = (row.get(sort, ""), i)
 797                        heap.append((sort_key, i, row))
 798                    except StopIteration:
 799                        pass
 800
 801                # Use a heap to merge sorted chunks
 802                import heapq
 803
 804                heapq.heapify(heap)
 805                while heap:
 806                    _, chunk_index, smallest_row = heapq.heappop(heap)
 807                    writer.writerow(smallest_row)
 808                    try:
 809                        next_row = next(chunk_readers[chunk_index])
 810                        if sort == "dataset-order" and reverse:
 811                            # Use a reverse index for "dataset-order" and reverse=True
 812                            sort_key = -chunk_index
 813                        elif convert_sort_to_float:
 814                            sort_key = (
 815                                -convert_to_float(next_row.get(sort, ""))
 816                                if reverse
 817                                else convert_to_float(next_row.get(sort, ""))
 818                            )
 819                        else:
 820                            # Use the same inverted comparison for string values
 821                            if reverse:
 822                                sort_key = (
 823                                    tuple(-ord(c) for c in next_row.get(sort, "")),
 824                                    -chunk_index,
 825                                )
 826                            else:
 827                                sort_key = (next_row.get(sort, ""), chunk_index)
 828                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
 829                    except StopIteration:
 830                        pass
 831
 832            # Read the sorted file and yield each item
 833            with sorted_file.open("r", encoding="utf-8") as infile:
 834                reader = csv.DictReader(infile)
 835                for item in reader:
 836                    yield item
 837
 838            # Remove the temporary files
 839            if staging_area.is_dir():
 840                shutil.rmtree(staging_area)
 841
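        # Usage sketch (illustrative only): iterating a dataset sorted by a column
        # value. `dataset` is assumed to exist; "timestamp" is a hypothetical column.
        #
        #   for item in dataset.sort_and_iterate_items(sort="timestamp", reverse=True):
        #       ...
        #
        #   # original dataset order, reversed:
        #   for item in dataset.sort_and_iterate_items(sort="dataset-order", reverse=True):
        #       ...
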
 842    def get_staging_area(self):
 843        """
 844        Get path to a temporary folder in which files can be stored before
 845        finishing
 846
 847        This folder is created by this method and is guaranteed not to have
 848        existed before. It may be used as a staging area for the dataset data
 849        while it is being processed.
 850
 851        :return Path:  Path to folder
 852        """
 853        results_file = self.get_results_path()
 854
 855        results_dir_base = results_file.parent
 856        results_dir = results_file.name.replace(".", "") + "-staging"
 857        results_path = results_dir_base.joinpath(results_dir)
 858        index = 1
 859        while results_path.exists():
 860            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
 861            index += 1
 862
 863        # create temporary folder
 864        results_path.mkdir()
 865
 866        # Storing the staging area with the dataset so that it can be removed later
 867        self.disposable_files.append(results_path)
 868
 869        return results_path
 870
 871    def remove_disposable_files(self):
 872        """
 873        Remove any disposable files and folders, such as staging areas
 874
 875        Called from BasicProcessor after processing a dataset finishes.
 876        """
 877        # Remove DataSet staging areas
 878        if self.disposable_files:
 879            for disposable_file in self.disposable_files:
 880                if disposable_file.exists():
 881                    shutil.rmtree(disposable_file)
 882
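        # Usage sketch (illustrative only): using a staging area for intermediate
        # files. The folder is registered as disposable, so remove_disposable_files()
        # (normally called after processing finishes) cleans it up again.
        #
        #   staging_area = dataset.get_staging_area()
        #   staging_area.joinpath("intermediate.json").write_text("{}")
        #   ...
        #   dataset.remove_disposable_files()
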
 883    def finish(self, num_rows=0):
 884        """
 885        Declare the dataset finished
 886        """
 887        if self.data["is_finished"]:
 888            raise RuntimeError("Cannot finish a finished dataset again")
 889
 890        self.db.update(
 891            "datasets",
 892            where={"key": self.data["key"]},
 893            data={
 894                "is_finished": True,
 895                "num_rows": num_rows,
 896                "progress": 1.0,
 897                "timestamp_finished": int(time.time()),
 898            },
 899        )
 900        self.data["is_finished"] = True
 901        self.data["num_rows"] = num_rows
 902
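        # Usage sketch (illustrative only): after writing the result file, a worker
        # marks the dataset as finished with the number of items it wrote. `dataset`
        # and `items_written` are placeholders.
        #
        #   dataset.finish(num_rows=items_written)
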
 903    def copy(self, shallow=True):
 904        """
 905        Copies the dataset, making a new version with a unique key
 906
 907
 908        :param bool shallow:  Shallow copy: does not copy the result file, but
 909        instead refers to the same file as the original dataset did
 910        :return DataSet:  Copied dataset
 911        """
 912        parameters = self.parameters.copy()
 913
 914        # a key is partially based on the parameters. so by setting these extra
 915        # attributes, we also ensure a unique key will be generated for the
 916        # copy
 917        # possibly todo: don't use time for uniqueness (but one shouldn't be
 918        # copying a dataset multiple times per microsecond, that's not what
 919        # this is for)
 920        parameters["copied_from"] = self.key
 921        parameters["copied_at"] = time.time()
 922
 923        copy = DataSet(
 924            parameters=parameters,
 925            db=self.db,
 926            extension=self.result_file.split(".")[-1],
 927            type=self.type,
 928            modules=self.modules
 929        )
 930
 931        for field in self.data:
 932            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
 933                continue
 934            copy.__setattr__(field, self.data[field])
 935
 936        if shallow:
 937            # use the same result file
 938            copy.result_file = self.result_file
 939        else:
 940            # copy to new file with new key
 941            shutil.copy(self.get_results_path(), copy.get_results_path())
 942
 943        if self.is_finished():
 944            copy.finish(self.num_rows)
 945
 946        # make sure ownership is also copied
 947        copy.copy_ownership_from(self)
 948
 949        return copy
 950
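        # Usage sketch (illustrative only): a shallow copy refers to the original
        # result file, a deep copy duplicates it on disk; both get a new key.
        #
        #   shallow_copy = dataset.copy(shallow=True)
        #   deep_copy = dataset.copy(shallow=False)
        #   assert deep_copy.key != dataset.key
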
 951    def delete(self, commit=True, queue=None):
 952        """
 953        Delete the dataset, and all its children
 954
 955        Deletes both database records and result files. Note that manipulating
 956        a dataset object after it has been deleted is undefined behaviour.
 957
 958        :param bool commit:  Commit SQL DELETE query?
 959        """
 960        # first, recursively delete children
 961        children = self.db.fetchall(
 962            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
 963        )
 964        for child in children:
 965            try:
 966                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 967                child.delete(commit=commit)
 968            except DataSetException:
 969                # dataset already deleted - race condition?
 970                pass
 971
 972        # delete any queued jobs for this dataset
 973        try:
 974            job = Job.get_by_remote_ID(self.key, self.db, self.type)
 975            if job.is_claimed:
 976                # tell API to stop any jobs running for this dataset
 977                # level 2 = cancel job
 978                # we're not interested in the result - if the API is available,
 979                # it will do its thing, if it's not the backend is probably not
 980                # running so the job also doesn't need to be interrupted
 981                call_api(
 982                    "cancel-job",
 983                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
 984                    False,
 985                )
 986
 987            # this deletes the job from the database
 988            job.finish(True)
 989
 990        except JobNotFoundException:
 991            pass
 992
 993        # delete this dataset's own annotations
 994        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
 995        # delete annotations that have been generated as part of this dataset
 996        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
 997        # delete annotation fields on parent dataset(s) stemming from this dataset
 998        for related_dataset in self.get_genealogy(update_cache=True):
 999            field_deleted = False
1000            annotation_fields = related_dataset.annotation_fields
1001            if annotation_fields:
1002                for field_id in list(annotation_fields.keys()):
1003                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
1004                        del annotation_fields[field_id]
1005                        field_deleted = True
1006            if field_deleted:
1007                related_dataset.save_annotation_fields(annotation_fields)
1008
1009        # delete dataset from database
1010        self.db.delete("datasets", where={"key": self.key}, commit=commit)
1011        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
1012        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
1013
1014        # delete from drive
1015        try:
1016            if self.get_results_path().exists():
1017                self.get_results_path().unlink()
1018            if self.get_results_path().with_suffix(".log").exists():
1019                self.get_results_path().with_suffix(".log").unlink()
1020            if self.get_results_folder_path().exists():
1021                shutil.rmtree(self.get_results_folder_path())
1022
1023        except FileNotFoundError:
1024            # already deleted, apparently
1025            pass
1026        except PermissionError as e:
1027            self.db.log.error(
1028                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
1029            )
1030
1031    def update_children(self, **kwargs):
1032        """
1033        Update an attribute for all child datasets
1034
1035        Can be used to e.g. change the owner, version, finished status for all
1036        datasets in a tree
1037
1038        :param kwargs:  Parameters corresponding to known dataset attributes
1039        """
1040        for child in self.get_children(update=True):
1041            for attr, value in kwargs.items():
1042                child.__setattr__(attr, value)
1043
1044            child.update_children(**kwargs)
1045
1046    def is_finished(self):
1047        """
1048        Check if dataset is finished
1049        :return bool:
1050        """
1051        return bool(self.data["is_finished"])
1052
1053    def is_rankable(self, multiple_items=True):
1054        """
1055        Determine if a dataset is rankable
1056
1057        Rankable means that it is a CSV file with 'date' and 'value' columns
1058        as well as one or more item label columns
1059
1060        :param bool multiple_items:  Consider datasets with multiple items per
1061        row (e.g. word_1, word_2, etc.)?
1062
1063        :return bool:  Whether the dataset is rankable or not
1064        """
1065        if (
1066                self.get_results_path().suffix != ".csv"
1067                or not self.get_results_path().exists()
1068        ):
1069            return False
1070
1071        column_options = {"date", "value", "item"}
1072        if multiple_items:
1073            column_options.add("word_1")
1074
1075        with self.get_results_path().open(encoding="utf-8") as infile:
1076            reader = csv.DictReader(infile)
1077            try:
1078                return len(set(reader.fieldnames) & column_options) >= 3
1079            except (TypeError, ValueError):
1080                return False
1081
1082    def is_accessible_by(self, username, role="owner"):
1083        """
1084        Check if dataset has given user as owner
1085
1086        :param str|User username: Username to check for
1087        :return bool:
1088        """
1089        if type(username) is not str:
1090            if hasattr(username, "get_id"):
1091                username = username.get_id()
1092            else:
1093                raise TypeError("User must be a str or User object")
1094
1095        # 'normal' owners
1096        if username in [
1097            owner
1098            for owner, meta in self.owners.items()
1099            if (role is None or meta["role"] == role)
1100        ]:
1101            return True
1102
1103        # users that are owners by virtue of having an owner tag
1104        if username in itertools.chain(
1105                *[
1106                    tagged_owners
1107                    for tag, tagged_owners in self.tagged_owners.items()
1108                    if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
1109                ]
1110        ):
1111            return True
1112
1113        return False
1114
1115    def get_owners_users(self, role="owner"):
1116        """
1117        Get list of dataset owners
1118
1119        This returns a list of *users* that are considered owners. Tags are
1120        transparently replaced with the users with that tag.
1121
1122        :param str|None role:  Role to check for. If `None`, all owners are
1123        returned regardless of role.
1124
1125        :return set:  De-duplicated owner list
1126        """
1127        # 'normal' owners
1128        owners = [
1129            owner
1130            for owner, meta in self.owners.items()
1131            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
1132        ]
1133
1134        # users that are owners by virtue of having an owner tag
1135        owners.extend(
1136            itertools.chain(
1137                *[
1138                    tagged_owners
1139                    for tag, tagged_owners in self.tagged_owners.items()
1140                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
1141                ]
1142            )
1143        )
1144
1145        # de-duplicate before returning
1146        return set(owners)
1147
1148    def get_owners(self, role="owner"):
1149        """
1150        Get list of dataset owners
1151
1152        This returns a list of all owners, and does not transparently resolve
1153        tags (like `get_owners_users` does).
1154
1155        :param str|None role:  Role to check for. If `None`, all owners are
1156        returned regardless of role.
1157
1158        :return set:  De-duplicated owner list
1159        """
1160        return [
1161            owner
1162            for owner, meta in self.owners.items()
1163            if (role is None or meta["role"] == role)
1164        ]
1165
1166    def add_owner(self, username, role="owner"):
1167        """
1168        Set dataset owner
1169
1170        If the user is already an owner, but with a different role, the role is
1171        updated. If the user is already an owner with the same role, nothing happens.
1172
1173        :param str|User username:  Username to set as owner
1174        :param str|None role:  Role to add user with.
1175        """
1176        if type(username) is not str:
1177            if hasattr(username, "get_id"):
1178                username = username.get_id()
1179            else:
1180                raise TypeError("User must be a str or User object")
1181
1182        if username not in self.owners:
1183            self.owners[username] = {"name": username, "key": self.key, "role": role}
1184            self.db.insert("datasets_owners", data=self.owners[username], safe=True)
1185
1186        elif username in self.owners and self.owners[username]["role"] != role:
1187            self.db.update(
1188                "datasets_owners",
1189                data={"role": role},
1190                where={"name": username, "key": self.key},
1191            )
1192            self.owners[username]["role"] = role
1193
1194        if username.startswith("tag:"):
1195            # this is a bit more complicated than just adding to the list of
1196            # owners, so do a full refresh
1197            self.refresh_owners()
1198
1199        # make sure children's owners remain in sync
1200        for child in self.get_children(update=True):
1201            child.add_owner(username, role)
1202            # not recursive, since we're calling it from recursive code!
1203            child.copy_ownership_from(self, recursive=False)
1204
1205    def remove_owner(self, username):
1206        """
1207        Remove dataset owner
1208
1209        If no owner is set, the dataset is assigned to the anonymous user.
1210        If the user is not an owner, nothing happens.
1211
1212        :param str|User username:  Username to set as owner
1213        """
1214        if type(username) is not str:
1215            if hasattr(username, "get_id"):
1216                username = username.get_id()
1217            else:
1218                raise TypeError("User must be a str or User object")
1219
1220        if username in self.owners:
1221            del self.owners[username]
1222            self.db.delete("datasets_owners", where={"name": username, "key": self.key})
1223
1224            if not self.owners:
1225                self.add_owner("anonymous")
1226
1227        if username in self.tagged_owners:
1228            del self.tagged_owners[username]
1229
1230        # make sure children's owners remain in sync
1231        for child in self.get_children(update=True):
1232            child.remove_owner(username)
1233            # not recursive, since we're calling it from recursive code!
1234            child.copy_ownership_from(self, recursive=False)
1235
1236    def refresh_owners(self):
1237        """
1238        Update internal owner cache
1239
1240        This makes sure that the list of *users* and *tags* which can access the
1241        dataset is up to date.
1242        """
1243        self.owners = {
1244            owner["name"]: owner
1245            for owner in self.db.fetchall(
1246                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
1247            )
1248        }
1249
1250        # determine which users (if any) are owners of the dataset by having a
1251        # tag that is listed as an owner
1252        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
1253        if owner_tags:
1254            tagged_owners = self.db.fetchall(
1255                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
1256            )
1257            self.tagged_owners = {
1258                owner_tag: [
1259                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
1260                ]
1261                for owner_tag in owner_tags
1262            }
1263        else:
1264            self.tagged_owners = {}
1265
1266    def copy_ownership_from(self, dataset, recursive=True):
1267        """
1268        Copy ownership
1269
1270        This is useful to e.g. make sure a dataset's ownership stays in sync
1271        with its parent
1272
1273        :param Dataset dataset:  Parent to copy from
1274        :return:
1275        """
1276        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
1277
1278        for role in ("owner", "viewer"):
1279            owners = dataset.get_owners(role=role)
1280            for owner in owners:
1281                self.db.insert(
1282                    "datasets_owners",
1283                    data={"key": self.key, "name": owner, "role": role},
1284                    commit=False,
1285                    safe=True,
1286                )
1287
1288        self.db.commit()
1289        if recursive:
1290            for child in self.get_children(update=True):
1291                child.copy_ownership_from(self, recursive=recursive)
1292
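# Illustrative sketch (not part of the module source): how the ownership helpers
# above are typically combined. Assumes `dataset` is an existing DataSet; the
# usernames and tag are hypothetical examples.
dataset.add_owner("alice", role="owner")           # grant full ownership
dataset.add_owner("tag:students", role="viewer")   # grant read access to everyone with a user tag
dataset.remove_owner("alice")                      # falls back to "anonymous" if no owners remain

parent = dataset.get_parent()
if parent:
    # re-sync owners with the parent, recursing into this dataset's children
    dataset.copy_ownership_from(parent)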
1293    def get_parameters(self):
1294        """
1295        Get dataset parameters
1296
1297        The dataset parameters are stored as JSON in the database - parse them
1298        and return the resulting object
1299
1300        :return:  Dataset parameters as originally stored
1301        """
1302        try:
1303            return json.loads(self.data["parameters"])
1304        except json.JSONDecodeError:
1305            return {}
1306
1307    def get_columns(self):
1308        """
1309        Returns the dataset columns.
1310
1311        Useful for processor input forms. Can deal with both CSV and NDJSON
1312        files, the latter only if a `map_item` function is available in the
1313        processor that generated it. While in other cases one could use the
1314        keys of the JSON object, this is not always possible in follow-up code
1315        that uses the 'column' names, so for consistency this function acts as
1316        if no column can be parsed if no `map_item` function exists.
1317
1318        :return list:  List of dataset columns; empty list if unable to parse
1319        """
1320        if not self.get_results_path().exists():
1321            # no file to get columns from
1322            return []
1323
1324        if (self.get_results_path().suffix.lower() == ".csv") or (
1325                self.get_results_path().suffix.lower() == ".ndjson"
1326                and self.get_own_processor() is not None
1327                and self.get_own_processor().map_item_method_available(dataset=self)
1328        ):
1329            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
1330            try:
1331                keys = list(next(items).keys())
1332                if self.annotation_fields:
1333                    for annotation_field in self.annotation_fields.values():
1334                        annotation_column = annotation_field["label"]
1335                        label_count = 1
1336                        while annotation_column in keys:
1337                            label_count += 1
1338                            annotation_column = (
1339                                f"{annotation_field['label']}_{label_count}"
1340                            )
1341                        keys.append(annotation_column)
1342                columns = keys
1343            except (StopIteration, NotImplementedError):
1344                # No items or otherwise unable to iterate
1345                columns = []
1346            finally:
1347                del items
1348        else:
1349            # Filetype not CSV or an NDJSON with `map_item`
1350            columns = []
1351
1352        return columns
1353
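# Illustrative sketch (not part of the module source): get_columns() is mainly
# used to populate column pickers in processor input forms. `dataset` is an
# existing DataSet; "body" is a hypothetical column name.
columns = dataset.get_columns()
if not columns:
    # e.g. an NDJSON file without a map_item function, or no results file yet
    print("No columns could be determined for this dataset")
elif "body" in columns:
    print("Dataset has a 'body' column a processor could analyse")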
1354    def update_label(self, label):
1355        """
1356        Update label for this dataset
1357
1358        :param str label:  	New label
1359        :return str: 		The new label, as returned by get_label
1360        """
1361        self.parameters["label"] = label
1362
1363        self.db.update(
1364            "datasets",
1365            data={"parameters": json.dumps(self.parameters)},
1366            where={"key": self.key},
1367        )
1368        return self.get_label()
1369
1370    def get_label(self, parameters=None, default="Query"):
1371        """
1372        Generate a readable label for the dataset
1373
1374        :param dict parameters:  Parameters of the dataset
1375        :param str default:  Label to use if it cannot be inferred from the
1376        parameters
1377
1378        :return str:  Label
1379        """
1380        if not parameters:
1381            parameters = self.parameters
1382
1383        if parameters.get("label"):
1384            return parameters["label"]
1385        elif parameters.get("body_query") and parameters["body_query"] != "empty":
1386            return parameters["body_query"]
1387        elif parameters.get("body_match") and parameters["body_match"] != "empty":
1388            return parameters["body_match"]
1389        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
1390            return parameters["subject_query"]
1391        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
1392            return parameters["subject_match"]
1393        elif parameters.get("query"):
1394            label = parameters["query"]
1395            # Some legacy datasets have lists as query data
1396            if isinstance(label, list):
1397                label = ", ".join(label)
1398
1399            label = label if len(label) < 30 else label[:25] + "..."
1400            label = label.strip().replace("\n", ", ")
1401            return label
1402        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
1403            return "Flag: %s" % parameters["country_flag"]
1404        elif parameters.get("country_name") and parameters["country_name"] != "all":
1405            return "Country: %s" % parameters["country_name"]
1406        elif parameters.get("filename"):
1407            return parameters["filename"]
1408        elif parameters.get("board") and "datasource" in parameters:
1409            return parameters["datasource"] + "/" + parameters["board"]
1410        elif (
1411                "datasource" in parameters
1412                and parameters["datasource"] in self.modules.datasources
1413        ):
1414            return (
1415                    self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
1416            )
1417        else:
1418            return default
1419
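# Illustrative sketch (not part of the module source): label inference follows
# the precedence above. The parameter dictionaries and names are hypothetical.
print(dataset.get_label({"label": "My dataset"}))              # -> "My dataset"
print(dataset.get_label({"query": "a very long query " * 5}))  # -> truncated to 25 characters plus "..."
print(dataset.get_label({"datasource": "fourchan", "board": "pol"}))  # -> "fourchan/pol"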
1420    def change_datasource(self, datasource):
1421        """
1422        Change the datasource type for this dataset
1423
1424        :param str datasource:  New datasource type
1425        :return str:  The new datasource type
1426        """
1427
1428        self.parameters["datasource"] = datasource
1429
1430        self.db.update(
1431            "datasets",
1432            data={"parameters": json.dumps(self.parameters)},
1433            where={"key": self.key},
1434        )
1435        return datasource
1436
1437    def reserve_result_file(self, parameters=None, extension="csv"):
1438        """
1439        Generate a unique path to the results file for this dataset
1440
1441        This generates a file name for the data file of this dataset, and makes sure
1442        no file exists or will exist at that location other than the file we
1443        expect (i.e. the data for this particular dataset).
1444
1445        :param str extension: File extension, "csv" by default
1446        :param parameters:  Dataset parameters
1447        :return bool:  Whether the file path was successfully reserved
1448        """
1449        if self.data["is_finished"]:
1450            raise RuntimeError("Cannot reserve results file for a finished dataset")
1451
1452        # Use 'random' for random post queries
1453        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
1454            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
1455        # Use country code for country flag queries
1456        elif "country_flag" in parameters and parameters["country_flag"] != "all":
1457            file = (
1458                    "countryflag-"
1459                    + str(parameters["country_flag"])
1460                    + "-"
1461                    + self.data["key"]
1462            )
1463        # Use the query string for all other queries
1464        else:
1465            query_bit = self.data["query"].replace(" ", "-").lower()
1466            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
1467            query_bit = query_bit[:100]  # Crop to avoid OSError
1468            file = query_bit + "-" + self.data["key"]
1469            file = re.sub(r"[-]+", "-", file)
1470
1471        self.data["result_file"] = file + "." + extension.lower()
1472        index = 1
1473        while self.get_results_path().is_file():
1474            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
1475            index += 1
1476
1477        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
1478                                 data={"result_file": self.data["result_file"]})
1479        return updated > 0
1480
1481    def get_key(self, query, parameters, parent="", time_offset=0):
1482        """
1483        Generate a unique key for this dataset that can be used to identify it
1484
1485        The key is a hash of a combination of the query string and parameters.
1486        You never need to call this, really: it's used internally.
1487
1488        :param str query:  Query string
1489        :param parameters:  Dataset parameters
1490        :param parent: Parent dataset's key (if applicable)
1491        :param time_offset:  Offset to add to the time component of the dataset
1492        key. This can be used to ensure a unique key even if the parameters and
1493        timing is otherwise identical to an existing dataset's
1494
1495        :return str:  Dataset key
1496        """
1497        # Return a hash based on parameters
1498        # we're going to use the hash of the parameters to uniquely identify
1499        # the dataset, so make sure it's always in the same order, or we might
1500        # end up creating multiple keys for the same dataset if python
1501        # decides to return the dict in a different order
1502        param_key = collections.OrderedDict()
1503        for key in sorted(parameters):
1504            param_key[key] = parameters[key]
1505
1506        # we additionally use the current time as a salt - this should usually
1507        # ensure a unique key for the dataset. if for some reason there is a
1508        # hash collision anyway, the method calls itself again with a random offset
1509        param_key["_salt"] = int(time.time()) + time_offset
1510
1511        parent_key = str(parent) if parent else ""
1512        plain_key = repr(param_key) + str(query) + parent_key
1513        hashed_key = hash_to_md5(plain_key)
1514
1515        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1516            # key exists, generate a new one
1517            return self.get_key(
1518                query, parameters, parent, time_offset=random.randint(1, 10)
1519            )
1520        else:
1521            return hashed_key
1522
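# Illustrative sketch (not part of the module source): how a dataset key is
# derived, mirroring the hashing logic above with the same helpers. The
# parameter values are hypothetical.
import collections
import time
from common.lib.helpers import hash_to_md5

parameters = {"datasource": "example", "query": "test"}
param_key = collections.OrderedDict((k, parameters[k]) for k in sorted(parameters))
param_key["_salt"] = int(time.time())                  # time_offset of 0
plain_key = repr(param_key) + "test" + ""              # query string, no parent key
print(hash_to_md5(plain_key))                          # 32-character hex digest used as the dataset key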
1523    def set_key(self, key):
1524        """
1525        Change dataset key
1526
1527        In principle, keys should never be changed. But there are rare cases
1528        where it is useful to do so, in particular when importing a dataset
1529        from another 4CAT instance; in that case it makes sense to try and
1530        ensure that the key is the same as it was before. This function sets
1531        the dataset key and updates any dataset references to it.
1532
1533        :param str key:  Key to set
1534        :return str:  Key that was set. If the desired key already exists, the
1535        original key is kept.
1536        """
1537        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1538        if key_exists or not key:
1539            return self.key
1540
1541        old_key = self.key
1542        self.db.update("datasets", data={"key": key}, where={"key": old_key})
1543
1544        # update references
1545        self.db.update(
1546            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
1547        )
1548        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1549        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1550        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1551
1552        # for good measure
1553        self.db.commit()
1554        self.key = key
1555
1556        return self.key
1557
1558    def get_status(self):
1559        """
1560        Get Dataset status
1561
1562        :return string: Dataset status
1563        """
1564        return self.data["status"]
1565
1566    def update_status(self, status, is_final=False):
1567        """
1568        Update dataset status
1569
1570        The status is a string that may be displayed to a user to keep them
1571        updated and informed about the progress of a dataset. No memory is kept
1572        of earlier dataset statuses; the current status is overwritten when
1573        updated.
1574
1575        Statuses are also written to the dataset log file.
1576
1577        :param string status:  Dataset status
1578        :param bool is_final:  If this is `True`, subsequent calls to this
1579        method while the object is instantiated will not update the dataset
1580        status.
1581        :return bool:  Status update successful?
1582        """
1583        if self.no_status_updates:
1584            return
1585
1586        # for presets, copy the updated status to the preset(s) this is part of
1587        if self.preset_parent is None:
1588            self.preset_parent = [
1589                                     parent
1590                                     for parent in self.get_genealogy()
1591                                     if parent.type.find("preset-") == 0 and parent.key != self.key
1592                                 ][:1]
1593
1594        if self.preset_parent:
1595            for preset_parent in self.preset_parent:
1596                if not preset_parent.is_finished():
1597                    preset_parent.update_status(status)
1598
1599        self.data["status"] = status
1600        updated = self.db.update(
1601            "datasets", where={"key": self.data["key"]}, data={"status": status}
1602        )
1603
1604        if is_final:
1605            self.no_status_updates = True
1606
1607        self.log(status)
1608
1609        return updated > 0
1610
1611    def update_progress(self, progress):
1612        """
1613        Update dataset progress
1614
1615        The progress can be used to indicate to a user how close the dataset
1616        is to completion.
1617
1618        :param float progress:  Between 0 and 1.
1619        :return:
1620        """
1621        progress = min(1, max(0, progress))  # clamp
1622        if type(progress) is int:
1623            progress = float(progress)
1624
1625        self.data["progress"] = progress
1626        updated = self.db.update(
1627            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
1628        )
1629        return updated > 0
1630
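# Illustrative sketch (not part of the module source): a processor typically
# interleaves status and progress updates while working through a dataset.
# `dataset` is an existing DataSet and `total_items` a hypothetical item count;
# iterate_items() is the dataset's own item iterator used elsewhere in this class.
total_items = 10000
processed = 0
for item in dataset.iterate_items():
    processed += 1
    if processed % 100 == 0:
        dataset.update_status(f"Processed {processed:,} of {total_items:,} items")
        dataset.update_progress(processed / total_items)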
1631    def get_progress(self):
1632        """
1633        Get dataset progress
1634
1635        :return float:  Progress, between 0 and 1
1636        """
1637        return self.data["progress"]
1638
1639    def finish_with_error(self, error):
1640        """
1641        Set error as final status, and finish with 0 results
1642
1643        This is a convenience function to avoid having to repeat
1644        "update_status" and "finish" a lot.
1645
1646        :param str error:  Error message for final dataset status.
1647        :return:
1648        """
1649        self.update_status(error, is_final=True)
1650        self.finish(0)
1651
1652        return None
1653
1654    def update_version(self, version):
1655        """
1656        Update software version used for this dataset
1657
1658        This can be used to verify the code that was used to process this dataset.
1659
1660        :param string version:  Version identifier
1661        :return bool:  Update successful?
1662        """
1663        try:
1664            # this fails if the processor type is unknown
1665            # edge case, but let's not crash...
1666            processor_path = self.modules.processors.get(self.data["type"]).filepath
1667        except AttributeError:
1668            processor_path = ""
1669
1670        updated = self.db.update(
1671            "datasets",
1672            where={"key": self.data["key"]},
1673            data={
1674                "software_version": version[0],
1675                "software_source": version[1],
1676                "software_file": processor_path,
1677            },
1678        )
1679
1680        return updated > 0
1681
1682    def delete_parameter(self, parameter, instant=True):
1683        """
1684        Delete a parameter from the dataset metadata
1685
1686        :param string parameter:  Parameter to delete
1687        :param bool instant:  Also delete parameters in this instance object?
1688        :return bool:  Update successful?
1689        """
1690        parameters = self.parameters.copy()
1691        if parameter in parameters:
1692            del parameters[parameter]
1693        else:
1694            return False
1695
1696        updated = self.db.update(
1697            "datasets",
1698            where={"key": self.data["key"]},
1699            data={"parameters": json.dumps(parameters)},
1700        )
1701
1702        if instant:
1703            self.parameters = parameters
1704
1705        return updated > 0
1706
1707    def get_version_url(self, file):
1708        """
1709        Get a versioned github URL for the version this dataset was processed with
1710
1711        :param file:  File to link within the repository
1712        :return:  URL, or an empty string
1713        """
1714        if not self.data["software_source"]:
1715            return ""
1716
1717        filepath = self.data.get("software_file", "")
1718        if filepath.startswith("/config/extensions/"):
1719            # go to root of extension
1720            filepath = "/" + "/".join(filepath.split("/")[3:])
1721
1722        return (
1723                self.data["software_source"]
1724                + "/blob/"
1725                + self.data["software_version"]
1726                + filepath
1727        )
1728
1729    def top_parent(self):
1730        """
1731        Get root dataset
1732
1733        Traverses the tree of datasets this one is part of until it finds one
1734        with no source_dataset dataset, then returns that dataset.
1735
1736        :return Dataset: Parent dataset
1737        """
1738        genealogy = self.get_genealogy()
1739        return genealogy[0]
1740
1741    def get_genealogy(self, update_cache=False):
1742        """
1743        Get genealogy of this dataset
1744
1745        Creates a list of DataSet objects, with the first one being the
1746        'top' dataset, and each subsequent one being a child of the previous
1747        one, ending with the current dataset.
1748
1749        :param bool update_cache:  Update the cached genealogy if True, else return cached value
1750        :return list:  Dataset genealogy, oldest dataset first
1751        """
1752        if not self._genealogy or update_cache:
1753            key_parent = self.key_parent
1754            genealogy = []
1755
1756            while key_parent:
1757                try:
1758                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1759                except DataSetException:
1760                    break
1761
1762                genealogy.append(parent)
1763                if parent.key_parent:
1764                    key_parent = parent.key_parent
1765                else:
1766                    break
1767
1768            genealogy.reverse()
1769
1770            # add self to the end
1771            genealogy.append(self)
1772            # cache the result
1773            self._genealogy = genealogy
1774
1775        # return a copy to prevent external modification
1776        return list(self._genealogy)
1777
1778    def get_children(self, update=False):
1779        """
1780        Get children of this dataset
1781
1782        :param bool update:  Update the list of children from database if True, else return cached value
1783        :return list:  List of child datasets
1784        """
1785        if self._children is not None and not update:
1786            return self._children
1787
1788        analyses = self.db.fetchall(
1789            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
1790            (self.key,),
1791        )
1792        self._children = [
1793            DataSet(data=analysis, db=self.db, modules=self.modules)
1794            for analysis in analyses
1795        ]
1796        return self._children
1797
1798    def get_all_children(self, recursive=True, update=True):
1799        """
1800        Get all children of this dataset
1801
1802        Results are returned as a non-hierarchical list, i.e. the result does
1803        not reflect the actual dataset hierarchy (but all datasets in the
1804        result will have the original dataset as an ancestor somewhere)
1805
1806        :return list:  List of DataSets
1807        """
1808        children = self.get_children(update=update)
1809        results = children.copy()
1810        if recursive:
1811            for child in children:
1812                results += child.get_all_children(recursive=recursive, update=update)
1813
1814        return results
1815
1816    def nearest(self, type_filter):
1817        """
1818        Return nearest dataset that matches the given type
1819
1820        Starting with this dataset, traverse the hierarchy upwards and return
1821        whichever dataset matches the given type.
1822
1823        :param str type_filter:  Type filter. Can contain wildcards and is matched
1824        using `fnmatch.fnmatch`.
1825        :return:  Earliest matching dataset, or `None` if none match.
1826        """
1827        genealogy = self.get_genealogy()
1828        for dataset in reversed(genealogy):
1829            if fnmatch.fnmatch(dataset.type, type_filter):
1830                return dataset
1831
1832        return None
1833
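# Illustrative sketch (not part of the module source): navigating a dataset's
# lineage with the methods above. The "*-search" filter is a hypothetical
# wildcard matching the collector types checked by is_from_collector() below.
root = dataset.top_parent()            # oldest ancestor
lineage = dataset.get_genealogy()      # [root, ..., dataset]
source = dataset.nearest("*-search")   # closest ancestor created by a search worker, or None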
1834    def get_breadcrumbs(self):
1835        """
1836        Get breadcrumbs navlink for use in permalinks
1837
1838        Returns a string representing this dataset's genealogy that may be used
1839        to uniquely identify it.
1840
1841        :return str: Nav link
1842        """
1843        if not self.key_parent:
1844            return self.key
1845
1846        genealogy = self.get_genealogy()
1847        return ",".join([d.key for d in genealogy])
1848
1849    def get_compatible_processors(self, config=None):
1850        """
1851        Get list of processors compatible with this dataset
1852
1853        Checks whether this dataset type is one that is listed as being accepted
1854        by the processor, for each known type: if the processor does not
1855        specify accepted types (via the `is_compatible_with` method), it is
1856        assumed it accepts any top-level datasets
1857
1858        :param ConfigManager|None config:  Configuration reader to determine
1859        compatibility through. This may not be the same reader the dataset was
1860        instantiated with, e.g. when checking whether some other user should
1861        be able to run processors on this dataset.
1862        :return dict:  Compatible processors, `name => class` mapping
1863        """
1864        processors = self.modules.processors
1865
1866        available = {}
1867        for processor_type, processor in processors.items():
1868            if processor.is_from_collector():
1869                continue
1870
1871            own_processor = self.get_own_processor()
1872            if own_processor and own_processor.exclude_followup_processors(
1873                    processor_type
1874            ):
1875                continue
1876
1877            # consider a processor compatible if its is_compatible_with
1878            # method returns True *or* if it has no explicit compatibility
1879            # check and this dataset is top-level (i.e. has no parent)
1880            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1881                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
1882                available[processor_type] = processor
1883
1884        return available
1885
1886    def get_place_in_queue(self, update=False):
1887        """
1888        Determine dataset's position in queue
1889
1890        If the dataset is already finished, the position is -1. Else, the
1891        position is the number of datasets to be completed before this one will
1892        be processed. A position of 0 would mean that the dataset is currently
1893        being executed, or that the backend is not running.
1894
1895        :param bool update:  Update the queue position from database if True, else return cached value
1896        :return int:  Queue position
1897        """
1898        if self.is_finished() or not self.data.get("job"):
1899            self._queue_position = -1
1900            return self._queue_position
1901        elif not update and self._queue_position is not None:
1902            # Use cached value
1903            return self._queue_position
1904        else:
1905            # Collect queue position from database via the job
1906            try:
1907                job = Job.get_by_ID(self.data["job"], self.db)
1908                self._queue_position = job.get_place_in_queue()
1909            except JobNotFoundException:
1910                self._queue_position = -1
1911
1912            return self._queue_position
1913
1914    def get_own_processor(self):
1915        """
1916        Get the processor class that produced this dataset
1917
1918        :return:  Processor class, or `None` if not available.
1919        """
1920        processor_type = self.parameters.get("type", self.data.get("type"))
1921
1922        return self.modules.processors.get(processor_type)
1923
1924    def get_available_processors(self, config=None, exclude_hidden=False):
1925        """
1926        Get list of processors that may be run for this dataset
1927
1928        Returns all compatible processors except for those that are already
1929        queued or finished and have no options. Processors that have been
1930        run but have options are included so they may be run again with a
1931        different configuration
1932
1933        :param ConfigManager|None config:  Configuration reader to determine
1934        compatibility through. This may not be the same reader the dataset was
1935        instantiated with, e.g. when checking whether some other user should
1936        be able to run processors on this dataset.
1937        :param bool exclude_hidden:  Exclude processors that should be displayed
1938        in the UI? If `False`, all processors are returned.
1939
1940        :return dict:  Available processors, `name => properties` mapping
1941        """
1942        if self.available_processors:
1943            # Update to reflect exclude_hidden parameter which may be different from last call
1944            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1945            return {
1946                processor_type: processor
1947                for processor_type, processor in self.available_processors.items()
1948                if not exclude_hidden or not processor.is_hidden
1949            }
1950
1951        processors = self.get_compatible_processors(config=config)
1952
1953        for analysis in self.get_children(update=True):
1954            if analysis.type not in processors:
1955                continue
1956
1957            if not processors[analysis.type].get_options(config=config):
1958                # No variable options; this processor has been run so remove
1959                del processors[analysis.type]
1960                continue
1961
1962            if exclude_hidden and processors[analysis.type].is_hidden:
1963                del processors[analysis.type]
1964
1965        self.available_processors = processors
1966        return processors
1967
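# Illustrative sketch (not part of the module source): listing the processors a
# user could run on this dataset. `config` is assumed to be a ConfigManager for
# the user whose permissions should apply.
available = dataset.get_available_processors(config=config, exclude_hidden=True)
for processor_type in available:
    print(processor_type)  # the processor's type identifier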
1968    def link_job(self, job):
1969        """
1970        Link this dataset to a job ID
1971
1972        Updates the dataset data to include a reference to the job that will be
1973        executing (or has already executed) this job.
1974
1975        Note that if no job can be found for this dataset, this method silently
1976        fails.
1977
1978        :param Job job:  The job that will run this dataset
1979
1980        :todo: If the job column ever gets used, make sure it always contains
1981               a valid value, rather than silently failing this method.
1982        """
1983        if type(job) is not Job:
1984            raise TypeError("link_job requires a Job object as its argument")
1985
1986        if "id" not in job.data:
1987            try:
1988                job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1989            except JobNotFoundException:
1990                return
1991
1992        self.db.update(
1993            "datasets", where={"key": self.key}, data={"job": job.data["id"]}
1994        )
1995
1996    def link_parent(self, key_parent):
1997        """
1998        Set source_dataset key for this dataset
1999
2000        :param key_parent:  Parent key. Not checked for validity
2001        """
2002        self.db.update(
2003            "datasets", where={"key": self.key}, data={"key_parent": key_parent}
2004        )
2005        # reset caches
2006        self.data["key_parent"] = key_parent
2007        self._genealogy = None
2008
2009    def get_parent(self):
2010        """
2011        Get parent dataset
2012
2013        :return DataSet:  Parent dataset, or `None` if not applicable
2014        """
2015        return (
2016            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
2017            if self.key_parent
2018            else None
2019        )
2020
2021    def detach(self):
2022        """
2023        Makes the datasets standalone, i.e. not having any source_dataset dataset
2024        """
2025        self.link_parent("")
2026
2027    def is_dataset(self):
2028        """
2029        Easy way to confirm this is a dataset.
2030        Used for checking processor and dataset compatibility,
2031        which needs to handle both processors and datasets.
2032        """
2033        return True
2034
2035    def is_top_dataset(self):
2036        """
2037        Easy way to confirm this is a top dataset.
2038        Used for checking processor and dataset compatibility,
2039        which needs to handle both processors and datasets.
2040        """
2041        if self.key_parent:
2042            return False
2043        return True
2044
2045    def is_expiring(self, config):
2046        """
2047        Determine if dataset is set to expire
2048
2049        Similar to `is_expired`, but checks if the dataset will be deleted in
2050        the future, not if it should be deleted right now.
2051
2052        :param ConfigManager config:  Configuration reader (context-aware)
2053        :return bool|int:  `False`, or the expiration date as a Unix timestamp.
2054        """
2055        # has someone opted out of deleting this?
2056        if self.parameters.get("keep"):
2057            return False
2058
2059        # is this dataset explicitly marked as expiring after a certain time?
2060        if self.parameters.get("expires-after"):
2061            return self.parameters.get("expires-after")
2062
2063        # is the data source configured to have its datasets expire?
2064        expiration = config.get("datasources.expiration", {})
2065        if not expiration.get(self.parameters.get("datasource")):
2066            return False
2067
2068        # is there a timeout for this data source?
2069        if expiration.get(self.parameters.get("datasource")).get("timeout"):
2070            return self.timestamp + expiration.get(
2071                self.parameters.get("datasource")
2072            ).get("timeout")
2073
2074        return False
2075
2076    def is_expired(self, config):
2077        """
2078        Determine if dataset should be deleted
2079
2080        Datasets can be set to expire, but when they should be deleted depends
2081        on a number of factors. This checks them all.
2082
2083        :param ConfigManager config:  Configuration reader (context-aware)
2084        :return bool:
2085        """
2086        # if the dataset is not set to expire, it cannot be expired
2087        if not self.is_expiring(config):
2088            return False
2089
2090        # is this dataset explicitly marked as expiring after a certain time?
2091        future = (
2092                time.time() + 3600
2093        )  # ensure we don't delete datasets with invalid expiration times
2094        if (
2095                self.parameters.get("expires-after")
2096                and convert_to_int(self.parameters["expires-after"], future) < time.time()
2097        ):
2098            return True
2099
2100        # is the data source configured to have its datasets expire?
2101        expiration = config.get("datasources.expiration", {})
2102        if not expiration.get(self.parameters.get("datasource")):
2103            return False
2104
2105        # is the dataset older than the set timeout?
2106        if expiration.get(self.parameters.get("datasource")).get("timeout"):
2107            return (
2108                    self.timestamp
2109                    + expiration[self.parameters.get("datasource")]["timeout"]
2110                    < time.time()
2111            )
2112
2113        return False
2114
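# Illustrative sketch (not part of the module source): how expiration is
# checked. `config` is assumed to be a ConfigManager; following the lookups
# above, its "datasources.expiration" value looks roughly like
#   {"example-datasource": {"timeout": 7 * 86400}}   # delete after a week
expires_at = dataset.is_expiring(config)   # False, or a Unix timestamp
if expires_at and dataset.is_expired(config):
    print("This dataset is past its expiration date and may be deleted")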
2115    def is_from_collector(self):
2116        """
2117        Check if this dataset was made by a processor that collects data, i.e.
2118        a search or import worker.
2119
2120        :return bool:
2121        """
2122        return self.type.endswith("-search") or self.type.endswith("-import")
2123
2124    def get_extension(self):
2125        """
2126        Gets the file extension this dataset produces.
2127        Also checks whether the results file exists.
2128        Used for checking processor and dataset compatibility.
2129
2130        :return str|bool extension:  Extension, e.g. `csv`, or False if there is no results file
2131        """
2132        if self.get_results_path().exists():
2133            return self.get_results_path().suffix[1:]
2134
2135        return False
2136    
2137    def is_filter(self):
2138        """
2139        Check whether a dataset is a filter dataset.
2140
2141        :return bool:  True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown).
2142        """
2143        own_processor = self.get_own_processor()
2144        if own_processor is None:
2145            # Deprecated datasets do not have a processor
2146            return None
2147        return own_processor.is_filter()
2148
2149    def get_media_type(self):
2150        """
2151        Gets the media type of the dataset file.
2152
2153        :return str: media type, e.g., "text"
2154        """
2155        own_processor = self.get_own_processor()
2156        if hasattr(self, "media_type"):
2157            # media type can be defined explicitly in the dataset; this is the priority
2158            return self.media_type
2159        elif own_processor is not None:
2160            # or media type can be defined in the processor
2161            # some processors can set different media types for different datasets (e.g., import_media)
2162            if hasattr(own_processor, "media_type"):
2163                return own_processor.media_type
2164
2165        # Default to text
2166        return self.parameters.get("media_type", "text")
2167
2168    def get_metadata(self):
2169        """
2170        Get dataset metadata
2171
2172        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
2173        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
2174        update its database (and ensure compatibility with the exporting version of 4CAT).
2175        """
2176        metadata = self.db.fetchone(
2177            "SELECT * FROM datasets WHERE key = %s", (self.key,)
2178        )
2179
2180        # get 4CAT version (presumably to ensure export is compatible with import)
2181        metadata["current_4CAT_version"] = get_software_version()
2182        return metadata
2183
2184    def get_result_url(self):
2185        """
2186        Gets the 4CAT frontend URL of a dataset file.
2187
2188        Uses the Flask configuration values (i.e., flask.server_name and
2189        flask.https) plus the hardcoded '/result/' path.
2190        TODO: create more dynamic method of obtaining url.
2191        """
2192        filename = self.get_results_path().name
2193
2194        # we cheat a little here by using the modules' config reader, but these
2195        # will never be context-dependent values anyway
2196        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
2197                      self.modules.config.get("flask.server_name") + '/result/' + filename
2198        return url_to_file
2199
2200    def warn_unmappable_item(
2201            self, item_count, processor=None, error_message=None, warn_admins=True
2202    ):
2203        """
2204        Log an item that is unable to be mapped and warn administrators.
2205
2206        :param int item_count:  Item index
2207        :param Processor processor:  Processor calling this function
2208        """
2209        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
2210
2211        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
2212        closest_dataset = (
2213            processor.dataset
2214            if processor is not None and processor.dataset is not None
2215            else self
2216        )
2217        # Log error to dataset log
2218        closest_dataset.log(dataset_error_message)
2219
2220        if warn_admins:
2221            if processor is not None:
2222                processor.log.warning(
2223                    f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}."
2224                )
2225            elif hasattr(self.db, "log"):
2226                # borrow the database's log handler
2227                self.db.log.warning(
2228                    f"Unable to map item all items for dataset {closest_dataset.key}."
2229                )
2230            else:
2231                # No other log available
2232                raise DataSetException(
2233                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
2234                )
2235
2236    # Annotation functions (most of it is handled in Annotations)
2237    def has_annotations(self) -> bool:
2238        """
2239        Whether this dataset has annotations
2240        """
2241
2242        annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,))
2243
2244        return True if annotation else False
2245
2246    def num_annotations(self) -> int:
2247        """
2248        Get the amount of annotations
2249        """
2250        return self.db.fetchone(
2251            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
2252        )["count"]
2253
2254    def get_annotation(self, data: dict):
2255        """
2256        Retrieves a specific annotation if it exists.
2257
2258        :param data:  A dictionary identifying the annotation to retrieve.
2259                      Include either an `id` field, or both `field_id` and
2260                      `item_id` fields.
2261
2262        :return Annotation:  Annotation object, or None if the identifying fields are missing.
2263        """
2264
2265        if "id" not in data or ("field_id" not in data and "item_id" not in data):
2266            return None
2267
2268        if "dataset" not in data:
2269            data["dataset"] = self.key
2270
2271        return Annotation(data=data, db=self.db)
2272
2273    def get_annotations(self) -> list:
2274        """
2275        Retrieves all annotations for this dataset.
2276
2277        return list: 	List of Annotation objects.
2278        """
2279
2280        return Annotation.get_annotations_for_dataset(self.db, self.key)
2281
2282    def get_annotations_for_item(self, item_id: str | list, before=0) -> list:
2283        """
2284        Retrieves all annotations from this dataset for a specific item (e.g. social media post).
2285        :param str item_id:  The ID of the annotation item
2286        :param int before:   The upper timestamp range for annotations.
2287        """
2288        return Annotation.get_annotations_for_dataset(
2289            self.db, self.key, item_id=item_id, before=before
2290        )
2291
2292    def has_annotation_fields(self) -> bool:
2293        """
2294        Returns True if there are annotation fields saved to the datasets table
2295        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
2296        """
2297
2298        return True if self.annotation_fields else False
2299
2300    def get_annotation_field_labels(self) -> list:
2301        """
2302        Retrieves the saved annotation field labels for this dataset.
2303        These are stored in the annotation_fields column of the datasets table.
2304
2305        :return list: List of annotation field labels.
2306        """
2307
2308        annotation_fields = self.annotation_fields
2309
2310        if not annotation_fields:
2311            return []
2312
2313        labels = [v["label"] for v in annotation_fields.values()]
2314
2315        return labels
2316
2317    def save_annotations(self, annotations: list) -> int:
2318        """
2319        Takes a list of annotations and saves them to the annotations table.
2320        If a field is not yet present in the `annotation_fields` column in
2321        the datasets table, it also adds it there.
2322
2323        :param list annotations:		List of dictionaries with annotation items. Must have `item_id`, `field_id`,
2324                                                                        and `label`.
2325                                                                        `item_id` is for the specific item being annotated (e.g. a social media post)
2326                                                                        `field_id` refers to the annotation field.
2327                                                                        `label` is a human-readable description of this annotation.
2328                                                                        E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd",
2329                                                                         "value": "Yes"}]
2330
2331        :returns int:					How many annotations were saved.
2332
2333        """
2334
2335        if not annotations:
2336            return 0
2337
2338        count = 0
2339        annotation_fields = self.annotation_fields
2340
2341        # Add some dataset data to annotations, if not present
2342        for annotation_data in annotations:
2343            # Check if the required fields are present
2344            if not annotation_data.get("item_id"):
2345                raise AnnotationException(
2346                    "Can't save annotations; annotation must have an `item_id` referencing "
2347                    "the item it annotated, got %s" % annotation_data
2348                )
2349            if not annotation_data.get("field_id"):
2350                raise AnnotationException(
2351                    "Can't save annotations; annotation must have a `field_id` field, "
2352                    "got %s" % annotation_data
2353                )
2354            if not annotation_data.get("label") or not isinstance(
2355                    annotation_data["label"], str
2356            ):
2357                raise AnnotationException(
2358                    "Can't save annotations; annotation must have a `label` field, "
2359                    "got %s" % annotation_data
2360                )
2361
2362            # Set dataset key
2363            if not annotation_data.get("dataset"):
2364                annotation_data["dataset"] = self.key
2365
2366            # Set default author to this dataset owner
2367            # If this annotation is made by a processor, it will have the processor name
2368            if not annotation_data.get("author"):
2369                annotation_data["author"] = self.get_owners()[0]
2370
2371            # Create Annotation object, which also saves it to the database
2372            # If this dataset/item_id/field_id combination already exists, this retrieves the
2373            # existing data and updates it with new values.
2374            Annotation(data=annotation_data, db=self.db)
2375            count += 1
2376
2377        # Save annotation fields if things changed
2378        if annotation_fields != self.annotation_fields:
2379            self.save_annotation_fields(annotation_fields)
2380
2381        return count
2382
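# Illustrative sketch (not part of the module source): saving annotations. The
# item IDs, field IDs and values are hypothetical; each dict needs at least
# `item_id`, `field_id` and `label`, as enforced above.
saved = dataset.save_annotations([
    {"item_id": "12345", "field_id": "123asd", "label": "Valid", "value": "Yes"},
    {"item_id": "67890", "field_id": "123asd", "label": "Valid", "value": "No"},
])
print(f"Saved {saved} annotations")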
2383    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2384        """
2385        Save annotation field data to the datasets table (in the `annotation_fields` column).
2386        If changes to the annotation fields affect existing annotations,
2387        this function will also call `update_annotations_via_fields()` to change them.
2388
2389        :param dict new_fields:  		New annotation fields, with a field ID as key.
2390
2391        :param bool add:				Whether we're merely adding new fields
2392                                                                        or replacing the whole batch. If add is False,
2393                                                                        `new_fields` should contain all fields.
2394
2395        :return int:					The number of annotation fields saved.
2396
2397        """
2398
2399        # Get existing annotation fields to see if stuff changed.
2400        old_fields = self.annotation_fields
2401        changes = False
2402
2403        # Annotation field must be valid JSON.
2404        try:
2405            json.dumps(new_fields)
2406        except (TypeError, ValueError):
2407            raise AnnotationException(
2408                "Can't save annotation fields: not valid JSON (%s)" % new_fields
2409            )
2410
2411        # No duplicate IDs
2412        if len(new_fields) != len(set(new_fields)):
2413            raise AnnotationException(
2414                "Can't save annotation fields: field IDs must be unique"
2415            )
2416
2417        # Annotation fields must at minimum have `type` and `label` keys.
2418        for field_id, annotation_field in new_fields.items():
2419            if not isinstance(field_id, str):
2420                raise AnnotationException(
2421                    "Can't save annotation fields: field ID %s is not a valid string"
2422                    % field_id
2423                )
2424            if "label" not in annotation_field:
2425                raise AnnotationException(
2426                    "Can't save annotation fields: all fields must have a label"
2427                    % field_id
2428                )
2429            if "type" not in annotation_field:
2430                raise AnnotationException(
2431                    "Can't save annotation fields: all fields must have a type"
2432                    % field_id
2433                )
2434
2435        # Check if fields are removed
2436        if not add and old_fields:
2437            for field_id in old_fields.keys():
2438                if field_id not in new_fields:
2439                    changes = True
2440
2441        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
2442        # dataset
2443        for field_id in new_fields.keys():
2444            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
2445                old_fields[field_id]["label"] = new_fields[field_id][
2446                    "label"
2447                ]  # Only labels could've been changed
2448                new_fields[field_id] = old_fields[field_id]
2449
2450        # If we're just adding fields, add them to the old fields.
2451        # If the field already exists, overwrite the old field.
2452        if add and old_fields:
2453            all_fields = old_fields
2454            for field_id, annotation_field in new_fields.items():
2455                all_fields[field_id] = annotation_field
2456            new_fields = all_fields
2457
2458        # We're saving the new annotation fields as-is.
2459        # Ordering of fields is preserved this way.
2460        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
2461        self.annotation_fields = new_fields
2462
2463        # If anything changed with the annotation fields, possibly update
2464        # existing annotations (e.g. to delete them or change their labels).
2465        if changes:
2466            Annotation.update_annotations_via_fields(
2467                self.key, old_fields, new_fields, self.db
2468            )
2469
2470        return len(new_fields)
2471
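# Illustrative sketch (not part of the module source): defining annotation
# fields. Each field is keyed by a (string) field ID and must have at least a
# `label` and a `type`, as validated above; the IDs and type names here are
# hypothetical.
dataset.save_annotation_fields({
    "123asd": {"label": "Valid", "type": "dropdown"},
    "456qwe": {"label": "Notes", "type": "text"},
}, add=True)  # add=True merges with existing fields instead of replacing them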
2472    def get_annotation_metadata(self) -> dict:
2473        """
2474        Retrieves all the data for this dataset from the annotations table.
2475        """
2476
2477        annotation_data = self.db.fetchall(
2478            "SELECT * FROM annotations WHERE dataset = '%s';" % self.key
2479        )
2480        return annotation_data
2481
2482    def __getattr__(self, attr):
2483        """
2484        Getter so we don't have to use .data all the time
2485
2486        :param attr:  Data key to get
2487        :return:  Value
2488        """
2489
2490        if attr in dir(self):
2491            # an explicitly defined attribute should always be called in favour
2492            # of this passthrough
2493            attribute = getattr(self, attr)
2494            return attribute
2495        elif attr in self.data:
2496            return self.data[attr]
2497        else:
2498            raise AttributeError("DataSet instance has no attribute %s" % attr)
2499
2500    def __setattr__(self, attr, value):
2501        """
2502        Setter so we can flexibly update the database
2503
2504        Also updates internal data stores (.data etc). If the attribute is
2505        unknown, it is stored within the 'parameters' attribute.
2506
2507        :param str attr:  Attribute to update
2508        :param value:  New value
2509        """
2510
2511        # don't override behaviour for *actual* class attributes
2512        if attr in dir(self):
2513            super().__setattr__(attr, value)
2514            return
2515
2516        if attr not in self.data:
2517            self.parameters[attr] = value
2518            attr = "parameters"
2519            value = self.parameters
2520
2521        if attr == "parameters":
2522            value = json.dumps(value)
2523
2524        self.db.update("datasets", where={"key": self.key}, data={attr: value})
2525
2526        self.data[attr] = value
2527
2528        if attr == "parameters":
2529            self.parameters = json.loads(value)
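# Illustrative sketch (not part of the module source): the __getattr__ and
# __setattr__ passthroughs above mean database columns and parameters can be
# read and written as plain attributes. `my_custom_flag` is a hypothetical name.
print(dataset.status)          # read from the dataset's database row (self.data)
dataset.my_custom_flag = True  # unknown attribute: stored in the 'parameters' JSON and persisted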
class DataSet(common.lib.fourcat_module.FourcatModule):
  25class DataSet(FourcatModule):
  26    """
  27    Provide interface to safely register and run operations on a dataset
  28
  29    A dataset is a collection of:
  30    - A unique identifier
  31    - A set of parameters that demarcate the data contained within
  32    - The data
  33
  34    The data is usually stored in a file on the disk; the parameters are stored
  35    in a database. The handling of the data, et cetera, is done by other
  36    workers; this class defines method to create and manipulate the dataset's
  37    properties.
  38    """
  39
  40    # Attributes must be created here to ensure getattr and setattr work properly
  41    data = None
  42    key = ""
  43
  44    _children = None
  45    available_processors = None
  46    _genealogy = None
  47    preset_parent = None
  48    parameters = None
  49    modules = None
  50    annotation_fields = None
  51
  52    owners = None
  53    tagged_owners = None
  54
  55    db = None
  56    folder = None
  57    is_new = True
  58
  59    no_status_updates = False
  60    disposable_files = None
  61    _queue_position = None
  62
  63    def __init__(
  64            self,
  65            parameters=None,
  66            key=None,
  67            job=None,
  68            data=None,
  69            db=None,
  70            parent="",
  71            extension=None,
  72            type=None,
  73            is_private=True,
  74            owner="anonymous",
  75            modules=None
  76    ):
  77        """
  78        Create new dataset object
  79
  80        If the dataset is not in the database yet, it is added.
  81
  82        :param dict parameters:  Only when creating a new dataset. Dataset
  83        parameters, free-form dictionary.
  84        :param str key: Dataset key. If given, dataset with this key is loaded.
  85        :param int job: Job ID. If given, dataset corresponding to job is
  86        loaded.
  87        :param dict data: Dataset data, corresponding to a row in the datasets
  88        database table. If not given, retrieved from database depending on key.
  89        :param db:  Database connection
  90        :param str parent:  Only when creating a new dataset. Parent dataset
  91        key to which the one being created is a child.
  92        :param str extension: Only when creating a new dataset. Extension of
  93        dataset result file.
  94        :param str type: Only when creating a new dataset. Type of the dataset,
  95        corresponding to the type property of a processor class.
  96        :param bool is_private: Only when creating a new dataset. Whether the
  97        dataset is private or public.
  98        :param str owner: Only when creating a new dataset. The user name of
  99        the dataset's creator.
 100        :param modules: Module cache. If not given, will be loaded when needed
 101        (expensive). Used to figure out what processors are compatible with
 102        this dataset.
 103        """
 104        self.db = db
 105
 106        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
 107        self.data = {}
 108        self.parameters = {}
 109        self.available_processors = {}
 110        self._genealogy = None
 111        self.disposable_files = []
 112        self.modules = modules
 113
 114        if key is not None:
 115            self.key = key
 116            current = self.db.fetchone(
 117                "SELECT * FROM datasets WHERE key = %s", (self.key,)
 118            )
 119            if not current:
 120                raise DataSetNotFoundException(
 121                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
 122                    % key
 123                )
 124
 125        elif job is not None:
 126            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
 127            if not current:
 128                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
 129
 130            self.key = current["key"]
 131        elif data is not None:
 132            current = data
 133            if (
 134                    "query" not in data
 135                    or "key" not in data
 136                    or "parameters" not in data
 137                    or "key_parent" not in data
 138            ):
 139                raise DataSetException(
 140                    "DataSet() requires a complete dataset record for its 'data' argument"
 141                )
 142
 143            self.key = current["key"]
 144        else:
 145            if parameters is None:
 146                raise DataSetException(
 147                    "DataSet() requires either 'key', or 'parameters' to be given"
 148                )
 149
 150            if not type:
 151                raise DataSetException("Datasets must have their type set explicitly")
 152
 153            query = self.get_label(parameters, default=type)
 154            self.key = self.get_key(query, parameters, parent)
 155            current = self.db.fetchone(
 156                "SELECT * FROM datasets WHERE key = %s AND query = %s",
 157                (self.key, query),
 158            )
 159
 160        if current:
 161            self.data = current
 162            self.parameters = json.loads(self.data["parameters"])
 163            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
 164                if self.data.get("annotation_fields") else {}
 165            self.is_new = False
 166        else:
 167            self.data = {"type": type}  # get_own_processor needs this
 168            own_processor = self.get_own_processor()
 169            version = get_software_commit(own_processor)
 170            self.data = {
 171                "key": self.key,
 172                "query": self.get_label(parameters, default=type),
 173                "parameters": json.dumps(parameters),
 174                "result_file": "",
 175                "creator": owner,
 176                "status": "",
 177                "type": type,
 178                "timestamp": int(time.time()),
 179                "is_finished": False,
 180                "is_private": is_private,
 181                "software_version": version[0],
 182                "software_source": version[1],
 183                "software_file": "",
 184                "num_rows": 0,
 185                "progress": 0.0,
 186                "key_parent": parent,
 187                "annotation_fields": "{}"
 188            }
 189            self.parameters = parameters
 190            self.annotation_fields = {}
 191
 192            self.db.insert("datasets", data=self.data)
 193            self.refresh_owners()
 194            self.add_owner(owner)
 195
 196            # Find desired extension from processor if not explicitly set
 197            if extension is None:
 198                if own_processor:
 199                    extension = own_processor.get_extension(
 200                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
 201                        if parent
 202                        else None
 203                    )
 204                # Still no extension, default to 'csv'
 205                if not extension:
 206                    extension = "csv"
 207
 208            # Reserve filename and update data['result_file']
 209            self.reserve_result_file(parameters, extension)
 210
 211        self.refresh_owners()
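
        # Usage sketch (illustrative comment, not part of the class): a DataSet is
        # either loaded by key or registered anew. `db` and `modules` are assumed
        # to be an existing database connection and module cache; "custom-search"
        # is a hypothetical processor type.
        #
        #   existing = DataSet(key="abcdef0123456789", db=db, modules=modules)
        #
        #   new_dataset = DataSet(
        #       parameters={"label": "Example import", "query": "cats"},
        #       type="custom-search",
        #       extension="csv",
        #       owner="alice",
        #       db=db,
        #       modules=modules,
        #   )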
 212
 213    def check_dataset_finished(self):
 214        """
 215        Checks if the dataset is finished. Returns the path to the results file
 216        if it is not empty, or 'empty' when there were no matches.
 217
 218        Only returns a path if the dataset is complete. In other words, if this
 219        method returns a path, a file with the complete results for this dataset
 220        will exist at that location.
 221
 222        :return: A path to the results file, 'empty', or `None`
 223        """
 224        if self.data["is_finished"] and self.data["num_rows"] > 0:
 225            return self.get_results_path()
 226        elif self.data["is_finished"] and self.data["num_rows"] == 0:
 227            return 'empty'
 228        else:
 229            return None
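
        # Usage sketch: the three possible outcomes of check_dataset_finished(),
        # assuming `dataset` is a DataSet instance.
        #
        #   result = dataset.check_dataset_finished()
        #   if result == "empty":
        #       print("Dataset finished, but contains no items")
        #   elif result is None:
        #       print("Dataset is not finished yet")
        #   else:
        #       print(f"Results available at {result}")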
 230
 231    def get_results_path(self):
 232        """
 233        Get path to results file
 234
 235        Always returns a path that will at some point contain the dataset
 236        data, but may not do so yet. Use this to get the location to write
 237        generated results to.
 238
 239        :return Path:  A path to the results file
 240        """
 241        # alas we need to instantiate a config reader here - no way around it
 242        if not self.folder:
 243            self.folder = self.modules.config.get('PATH_DATA')
 244        return self.folder.joinpath(self.data["result_file"])
 245
 246    def get_results_folder_path(self):
 247        """
 248        Get path to folder containing accompanying results
 249
 250        Returns a path that may not yet be created
 251
 252        :return Path:  A path to the results folder
 253        """
 254        return self.get_results_path().parent.joinpath("folder_" + self.key)
 255
 256    def get_log_path(self):
 257        """
 258        Get path to dataset log file
 259
 260        Each dataset has a single log file that documents its creation. This
 261        method returns the path to that file. It is identical to the path of
 262        the dataset result file, with 'log' as its extension instead.
 263
 264        :return Path:  A path to the log file
 265        """
 266        return self.get_results_path().with_suffix(".log")
 267
 268    def clear_log(self):
 269        """
 270        Clears the dataset log file
 271
 272        If the log file does not exist, it is created empty. The log file will
 273        have the same file name as the dataset result file, with the 'log'
 274        extension.
 275        """
 276        log_path = self.get_log_path()
 277        with log_path.open("w"):
 278            pass
 279
 280    def log(self, log):
 281        """
 282        Write log message to file
 283
 284        Writes the log message to the log file on a new line, including a
 285        timestamp at the start of the line. Note that this assumes the log file
 286        already exists - it should have been created/cleared with clear_log()
 287        prior to calling this.
 288
 289        :param str log:  Log message to write
 290        """
 291        log_path = self.get_log_path()
 292        with log_path.open("a", encoding="utf-8") as outfile:
 293            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))
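
        # Usage sketch: the log file is created (or emptied) once with clear_log()
        # and then appended to with log().
        #
        #   dataset.clear_log()
        #   dataset.log("Starting collection")
        #   dataset.log("Collected 1,000 items")
        #   print(dataset.get_log_path().read_text(encoding="utf-8"))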
 294
 295    def _iterate_items(self, processor=None, offset=0, *args, **kwargs):
 296        """
 297        A generator that iterates through a CSV or NDJSON file
 298
 299        This is an internal method and should not be called directly. Rather,
 300        call iterate_items() and use the generated dictionary and its properties.
 301
 302        If a reference to a processor is provided, with every iteration,
 303        the processor's 'interrupted' flag is checked, and if set a
 304        ProcessorInterruptedException is raised, which by default is caught
 305        in the worker and subsequently stops execution gracefully.
 306
 307        There are two file types that can be iterated (currently): CSV files
 308        and NDJSON (newline-delimited JSON) files. In the future, one could
 309        envision adding a pathway to retrieve items from e.g. a MongoDB
 310        collection directly instead of from a static file.
 311
 312        :param BasicProcessor processor:  A reference to the processor
 313        iterating the dataset.
 314        :param int offset:  How many items to skip.
 315        :return generator:  A generator that yields each item as a dictionary
 316        """
 317        path = self.get_results_path()
 318
 319        # Yield through items one by one
 320        if path.suffix.lower() == ".csv":
 321            with path.open("rb") as infile:
 322                wrapped_infile = NullAwareTextIOWrapper(infile, encoding="utf-8")
 323                reader = csv.DictReader(wrapped_infile)
 324
 325                if not self.get_own_processor():
 326                    # Processor was deprecated or removed; CSV file is likely readable but some legacy types are not
 327                    first_item = next(reader)
 328                    if first_item is None or any(
 329                            [True for key in first_item if type(key) is not str]
 330                    ):
 331                        raise NotImplementedError(
 332                            f"Cannot iterate through CSV file (deprecated processor {self.type})"
 333                        )
 334                    yield first_item
 335
 336                for i, item in enumerate(reader):
 337                    if hasattr(processor, "interrupted") and processor.interrupted:
 338                        raise ProcessorInterruptedException(
 339                            "Processor interrupted while iterating through CSV file"
 340                        )
 341
 342                    if i < offset:
 343                        continue
 344
 345                    yield item
 346
 347        elif path.suffix.lower() == ".ndjson":
 348            # In NDJSON format, each line in the file is a self-contained JSON object
 349            with path.open(encoding="utf-8") as infile:
 350                for i, line in enumerate(infile):
 351                    if hasattr(processor, "interrupted") and processor.interrupted:
 352                        raise ProcessorInterruptedException(
 353                            "Processor interrupted while iterating through NDJSON file"
 354                        )
 355
 356                    if i < offset:
 357                        continue
 358
 359                    yield json.loads(line)
 360
 361        else:
 362            raise NotImplementedError(f"Cannot iterate through {path.suffix} file")
 363
 364    def _iterate_archive_contents(
 365            self,
 366            staging_area=None,
 367            immediately_delete=True,
 368            filename_filter=None,
 369            processor=None,
 370            offset=0,
 371            *args, **kwargs
 372        ):
 373        """
 374        A generator that iterates through files in an archive
 375
 376        With every iteration, the processor's 'interrupted' flag is checked,
 377        and if set a ProcessorInterruptedException is raised, which by default
 378        is caught and subsequently stops execution gracefully.
 379
 380        Files are temporarily unzipped and deleted after use.
 381
 382        :param Path staging_area:  Where to store the files while they're
 383          being worked with. If omitted, a temporary folder is created and
 384          marked for deletion after all files have been yielded
 385        :param bool immediately_delete:  Temporary files are removed after
 386          yielding; False keeps files until the staging_area is removed
 387        :param list filename_filter:  Whitelist of filenames to iterate. If
 388          empty, do not filter
 389        :param BasicProcessor processor:  A reference to the processor
 390          iterating the dataset.
 391        :param int offset:  Skip this many files before yielding (warning: may
 392          skip a metadata file too!)
 393        :return:  An iterator with a dictionary for each file, containing an
 394          `id`, a `path`, and all attributes of the `ZipInfo` object as keys
 395        """
 396        path = self.get_results_path()
 397        if not path.exists():
 398            return
 399
 400        if not staging_area:
 401            staging_area = self.get_staging_area()
 402
 403        if not staging_area.exists() or not staging_area.is_dir():
 404            raise RuntimeError(f"Staging area {staging_area} is not a valid folder")
 405
 406        iterations = 0
 407        with zipfile.ZipFile(path, "r") as archive_file:
 408            # sorting is important because it ensures .metadata.json is read
 409            # first
 410            archive_contents = sorted(archive_file.infolist(), key=lambda x: x.filename)
 411            for archived_file in archive_contents:
 412
 413                if filename_filter and archived_file.filename not in filename_filter:
 414                    continue
 415
 416                if archived_file.is_dir():
 417                    # do not yield folders - we'll get to the files in them
 418                    continue
 419
 420                if iterations < offset:
 421                    iterations += 1
 422                    continue
 423
 424                if hasattr(processor, "interrupted") and processor.interrupted:
 425                    raise ProcessorInterruptedException(
 426                        "Processor interrupted while iterating through Zip archive"
 427                    )
 428
 429                iterations += 1
 430                temp_file = staging_area.joinpath(archived_file.filename)
 431                archive_file.extract(archived_file.filename, staging_area)
 432
 433                # iterated items are expected as a dictionary
 434                # we thus make a dictionary from the ZipInfo object
 435                # and use the path (inside the archive) as a unique ID
 436                yield {
 437                    "id": archived_file.filename,
 438                    "path": temp_file,
 439                    **{
 440                        attribute: getattr(archived_file, attribute) for attribute in dir(archived_file) if not attribute.startswith("_")
 441                    }
 442                }
 443
 444                if immediately_delete:
 445                    # this, effectively, triggers when the *next* item is
 446                    # asked for, or if it is the last file
 447                    temp_file.unlink()
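
        # Usage sketch: iterating the files of a zip result via iterate_items(),
        # which dispatches to _iterate_archive_contents() for zip datasets. The
        # staging_area/immediately_delete keyword arguments shown here are passed
        # through; `processor` is assumed to be the running processor.
        #
        #   staging = dataset.get_staging_area()
        #   for file_item in dataset.iterate_items(
        #           processor=processor, staging_area=staging, immediately_delete=False
        #   ):
        #       print(file_item["id"], file_item["path"])
        #   dataset.remove_disposable_files()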
 448
 449    def iterate_items(
 450            self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None,
 451            offset=0, *args, **kwargs
 452    ):
 453        """
 454        Generate mapped dataset items
 455
 456        Wrapper for _iterate_items that returns a DatasetItem, which can be
 457        accessed as a dict returning the original item or (if a mapper is
 458        available) the mapped item. Mapped or original versions of the item can
 459        also be accessed via the `original` and `mapped_object` properties of
 460        the DatasetItem.
 461
 462        Processors can define a method called `map_item` that is used to map an
 463        item from the dataset file before it is processed any further. This is
 464        slower than storing the data in the right format to begin with, but not
 465        all data sources allow for easy 'flat' mapping of items. For example,
 466        tweets are nested objects when retrieved from the Twitter API; they are
 467        easier to store as a JSON file than as a flat CSV file, and it would be
 468        a shame to throw away that data.
 469
 470        Note the two parameters warn_unmappable and map_missing. An item is
 471        'unmappable' when its structure is too different to coerce into the
 472        neat dictionary the data source expects; warn_unmappable determines
 473        what happens in that case. An item can also have the right structure,
 474        but with some fields missing or incomplete; map_missing determines
 475        what happens in that case. The latter is possible, for example, when
 476        importing data via Zeeschuimer, which produces unstably-structured
 477        data captured from social media sites. A usage sketch follows this
 478        method.
 479
 480        :param BasicProcessor processor:  A reference to the processor
 481        iterating the dataset.
 482        :param bool warn_unmappable:  If an item is not mappable, skip the item
 483        and log a warning
 484        :param max_unmappable:  Skip at most this many unmappable items; if
 485        more are encountered, stop iterating. `None` to never stop.
 486        :param map_missing: Indicates what to do with mapped items for which
 487        some fields could not be mapped. Defaults to 'default'. Must be one of:
 488        - 'default': fill missing fields with the default passed by map_item
 489        - 'abort': raise a MappedItemIncompleteException if a field is missing
 490        - a callback: replace missing field with the return value of the
 491          callback. The MappedItem object is passed to the callback as the
 492          first argument and the name of the missing field as the second.
 493        - a dictionary with a key for each possible missing field: replace missing
 494          field with a strategy for that field ('default', 'abort', or a callback)
 495        :param get_annotations: Whether to also fetch annotations from the database.
 496          This can be disabled to help speed up iteration.
 497        :param int offset:  How many items to skip before yielding.
 498        :param bool immediately_delete:  Only used when iterating a file
 499          archive. Defaults to `True`, if set to `False`, files are not deleted
 500          from the staging area after the iteration, so they can be re-used.
 501        :param staging_area:  Only used when iterating a file archive. Where to
 502          store the files while they're being worked with. If omitted, a
 503          temporary folder is created and marked for deletion after all files
 504          have been yielded.
 505        :param list filename_filter:  Only used when iterating a file archive.
 506          Whitelist of filenames to iterate, others are skipped. If empty, do
 507          not filter.
 508        :return generator:  A generator that yields DatasetItems
 509        """
 510        unmapped_items = 0
 511
 512        # Collect item_mapper for use with filter
 513        item_mapper = False
 514        own_processor = self.get_own_processor()
 515        if own_processor and own_processor.map_item_method_available(dataset=self):
 516            item_mapper = True
 517
 518        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
 519        # If we're getting annotations, we're caching items so we don't need to retrieve annotations one-by-one.
 520        get_annotations = True if self.annotation_fields and get_annotations else False
 521        if get_annotations:
 522            annotation_fields = self.annotation_fields.copy()
 523            item_batch_size = 500
 524            dataset_item_cache = []
 525            annotations_before = int(time.time())
 526
 527            # Append a number to annotation labels if there's duplicate ones
 528            annotation_labels = {}
 529            for (annotation_field_id, annotation_field_items,) in annotation_fields.items():
 530                unique_label = annotation_field_items["label"]
 531                counter = 1
 532                while unique_label in annotation_labels.values():
 533                    counter += 1
 534                    unique_label = f"{annotation_field_items['label']}_{counter}"
 535                annotation_labels[annotation_field_id] = unique_label
 536
 537        # missing field strategy can be for all fields at once, or per field
 538        # if it is per field, it is a dictionary with field names and their strategy
 539        # if it is for all fields, it may be a callback, 'abort', or 'default'
 540        default_strategy = "default"
 541        if type(map_missing) is not dict:
 542            default_strategy = map_missing
 543            map_missing = {}
 544
 545        iterator = self._iterate_items if self.get_extension() != "zip" else self._iterate_archive_contents
 546
 547        # Loop through items
 548        for i, item in enumerate(iterator(processor=processor, offset=offset, *args, **kwargs)):
 549            # Save original to yield
 550            original_item = item.copy()
 551
 552            # Map item
 553            if item_mapper:
 554                try:
 555                    mapped_item = own_processor.get_mapped_item(item)
 556                except MapItemException as e:
 557                    if warn_unmappable:
 558                        self.warn_unmappable_item(
 559                            i, processor, e, warn_admins=unmapped_items == 0
 560                        )
 561
 562                    unmapped_items += 1
 563                    if max_unmappable and unmapped_items > max_unmappable:
 564                        break
 565                    else:
 566                        continue
 567
 568                # check if fields have been marked as 'missing' in the
 569                # underlying data, and treat according to the chosen strategy
 570                if mapped_item.get_missing_fields():
 571                    for missing_field in mapped_item.get_missing_fields():
 572                        strategy = map_missing.get(missing_field, default_strategy)
 573
 574                        if callable(strategy):
 575                            # delegate handling to a callback
 576                            mapped_item.data[missing_field] = strategy(
 577                                mapped_item.data, missing_field
 578                            )
 579                        elif strategy == "abort":
 580                            # raise an exception to be handled at the processor level
 581                            raise MappedItemIncompleteException(
 582                                f"Cannot process item, field {missing_field} missing in source data."
 583                            )
 584                        elif strategy == "default":
 585                            # use whatever was passed to the object constructor
 586                            mapped_item.data[missing_field] = mapped_item.data[
 587                                missing_field
 588                            ].value
 589                        else:
 590                            raise ValueError(
 591                                "map_missing must be 'abort', 'default', or a callback."
 592                            )
 593            else:
 594                mapped_item = original_item
 595
 596            # yield a DatasetItem, which is a dict with some special properties
 597            dataset_item = DatasetItem(
 598                mapper=item_mapper,
 599                original=original_item,
 600                mapped_object=mapped_item,
 601                data_file=original_item["path"] if "path" in original_item and issubclass(type(original_item["path"]), os.PathLike) else None,
 602                **(
 603                    mapped_item.get_item_data()
 604                    if type(mapped_item) is MappedItem
 605                    else mapped_item
 606                ),
 607            )
 608
 609            # If we're getting annotations, yield in items batches so we don't need to get annotations per item.
 610            if get_annotations:
 611                dataset_item_cache.append(dataset_item)
 612
 613                # When we reach the batch limit or the end of the dataset,
 614                # get the annotations for cached items and yield the entire thing.
 615                if len(dataset_item_cache) >= item_batch_size or i == (self.num_rows - 1):
 616
 617                    item_ids = [dataset_item.get("id") for dataset_item in dataset_item_cache]
 618
 619                    # Dict with item ids for fast lookup
 620                    annotations_dict = collections.defaultdict(dict)
 621                    annotations = self.get_annotations_for_item(item_ids, before=annotations_before)
 622                    for item_annotation in annotations:
 623                        item_id = item_annotation.item_id
 624                        if item_annotation:
 625                            annotations_dict[item_id][item_annotation.field_id] = item_annotation.value
 626
 627                    # Process each dataset item
 628                    for dataset_item in dataset_item_cache:
 629                        item_id = dataset_item.get("id")
 630                        item_annotations = annotations_dict.get(item_id, {})
 631
 632                        for annotation_field_id, annotation_field_items in annotation_fields.items():
 633                            # Get annotation value
 634                            value = item_annotations.get(annotation_field_id, "")
 635
 636                            # Convert list to string if needed
 637                            if isinstance(value, list):
 638                                value = ",".join(value)
 639                            elif value != "":
 640                                value = str(value)  # Ensure string type
 641                            else:
 642                                value = ""
 643
 644                            dataset_item[annotation_labels[annotation_field_id]] = value
 645
 646                        yield dataset_item
 647
 648                    dataset_item_cache = []
 649
 650            else:
 651                yield dataset_item
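
        # Usage sketch for the map_missing parameter documented above. The field
        # names "author" and "body" and the callback are hypothetical; a real data
        # source defines its own fields.
        #
        #   def fill_missing(mapped_data, missing_field):
        #       # replace a missing field with a sentinel value
        #       return "(unknown)"
        #
        #   for item in dataset.iterate_items(
        #           processor=processor,
        #           warn_unmappable=True,
        #           map_missing={"author": fill_missing, "body": "default"},
        #   ):
        #       print(item["id"], item.get("author"))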
 652
 653
 654    def sort_and_iterate_items(
 655            self, sort="", reverse=False, chunk_size=50000, **kwargs
 656    ):
 657        """
 658        Loop through items in a dataset, sorted by a given key.
 659
 660        This is a wrapper function for `iterate_items()` with the
 661        added functionality of sorting a dataset.
 662
 663        :param sort:				The item key that determines the sort order.
 664        :param reverse:				Whether to sort by largest values first.
 665        :param chunk_size:          How many items to sort in memory at a time
 666        when sorting large datasets on disk. A usage sketch follows this method.
 667        :returns dict:              Yields each iterated item as a dictionary.
 668        """
 669
 670        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
 671            """
 672            Sort items based on the given key and order.
 673
 674            :param items_to_sort:  The items to sort
 675            :param sort_key:  The key to sort by
 676            :param reverse:  Whether to sort in reverse order
 677            :return:  Sorted items
 678            """
 679            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
 680                # Sort by dataset order
 681                yield from items_to_sort
 682            elif sort_key == "dataset-order" and reverse:
 683                # Sort by dataset order in reverse
 684                yield from reversed(list(items_to_sort))
 685            else:
 686                # Sort on the basis of a column value
 687                if not convert_sort_to_float:
 688                    yield from sorted(
 689                        items_to_sort,
 690                        key=lambda x: x.get(sort_key, ""),
 691                        reverse=reverse,
 692                    )
 693                else:
 694                    # Dataset fields can contain integers and empty strings.
 695                    # Since these cannot be compared, we will convert every
 696                    # empty string to 0.
 697                    yield from sorted(
 698                        items_to_sort,
 699                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
 700                        reverse=reverse,
 701                    )
 702
 703        if self.num_rows < chunk_size:
 704            try:
 705                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
 706                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
 707            except (TypeError, ValueError):
 708                yield from sort_items(
 709                    self.iterate_items(**kwargs),
 710                    sort,
 711                    reverse,
 712                    convert_sort_to_float=False
 713                )
 714
 715        else:
 716            # For large datasets, we will use chunk sorting
 717            staging_area = self.get_staging_area()
 718            buffer = []
 719            chunk_files = []
 720            convert_sort_to_float = True
 721            fieldnames = self.get_columns()
 722
 723            def write_chunk(buffer, chunk_index):
 724                """
 725                Write a chunk of data to a temporary file
 726
 727                :param buffer:  The buffer containing the chunk of data
 728                :param chunk_index:  The index of the chunk
 729                :return:  The path to the temporary file
 730                """
 731                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
 732                with temp_file.open("w", encoding="utf-8") as chunk_file:
 733                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
 734                    writer.writeheader()
 735                    writer.writerows(buffer)
 736                return temp_file
 737
 738            # Divide the dataset into sorted chunks
 739            for item in self.iterate_items(**kwargs):
 740                buffer.append(item)
 741                if len(buffer) >= chunk_size:
 742                    try:
 743                        buffer = list(
 744                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 745                        )
 746                    except (TypeError, ValueError):
 747                        convert_sort_to_float = False
 748                        buffer = list(
 749                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
 750                        )
 751
 752                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
 753                    buffer.clear()
 754
 755            # Sort and write any remaining items in the buffer
 756            if buffer:
 757                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
 758                chunk_files.append(write_chunk(buffer, len(chunk_files)))
 759                buffer.clear()
 760
 761            # Merge sorted chunks into the final sorted file
 762            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
 763            with sorted_file.open("w", encoding="utf-8") as outfile:
 764                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
 765                writer.writeheader()
 766
 767                # Open all chunk files for reading
 768                chunk_readers = [
 769                    csv.DictReader(chunk.open("r", encoding="utf-8"))
 770                    for chunk in chunk_files
 771                ]
 772                heap = []
 773
 774                # Initialize the heap with the first row from each chunk
 775                for i, reader in enumerate(chunk_readers):
 776                    try:
 777                        row = next(reader)
 778                        if sort == "dataset-order" and reverse:
 779                            # Use a reverse index for "dataset-order" and reverse=True
 780                            sort_key = -i
 781                        elif convert_sort_to_float:
 782                            # Negate numeric keys for reverse sorting
 783                            sort_key = (
 784                                -convert_to_float(row.get(sort, ""))
 785                                if reverse
 786                                else convert_to_float(row.get(sort, ""))
 787                            )
 788                        else:
 789                            if reverse:
 790                                # For reverse string sorting, invert string comparison by creating a tuple
 791                                # with an inverted string - this makes Python's tuple comparison work in reverse
 792                                sort_key = (
 793                                    tuple(-ord(c) for c in row.get(sort, "")),
 794                                    -i,
 795                                )
 796                            else:
 797                                sort_key = (row.get(sort, ""), i)
 798                        heap.append((sort_key, i, row))
 799                    except StopIteration:
 800                        pass
 801
 802                # Use a heap to merge sorted chunks
 803                import heapq
 804
 805                heapq.heapify(heap)
 806                while heap:
 807                    _, chunk_index, smallest_row = heapq.heappop(heap)
 808                    writer.writerow(smallest_row)
 809                    try:
 810                        next_row = next(chunk_readers[chunk_index])
 811                        if sort == "dataset-order" and reverse:
 812                            # Use a reverse index for "dataset-order" and reverse=True
 813                            sort_key = -chunk_index
 814                        elif convert_sort_to_float:
 815                            sort_key = (
 816                                -convert_to_float(next_row.get(sort, ""))
 817                                if reverse
 818                                else convert_to_float(next_row.get(sort, ""))
 819                            )
 820                        else:
 821                            # Use the same inverted comparison for string values
 822                            if reverse:
 823                                sort_key = (
 824                                    tuple(-ord(c) for c in next_row.get(sort, "")),
 825                                    -chunk_index,
 826                                )
 827                            else:
 828                                sort_key = (next_row.get(sort, ""), chunk_index)
 829                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
 830                    except StopIteration:
 831                        pass
 832
 833            # Read the sorted file and yield each item
 834            with sorted_file.open("r", encoding="utf-8") as infile:
 835                reader = csv.DictReader(infile)
 836                for item in reader:
 837                    yield item
 838
 839            # Remove the temporary files
 840            if staging_area.is_dir():
 841                shutil.rmtree(staging_area)
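
        # Usage sketch: sorted iteration over a (hypothetical) numeric "score"
        # column. Values that cannot be interpreted as numbers make the sort fall
        # back to alphabetical ordering, as implemented above.
        #
        #   for item in dataset.sort_and_iterate_items(sort="score", reverse=True):
        #       print(item["id"], item["score"])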
 842
 843    def get_staging_area(self):
 844        """
 845        Get path to a temporary folder in which files can be stored before
 846        finishing
 847
 848        A new, uniquely named folder is created and returned. The folder may
 849        be used as a staging area for the dataset data while it is being
 850        processed, and is registered for removal via remove_disposable_files().
 851
 852        :return Path:  Path to folder
 853        """
 854        results_file = self.get_results_path()
 855
 856        results_dir_base = results_file.parent
 857        results_dir = results_file.name.replace(".", "") + "-staging"
 858        results_path = results_dir_base.joinpath(results_dir)
 859        index = 1
 860        while results_path.exists():
 861            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
 862            index += 1
 863
 864        # create temporary folder
 865        results_path.mkdir()
 866
 867        # Storing the staging area with the dataset so that it can be removed later
 868        self.disposable_files.append(results_path)
 869
 870        return results_path
 871
 872    def remove_disposable_files(self):
 873        """
 874        Remove any disposable files and folders, such as staging areas
 875
 876        Called from BasicProcessor after processing a dataset finishes.
 877        """
 878        # Remove DataSet staging areas
 879        if self.disposable_files:
 880            for disposable_file in self.disposable_files:
 881                if disposable_file.exists():
 882                    shutil.rmtree(disposable_file)
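
        # Usage sketch: a staging area is a throwaway working folder registered
        # with the dataset; remove_disposable_files() deletes everything registered
        # so far (it is also called by BasicProcessor after processing).
        #
        #   staging = dataset.get_staging_area()
        #   staging.joinpath("notes.txt").write_text("intermediate output", encoding="utf-8")
        #   dataset.remove_disposable_files()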
 883
 884    def finish(self, num_rows=0):
 885        """
 886        Declare the dataset finished and record `num_rows` as its item count
 887        """
 888        if self.data["is_finished"]:
 889            raise RuntimeError("Cannot finish a finished dataset again")
 890
 891        self.db.update(
 892            "datasets",
 893            where={"key": self.data["key"]},
 894            data={
 895                "is_finished": True,
 896                "num_rows": num_rows,
 897                "progress": 1.0,
 898                "timestamp_finished": int(time.time()),
 899            },
 900        )
 901        self.data["is_finished"] = True
 902        self.data["num_rows"] = num_rows
 903
 904    def copy(self, shallow=True):
 905        """
 906        Copies the dataset, making a new version with a unique key
 907
 908
 909        :param bool shallow:  Shallow copy: does not copy the result file, but
 910        instead refers to the same file as the original dataset did
 911        :return DataSet:  Copied dataset
 912        """
 913        parameters = self.parameters.copy()
 914
 915        # a key is partially based on the parameters. so by setting these extra
 916        # attributes, we also ensure a unique key will be generated for the
 917        # copy
 918        # possibly todo: don't use time for uniqueness (but one shouldn't be
 919        # copying a dataset multiple times per microsecond, that's not what
 920        # this is for)
 921        parameters["copied_from"] = self.key
 922        parameters["copied_at"] = time.time()
 923
 924        copy = DataSet(
 925            parameters=parameters,
 926            db=self.db,
 927            extension=self.result_file.split(".")[-1],
 928            type=self.type,
 929            modules=self.modules
 930        )
 931
 932        for field in self.data:
 933            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
 934                continue
 935            copy.__setattr__(field, self.data[field])
 936
 937        if shallow:
 938            # use the same result file
 939            copy.result_file = self.result_file
 940        else:
 941            # copy to new file with new key
 942            shutil.copy(self.get_results_path(), copy.get_results_path())
 943
 944        if self.is_finished():
 945            copy.finish(self.num_rows)
 946
 947        # make sure ownership is also copied
 948        copy.copy_ownership_from(self)
 949
 950        return copy
 951
 952    def delete(self, commit=True, queue=None):
 953        """
 954        Delete the dataset, and all its children
 955
 956        Deletes both database records and result files. Note that manipulating
 957        a dataset object after it has been deleted is undefined behaviour.
 958
 959        :param bool commit:  Commit SQL DELETE query?
 960        """
 961        # first, recursively delete children
 962        children = self.db.fetchall(
 963            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
 964        )
 965        for child in children:
 966            try:
 967                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 968                child.delete(commit=commit)
 969            except DataSetException:
 970                # dataset already deleted - race condition?
 971                pass
 972
 973        # delete any queued jobs for this dataset
 974        try:
 975            job = Job.get_by_remote_ID(self.key, self.db, self.type)
 976            if job.is_claimed:
 977                # tell API to stop any jobs running for this dataset
 978                # level 2 = cancel job
 979                # we're not interested in the result - if the API is available,
 980                # it will do its thing, if it's not the backend is probably not
 981                # running so the job also doesn't need to be interrupted
 982                call_api(
 983                    "cancel-job",
 984                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
 985                    False,
 986                )
 987
 988            # this deletes the job from the database
 989            job.finish(True)
 990
 991        except JobNotFoundException:
 992            pass
 993
 994        # delete this dataset's own annotations
 995        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
 996        # delete annotations that have been generated as part of this dataset
 997        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
 998        # delete annotation fields on parent dataset(s) stemming from this dataset
 999        for related_dataset in self.get_genealogy(update_cache=True):
1000            field_deleted = False
1001            annotation_fields = related_dataset.annotation_fields
1002            if annotation_fields:
1003                for field_id in list(annotation_fields.keys()):
1004                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
1005                        del annotation_fields[field_id]
1006                        field_deleted = True
1007            if field_deleted:
1008                related_dataset.save_annotation_fields(annotation_fields)
1009
1010        # delete dataset from database
1011        self.db.delete("datasets", where={"key": self.key}, commit=commit)
1012        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
1013        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
1014
1015        # delete from drive
1016        try:
1017            if self.get_results_path().exists():
1018                self.get_results_path().unlink()
1019            if self.get_results_path().with_suffix(".log").exists():
1020                self.get_results_path().with_suffix(".log").unlink()
1021            if self.get_results_folder_path().exists():
1022                shutil.rmtree(self.get_results_folder_path())
1023
1024        except FileNotFoundError:
1025            # already deleted, apparently
1026            pass
1027        except PermissionError as e:
1028            self.db.log.error(
1029                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
1030            )
1031
1032    def update_children(self, **kwargs):
1033        """
1034        Update an attribute for all child datasets
1035
1036        Can be used to e.g. change the owner, version, finished status for all
1037        datasets in a tree
1038
1039        :param kwargs:  Parameters corresponding to known dataset attributes
1040        """
1041        for child in self.get_children(update=True):
1042            for attr, value in kwargs.items():
1043                child.__setattr__(attr, value)
1044
1045            child.update_children(**kwargs)
1046
1047    def is_finished(self):
1048        """
1049        Check if dataset is finished
1050        :return bool:
1051        """
1052        return bool(self.data["is_finished"])
1053
1054    def is_rankable(self, multiple_items=True):
1055        """
1056        Determine if a dataset is rankable
1057
1058        Rankable means that it is a CSV file with 'date' and 'value' columns
1059        as well as one or more item label columns
1060
1061        :param bool multiple_items:  Consider datasets with multiple label
1062        columns per item (e.g. word_1, word_2, etc.)?
1063
1064        :return bool:  Whether the dataset is rankable or not
1065        """
1066        if (
1067                self.get_results_path().suffix != ".csv"
1068                or not self.get_results_path().exists()
1069        ):
1070            return False
1071
1072        column_options = {"date", "value", "item"}
1073        if multiple_items:
1074            column_options.add("word_1")
1075
1076        with self.get_results_path().open(encoding="utf-8") as infile:
1077            reader = csv.DictReader(infile)
1078            try:
1079                return len(set(reader.fieldnames) & column_options) >= 3
1080            except (TypeError, ValueError):
1081                return False
1082
1083    def is_accessible_by(self, username, role="owner"):
1084        """
1085        Check if the dataset can be accessed by the given user with the given role
1086
1087        :param str|User username: Username to check for
1088        :return bool:
1089        """
1090        if type(username) is not str:
1091            if hasattr(username, "get_id"):
1092                username = username.get_id()
1093            else:
1094                raise TypeError("User must be a str or User object")
1095
1096        # 'normal' owners
1097        if username in [
1098            owner
1099            for owner, meta in self.owners.items()
1100            if (role is None or meta["role"] == role)
1101        ]:
1102            return True
1103
1104        # users that are owners by virtue of having a tag that is an owner
1105        if username in itertools.chain(
1106                *[
1107                    tagged_owners
1108                    for tag, tagged_owners in self.tagged_owners.items()
1109                    if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
1110                ]
1111        ):
1112            return True
1113
1114        return False
1115
1116    def get_owners_users(self, role="owner"):
1117        """
1118        Get list of dataset owners
1119
1120        This returns a list of *users* that are considered owners. Tags are
1121        transparently replaced with the users with that tag.
1122
1123        :param str|None role:  Role to check for. If `None`, all owners are
1124        returned regardless of role.
1125
1126        :return set:  De-duplicated owner list
1127        """
1128        # 'normal' owners
1129        owners = [
1130            owner
1131            for owner, meta in self.owners.items()
1132            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
1133        ]
1134
1135        # users that are owners by virtue of having a tag that is an owner
1136        owners.extend(
1137            itertools.chain(
1138                *[
1139                    tagged_owners
1140                    for tag, tagged_owners in self.tagged_owners.items()
1141                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
1142                ]
1143            )
1144        )
1145
1146        # de-duplicate before returning
1147        return set(owners)
1148
1149    def get_owners(self, role="owner"):
1150        """
1151        Get list of dataset owners
1152
1153        This returns a list of all owners, and does not transparently resolve
1154        tags (like `get_owners_users` does).
1155
1156        :param str|None role:  Role to check for. If `None`, all owners are
1157        returned regardless of role.
1158
1159        :return set:  De-duplicated owner list
1160        """
1161        return [
1162            owner
1163            for owner, meta in self.owners.items()
1164            if (role is None or meta["role"] == role)
1165        ]
1166
1167    def add_owner(self, username, role="owner"):
1168        """
1169        Set dataset owner
1170
1171        If the user is already an owner, but with a different role, the role is
1172        updated. If the user is already an owner with the same role, nothing happens.
1173
1174        :param str|User username:  Username to set as owner
1175        :param str|None role:  Role to add user with.
1176        """
1177        if type(username) is not str:
1178            if hasattr(username, "get_id"):
1179                username = username.get_id()
1180            else:
1181                raise TypeError("User must be a str or User object")
1182
1183        if username not in self.owners:
1184            self.owners[username] = {"name": username, "key": self.key, "role": role}
1185            self.db.insert("datasets_owners", data=self.owners[username], safe=True)
1186
1187        elif username in self.owners and self.owners[username]["role"] != role:
1188            self.db.update(
1189                "datasets_owners",
1190                data={"role": role},
1191                where={"name": username, "key": self.key},
1192            )
1193            self.owners[username]["role"] = role
1194
1195        if username.startswith("tag:"):
1196            # this is a bit more complicated than just adding to the list of
1197            # owners, so do a full refresh
1198            self.refresh_owners()
1199
1200        # make sure children's owners remain in sync
1201        for child in self.get_children(update=True):
1202            child.add_owner(username, role)
1203            # not recursive, since we're calling it from recursive code!
1204            child.copy_ownership_from(self, recursive=False)
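
        # Usage sketch: ownership can be granted to individual users or to tags;
        # a "tag:" owner transparently covers every user carrying that tag. The
        # usernames and tag below are hypothetical.
        #
        #   dataset.add_owner("alice")                    # role "owner" by default
        #   dataset.add_owner("tag:students", role="viewer")
        #   dataset.is_accessible_by("alice")             # True
        #   dataset.get_owners_users(role=None)           # all users, tags resolved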
1205
1206    def remove_owner(self, username):
1207        """
1208        Remove dataset owner
1209
1210        If no owner is set, the dataset is assigned to the anonymous user.
1211        If the user is not an owner, nothing happens.
1212
1213        :param str|User username:  Username to remove as owner
1214        """
1215        if type(username) is not str:
1216            if hasattr(username, "get_id"):
1217                username = username.get_id()
1218            else:
1219                raise TypeError("User must be a str or User object")
1220
1221        if username in self.owners:
1222            del self.owners[username]
1223            self.db.delete("datasets_owners", where={"name": username, "key": self.key})
1224
1225            if not self.owners:
1226                self.add_owner("anonymous")
1227
1228        if username in self.tagged_owners:
1229            del self.tagged_owners[username]
1230
1231        # make sure children's owners remain in sync
1232        for child in self.get_children(update=True):
1233            child.remove_owner(username)
1234            # not recursive, since we're calling it from recursive code!
1235            child.copy_ownership_from(self, recursive=False)
1236
1237    def refresh_owners(self):
1238        """
1239        Update internal owner cache
1240
1241        This makes sure that the list of *users* and *tags* which can access the
1242        dataset is up to date.
1243        """
1244        self.owners = {
1245            owner["name"]: owner
1246            for owner in self.db.fetchall(
1247                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
1248            )
1249        }
1250
1251        # determine which users (if any) are owners of the dataset by having a
1252        # tag that is listed as an owner
1253        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
1254        if owner_tags:
1255            tagged_owners = self.db.fetchall(
1256                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
1257            )
1258            self.tagged_owners = {
1259                owner_tag: [
1260                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
1261                ]
1262                for owner_tag in owner_tags
1263            }
1264        else:
1265            self.tagged_owners = {}
1266
1267    def copy_ownership_from(self, dataset, recursive=True):
1268        """
1269        Copy ownership
1270
1271        This is useful to e.g. make sure a dataset's ownership stays in sync
1272        with its parent
1273
1274        :param DataSet dataset:  Dataset to copy ownership from
1275        :param bool recursive:  Also copy ownership to this dataset's children
1276        """
1277        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
1278
1279        for role in ("owner", "viewer"):
1280            owners = dataset.get_owners(role=role)
1281            for owner in owners:
1282                self.db.insert(
1283                    "datasets_owners",
1284                    data={"key": self.key, "name": owner, "role": role},
1285                    commit=False,
1286                    safe=True,
1287                )
1288
1289        self.db.commit()
1290        if recursive:
1291            for child in self.get_children(update=True):
1292                child.copy_ownership_from(self, recursive=recursive)
1293
1294    def get_parameters(self):
1295        """
1296        Get dataset parameters
1297
1298        The dataset parameters are stored as JSON in the database - parse them
1299        and return the resulting object
1300
1301        :return:  Dataset parameters as originally stored
1302        """
1303        try:
1304            return json.loads(self.data["parameters"])
1305        except json.JSONDecodeError:
1306            return {}
1307
1308    def get_columns(self):
1309        """
1310        Returns the dataset columns.
1311
1312        Useful for processor input forms. Can deal with both CSV and NDJSON
1313        files, the latter only if a `map_item` function is available in the
1314        processor that generated it. While in other cases one could use the
1315        keys of the JSON object, this is not always possible in follow-up code
1316        that uses the 'column' names, so for consistency this function acts as
1317        if no column can be parsed if no `map_item` function exists.
1318
1319        :return list:  List of dataset columns; empty list if unable to parse
1320        """
1321        if not self.get_results_path().exists():
1322            # no file to get columns from
1323            return []
1324
1325        if (self.get_results_path().suffix.lower() == ".csv") or (
1326                self.get_results_path().suffix.lower() == ".ndjson"
1327                and self.get_own_processor() is not None
1328                and self.get_own_processor().map_item_method_available(dataset=self)
1329        ):
1330            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
1331            try:
1332                keys = list(next(items).keys())
1333                if self.annotation_fields:
1334                    for annotation_field in self.annotation_fields.values():
1335                        annotation_column = annotation_field["label"]
1336                        label_count = 1
1337                        while annotation_column in keys:
1338                            label_count += 1
1339                            annotation_column = (
1340                                f"{annotation_field['label']}_{label_count}"
1341                            )
1342                        keys.append(annotation_column)
1343                columns = keys
1344            except (StopIteration, NotImplementedError):
1345                # No items or otherwise unable to iterate
1346                columns = []
1347            finally:
1348                del items
1349        else:
1350            # Filetype not CSV or an NDJSON with `map_item`
1351            columns = []
1352
1353        return columns
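
        # Usage sketch: columns are only known for CSV results, or for NDJSON
        # results whose own processor provides a map_item method; annotation
        # fields are appended as extra columns. "body" is a hypothetical column.
        #
        #   columns = dataset.get_columns()
        #   if "body" in columns:
        #       print("This dataset has a text column to analyse")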
1354
1355    def update_label(self, label):
1356        """
1357        Update label for this dataset
1358
1359        :param str label:  	New label
1360        :return str: 		The new label, as returned by get_label
1361        """
1362        self.parameters["label"] = label
1363
1364        self.db.update(
1365            "datasets",
1366            data={"parameters": json.dumps(self.parameters)},
1367            where={"key": self.key},
1368        )
1369        return self.get_label()
1370
1371    def get_label(self, parameters=None, default="Query"):
1372        """
1373        Generate a readable label for the dataset
1374
1375        :param dict parameters:  Parameters of the dataset
1376        :param str default:  Label to use if it cannot be inferred from the
1377        parameters
1378
1379        :return str:  Label
1380        """
1381        if not parameters:
1382            parameters = self.parameters
1383
1384        if parameters.get("label"):
1385            return parameters["label"]
1386        elif parameters.get("body_query") and parameters["body_query"] != "empty":
1387            return parameters["body_query"]
1388        elif parameters.get("body_match") and parameters["body_match"] != "empty":
1389            return parameters["body_match"]
1390        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
1391            return parameters["subject_query"]
1392        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
1393            return parameters["subject_match"]
1394        elif parameters.get("query"):
1395            label = parameters["query"]
1396            # Some legacy datasets have lists as query data
1397            if isinstance(label, list):
1398                label = ", ".join(label)
1399
1400            label = label if len(label) < 30 else label[:25] + "..."
1401            label = label.strip().replace("\n", ", ")
1402            return label
1403        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
1404            return "Flag: %s" % parameters["country_flag"]
1405        elif parameters.get("country_name") and parameters["country_name"] != "all":
1406            return "Country: %s" % parameters["country_name"]
1407        elif parameters.get("filename"):
1408            return parameters["filename"]
1409        elif parameters.get("board") and "datasource" in parameters:
1410            return parameters["datasource"] + "/" + parameters["board"]
1411        elif (
1412                "datasource" in parameters
1413                and parameters["datasource"] in self.modules.datasources
1414        ):
1415            return (
1416                    self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
1417            )
1418        else:
1419            return default
1420
1421    def change_datasource(self, datasource):
1422        """
1423        Change the datasource type for this dataset
1424
1425        :param str datasource:  New datasource type
1426        :return str:  The new datasource type
1427        """
1428
1429        self.parameters["datasource"] = datasource
1430
1431        self.db.update(
1432            "datasets",
1433            data={"parameters": json.dumps(self.parameters)},
1434            where={"key": self.key},
1435        )
1436        return datasource
1437
1438    def reserve_result_file(self, parameters=None, extension="csv"):
1439        """
1440        Generate a unique path to the results file for this dataset
1441
1442        This generates a file name for the data file of this dataset, and makes sure
1443        no file exists or will exist at that location other than the file we
1444        expect (i.e. the data for this particular dataset).
1445
1446        :param str extension: File extension, "csv" by default
1447        :param parameters:  Dataset parameters
1448        :return bool:  Whether the file path was successfully reserved
1449        """
1450        if self.data["is_finished"]:
1451            raise RuntimeError("Cannot reserve results file for a finished dataset")
1452
1453        # Use 'random' for random post queries
1454        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
1455            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
1456        # Use country code for country flag queries
1457        elif "country_flag" in parameters and parameters["country_flag"] != "all":
1458            file = (
1459                    "countryflag-"
1460                    + str(parameters["country_flag"])
1461                    + "-"
1462                    + self.data["key"]
1463            )
1464        # Use the query string for all other queries
1465        else:
1466            query_bit = self.data["query"].replace(" ", "-").lower()
1467            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
1468            query_bit = query_bit[:100]  # Crop to avoid OSError
1469            file = query_bit + "-" + self.data["key"]
1470            file = re.sub(r"[-]+", "-", file)
1471
1472        self.data["result_file"] = file + "." + extension.lower()
1473        index = 1
1474        while self.get_results_path().is_file():
1475            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
1476            index += 1
1477
1478        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
1479                                 data={"result_file": self.data["result_file"]})
1480        return updated > 0
1481
1482    def get_key(self, query, parameters, parent="", time_offset=0):
1483        """
1484        Generate a unique key for this dataset that can be used to identify it
1485
1486        The key is a hash of a combination of the query string and parameters.
1487        You never need to call this, really: it's used internally.
1488
1489        :param str query:  Query string
1490        :param parameters:  Dataset parameters
1491        :param parent: Parent dataset's key (if applicable)
1492        :param time_offset:  Offset to add to the time component of the dataset
1493        key. This can be used to ensure a unique key even if the parameters and
1494        timing are otherwise identical to an existing dataset's
1495
1496        :return str:  Dataset key
1497        """
1498        # Return a hash based on parameters
1499        # we're going to use the hash of the parameters to uniquely identify
1500        # the dataset, so make sure it's always in the same order, or we might
1501        # end up creating multiple keys for the same dataset if python
1502        # decides to return the dict in a different order
1503        param_key = collections.OrderedDict()
1504        for key in sorted(parameters):
1505            param_key[key] = parameters[key]
1506
1507        # we additionally use the current time as a salt - this should usually
1508        # ensure a unique key for the dataset. if there is a hash collision
1509        # anyway, the database check below generates a new key
1510        param_key["_salt"] = int(time.time()) + time_offset
1511
1512        parent_key = str(parent) if parent else ""
1513        plain_key = repr(param_key) + str(query) + parent_key
1514        hashed_key = hash_to_md5(plain_key)
1515
1516        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1517            # key exists, generate a new one
1518            return self.get_key(
1519                query, parameters, parent, time_offset=random.randint(1, 10)
1520            )
1521        else:
1522            return hashed_key
1523
1524    def set_key(self, key):
1525        """
1526        Change dataset key
1527
1528        In principle, keys should never be changed. But there are rare cases
1529        where it is useful to do so, in particular when importing a dataset
1530        from another 4CAT instance; in that case it makes sense to try and
1531        ensure that the key is the same as it was before. This function sets
1532        the dataset key and updates any dataset references to it.
1533
1534        :param str key:  Key to set
1535        :return str:  Key that was set. If the desired key already exists, the
1536        original key is kept.
1537        """
1538        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1539        if key_exists or not key:
1540            return self.key
1541
1542        old_key = self.key
1543        self.db.update("datasets", data={"key": key}, where={"key": old_key})
1544
1545        # update references
1546        self.db.update(
1547            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
1548        )
1549        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1550        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1551        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1552
1553        # for good measure
1554        self.db.commit()
1555        self.key = key
1556
1557        return self.key
1558
1559    def get_status(self):
1560        """
1561        Get Dataset status
1562
1563        :return string: Dataset status
1564        """
1565        return self.data["status"]
1566
1567    def update_status(self, status, is_final=False):
1568        """
1569        Update dataset status
1570
1571        The status is a string that may be displayed to a user to keep them
1572        updated and informed about the progress of a dataset. No memory is kept
1573        of earlier dataset statuses; the current status is overwritten when
1574        updated.
1575
1576        Statuses are also written to the dataset log file.
1577
1578        :param string status:  Dataset status
1579        :param bool is_final:  If this is `True`, subsequent calls to this
1580        method while the object is instantiated will not update the dataset
1581        status.
1582        :return bool:  Status update successful?
1583        """
1584        if self.no_status_updates:
1585            return
1586
1587        # for presets, copy the updated status to the preset(s) this is part of
1588        if self.preset_parent is None:
1589            self.preset_parent = [
1590                                     parent
1591                                     for parent in self.get_genealogy()
1592                                     if parent.type.find("preset-") == 0 and parent.key != self.key
1593                                 ][:1]
1594
1595        if self.preset_parent:
1596            for preset_parent in self.preset_parent:
1597                if not preset_parent.is_finished():
1598                    preset_parent.update_status(status)
1599
1600        self.data["status"] = status
1601        updated = self.db.update(
1602            "datasets", where={"key": self.data["key"]}, data={"status": status}
1603        )
1604
1605        if is_final:
1606            self.no_status_updates = True
1607
1608        self.log(status)
1609
1610        return updated > 0
1611
1612    def update_progress(self, progress):
1613        """
1614        Update dataset progress
1615
1616        The progress can be used to indicate to a user how close the dataset
1617        is to completion.
1618
1619        :param float progress:  Between 0 and 1.
1620        :return:
1621        """
1622        progress = min(1, max(0, progress))  # clamp
1623        if type(progress) is int:
1624            progress = float(progress)
1625
1626        self.data["progress"] = progress
1627        updated = self.db.update(
1628            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
1629        )
1630        return updated > 0
1631
1632    def get_progress(self):
1633        """
1634        Get dataset progress
1635
1636        :return float:  Progress, between 0 and 1
1637        """
1638        return self.data["progress"]
1639
1640    def finish_with_error(self, error):
1641        """
1642        Set error as final status, and finish with 0 results
1643
1644        This is a convenience function to avoid having to repeat
1645        "update_status" and "finish" a lot.
1646
1647        :param str error:  Error message for final dataset status.
1648        :return:
1649        """
1650        self.update_status(error, is_final=True)
1651        self.finish(0)
1652
1653        return None
1654
1655    def update_version(self, version):
1656        """
1657        Update software version used for this dataset
1658
1659        This can be used to verify the code that was used to process this dataset.
1660
1661        :param string version:  Version identifier
1662        :return bool:  Update successful?
1663        """
1664        try:
1665            # this fails if the processor type is unknown
1666            # edge case, but let's not crash...
1667            processor_path = self.modules.processors.get(self.data["type"]).filepath
1668        except AttributeError:
1669            processor_path = ""
1670
1671        updated = self.db.update(
1672            "datasets",
1673            where={"key": self.data["key"]},
1674            data={
1675                "software_version": version[0],
1676                "software_source": version[1],
1677                "software_file": processor_path,
1678            },
1679        )
1680
1681        return updated > 0
1682
1683    def delete_parameter(self, parameter, instant=True):
1684        """
1685        Delete a parameter from the dataset metadata
1686
1687        :param string parameter:  Parameter to delete
1688        :param bool instant:  Also delete parameters in this instance object?
1689        :return bool:  Update successful?
1690        """
1691        parameters = self.parameters.copy()
1692        if parameter in parameters:
1693            del parameters[parameter]
1694        else:
1695            return False
1696
1697        updated = self.db.update(
1698            "datasets",
1699            where={"key": self.data["key"]},
1700            data={"parameters": json.dumps(parameters)},
1701        )
1702
1703        if instant:
1704            self.parameters = parameters
1705
1706        return updated > 0
1707
1708    def get_version_url(self, file):
1709        """
1710        Get a versioned github URL for the version this dataset was processed with
1711
1712        :param file:  File to link within the repository
1713        :return:  URL, or an empty string
1714        """
1715        if not self.data["software_source"]:
1716            return ""
1717
1718        filepath = self.data.get("software_file", "")
1719        if filepath.startswith("/config/extensions/"):
1720            # go to root of extension
1721            filepath = "/" + "/".join(filepath.split("/")[3:])
1722
1723        return (
1724                self.data["software_source"]
1725                + "/blob/"
1726                + self.data["software_version"]
1727                + filepath
1728        )
1729
1730    def top_parent(self):
1731        """
1732        Get root dataset
1733
1734        Traverses the tree of datasets this one is part of until it finds one
1735        with no source_dataset dataset, then returns that dataset.
1736
1737        :return Dataset: Parent dataset
1738        """
1739        genealogy = self.get_genealogy()
1740        return genealogy[0]
1741
1742    def get_genealogy(self, update_cache=False):
1743        """
1744        Get genealogy of this dataset
1745
1746        Creates a list of DataSet objects, with the first one being the
1747        'top' dataset, and each subsequent one being a child of the previous
1748        one, ending with the current dataset.
1749
1750        :param bool update_cache:  Update the cached genealogy if True, else return cached value
1751        :return list:  Dataset genealogy, oldest dataset first
1752        """
1753        if not self._genealogy or update_cache:
1754            key_parent = self.key_parent
1755            genealogy = []
1756
1757            while key_parent:
1758                try:
1759                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1760                except DataSetException:
1761                    break
1762
1763                genealogy.append(parent)
1764                if parent.key_parent:
1765                    key_parent = parent.key_parent
1766                else:
1767                    break
1768
1769            genealogy.reverse()
1770
1771            # add self to the end
1772            genealogy.append(self)
1773            # cache the result
1774            self._genealogy = genealogy
1775
1776        # return a copy to prevent external modification
1777        return list(self._genealogy)
1778
1779    def get_children(self, update=False):
1780        """
1781        Get children of this dataset
1782
1783        :param bool update:  Update the list of children from database if True, else return cached value
1784        :return list:  List of child datasets
1785        """
1786        if self._children is not None and not update:
1787            return self._children
1788
1789        analyses = self.db.fetchall(
1790            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
1791            (self.key,),
1792        )
1793        self._children = [
1794            DataSet(data=analysis, db=self.db, modules=self.modules)
1795            for analysis in analyses
1796        ]
1797        return self._children
1798
1799    def get_all_children(self, recursive=True, update=True):
1800        """
1801        Get all children of this dataset
1802
1803        Results are returned as a non-hierarchical list, i.e. the result does
1804        not reflect the actual dataset hierarchy (but all datasets in the
1805        result will have the original dataset as an ancestor somewhere)
1806
1807        :return list:  List of DataSets
1808        """
1809        children = self.get_children(update=update)
1810        results = children.copy()
1811        if recursive:
1812            for child in children:
1813                results += child.get_all_children(recursive=recursive, update=update)
1814
1815        return results
1816
1817    def nearest(self, type_filter):
1818        """
1819        Return nearest dataset that matches the given type
1820
1821        Starting with this dataset, traverse the hierarchy upwards and return
1822        whichever dataset matches the given type.
1823
1824        :param str type_filter:  Type filter. Can contain wildcards and is matched
1825        using `fnmatch.fnmatch`.
1826        :return:  Nearest matching dataset, or `None` if none match.
1827        """
1828        genealogy = self.get_genealogy()
1829        for dataset in reversed(genealogy):
1830            if fnmatch.fnmatch(dataset.type, type_filter):
1831                return dataset
1832
1833        return None
1834
1835    def get_breadcrumbs(self):
1836        """
1837        Get breadcrumbs navlink for use in permalinks
1838
1839        Returns a string representing this dataset's genealogy that may be used
1840        to uniquely identify it.
1841
1842        :return str: Nav link
1843        """
1844        if not self.key_parent:
1845            return self.key
1846
1847        genealogy = self.get_genealogy()
1848        return ",".join([d.key for d in genealogy])
1849
1850    def get_compatible_processors(self, config=None):
1851        """
1852        Get list of processors compatible with this dataset
1853
1854        Checks whether this dataset type is one that is listed as being accepted
1855        by the processor, for each known type: if the processor does not
1856        specify accepted types (via the `is_compatible_with` method), it is
1857        assumed to accept any top-level dataset
1858
1859        :param ConfigManager|None config:  Configuration reader to determine
1860        compatibility through. This may not be the same reader the dataset was
1861        instantiated with, e.g. when checking whether some other user should
1862        be able to run processors on this dataset.
1863        :return dict:  Compatible processors, `name => class` mapping
1864        """
1865        processors = self.modules.processors
1866
1867        available = {}
1868        for processor_type, processor in processors.items():
1869            if processor.is_from_collector():
1870                continue
1871
1872            own_processor = self.get_own_processor()
1873            if own_processor and own_processor.exclude_followup_processors(
1874                    processor_type
1875            ):
1876                continue
1877
1878            # consider a processor compatible if its is_compatible_with
1879            # method returns True *or* if it has no explicit compatibility
1880            # check and this dataset is top-level (i.e. has no parent)
1881            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1882                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
1883                available[processor_type] = processor
1884
1885        return available
1886
1887    def get_place_in_queue(self, update=False):
1888        """
1889        Determine dataset's position in queue
1890
1891        If the dataset is already finished, the position is -1. Else, the
1892        position is the number of datasets to be completed before this one will
1893        be processed. A position of 0 would mean that the dataset is currently
1894        being executed, or that the backend is not running.
1895
1896        :param bool update:  Update the queue position from database if True, else return cached value
1897        :return int:  Queue position
1898        """
1899        if self.is_finished() or not self.data.get("job"):
1900            self._queue_position = -1
1901            return self._queue_position
1902        elif not update and self._queue_position is not None:
1903            # Use cached value
1904            return self._queue_position
1905        else:
1906            # Collect queue position from database via the job
1907            try:
1908                job = Job.get_by_ID(self.data["job"], self.db)
1909                self._queue_position = job.get_place_in_queue()
1910            except JobNotFoundException:
1911                self._queue_position = -1
1912
1913            return self._queue_position
1914
1915    def get_own_processor(self):
1916        """
1917        Get the processor class that produced this dataset
1918
1919        :return:  Processor class, or `None` if not available.
1920        """
1921        processor_type = self.parameters.get("type", self.data.get("type"))
1922
1923        return self.modules.processors.get(processor_type)
1924
1925    def get_available_processors(self, config=None, exclude_hidden=False):
1926        """
1927        Get list of processors that may be run for this dataset
1928
1929        Returns all compatible processors except for those that are already
1930        queued or finished and have no options. Processors that have been
1931        run but have options are included so they may be run again with a
1932        different configuration
1933
1934        :param ConfigManager|None config:  Configuration reader to determine
1935        compatibility through. This may not be the same reader the dataset was
1936        instantiated with, e.g. when checking whether some other user should
1937        be able to run processors on this dataset.
1938        :param bool exclude_hidden:  Exclude processors that should not be
1939        displayed in the UI? If `False`, all processors are returned.
1940
1941        :return dict:  Available processors, `name => properties` mapping
1942        """
1943        if self.available_processors:
1944            # Update to reflect exclude_hidden parameter which may be different from last call
1945            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1946            return {
1947                processor_type: processor
1948                for processor_type, processor in self.available_processors.items()
1949                if not exclude_hidden or not processor.is_hidden
1950            }
1951
1952        processors = self.get_compatible_processors(config=config)
1953
1954        for analysis in self.get_children(update=True):
1955            if analysis.type not in processors:
1956                continue
1957
1958            if not processors[analysis.type].get_options(config=config):
1959                # No variable options; this processor has been run so remove
1960                del processors[analysis.type]
1961                continue
1962
1963            if exclude_hidden and processors[analysis.type].is_hidden:
1964                del processors[analysis.type]
1965
1966        self.available_processors = processors
1967        return processors
1968
1969    def link_job(self, job):
1970        """
1971        Link this dataset to a job ID
1972
1973        Updates the dataset data to include a reference to the job that will be
1974        executing (or has already executed) this dataset.
1975
1976        Note that if no job can be found for this dataset, this method silently
1977        fails.
1978
1979        :param Job job:  The job that will run this dataset
1980
1981        :todo: If the job column ever gets used, make sure it always contains
1982               a valid value, rather than silently failing this method.
1983        """
1984        if type(job) is not Job:
1985            raise TypeError("link_job requires a Job object as its argument")
1986
1987        if "id" not in job.data:
1988            try:
1989                job = Job.get_by_remote_ID(self.key, self.db, jobtype=self.data["type"])
1990            except JobNotFoundException:
1991                return
1992
1993        self.db.update(
1994            "datasets", where={"key": self.key}, data={"job": job.data["id"]}
1995        )
1996
1997    def link_parent(self, key_parent):
1998        """
1999        Set source_dataset key for this dataset
2000
2001        :param key_parent:  Parent key. Not checked for validity
2002        """
2003        self.db.update(
2004            "datasets", where={"key": self.key}, data={"key_parent": key_parent}
2005        )
2006        # reset caches
2007        self.data["key_parent"] = key_parent
2008        self._genealogy = None  
2009
2010    def get_parent(self):
2011        """
2012        Get parent dataset
2013
2014        :return DataSet:  Parent dataset, or `None` if not applicable
2015        """
2016        return (
2017            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
2018            if self.key_parent
2019            else None
2020        )
2021
2022    def detach(self):
2023        """
2024        Makes the dataset standalone, i.e. it no longer has a source_dataset (parent) dataset
2025        """
2026        self.link_parent("")
2027
2028    def is_dataset(self):
2029        """
2030        Easy way to confirm this is a dataset.
2031        Used for checking processor and dataset compatibility,
2032        which needs to handle both processors and datasets.
2033        """
2034        return True
2035
2036    def is_top_dataset(self):
2037        """
2038        Easy way to confirm this is a top dataset.
2039        Used for checking processor and dataset compatibility,
2040        which needs to handle both processors and datasets.
2041        """
2042        if self.key_parent:
2043            return False
2044        return True
2045
2046    def is_expiring(self, config):
2047        """
2048        Determine if dataset is set to expire
2049
2050        Similar to `is_expired`, but checks if the dataset will be deleted in
2051        the future, not if it should be deleted right now.
2052
2053        :param ConfigManager config:  Configuration reader (context-aware)
2054        :return bool|int:  `False`, or the expiration date as a Unix timestamp.
2055        """
2056        # has someone opted out of deleting this?
2057        if self.parameters.get("keep"):
2058            return False
2059
2060        # is this dataset explicitly marked as expiring after a certain time?
2061        if self.parameters.get("expires-after"):
2062            return self.parameters.get("expires-after")
2063
2064        # is the data source configured to have its datasets expire?
2065        expiration = config.get("datasources.expiration", {})
2066        if not expiration.get(self.parameters.get("datasource")):
2067            return False
2068
2069        # is there a timeout for this data source?
2070        if expiration.get(self.parameters.get("datasource")).get("timeout"):
2071            return self.timestamp + expiration.get(
2072                self.parameters.get("datasource")
2073            ).get("timeout")
2074
2075        return False
2076
2077    def is_expired(self, config):
2078        """
2079        Determine if dataset should be deleted
2080
2081        Datasets can be set to expire, but when they should be deleted depends
2082        on a number of factors. This checks them all.
2083
2084        :param ConfigManager config:  Configuration reader (context-aware)
2085        :return bool:
2086        """
2087        # has someone opted out of deleting this?
2088        if not self.is_expiring(config):
2089            return False
2090
2091        # is this dataset explicitly marked as expiring after a certain time?
2092        future = (
2093                time.time() + 3600
2094        )  # ensure we don't delete datasets with invalid expiration times
2095        if (
2096                self.parameters.get("expires-after")
2097                and convert_to_int(self.parameters["expires-after"], future) < time.time()
2098        ):
2099            return True
2100
2101        # is the data source configured to have its datasets expire?
2102        expiration = config.get("datasources.expiration", {})
2103        if not expiration.get(self.parameters.get("datasource")):
2104            return False
2105
2106        # is the dataset older than the set timeout?
2107        if expiration.get(self.parameters.get("datasource")).get("timeout"):
2108            return (
2109                    self.timestamp
2110                    + expiration[self.parameters.get("datasource")]["timeout"]
2111                    < time.time()
2112            )
2113
2114        return False
2115
2116    def is_from_collector(self):
2117        """
2118        Check if this dataset was made by a processor that collects data, i.e.
2119        a search or import worker.
2120
2121        :return bool:
2122        """
2123        return self.type.endswith("-search") or self.type.endswith("-import")
2124
2125    def get_extension(self):
2126        """
2127        Gets the file extension this dataset produces.
2128        Also checks whether the results file exists.
2129        Used for checking processor and dataset compatibility.
2130
2131        :return str|bool:  Extension, e.g. `csv`, or `False` if the results file does not exist
2132        """
2133        if self.get_results_path().exists():
2134            return self.get_results_path().suffix[1:]
2135
2136        return False
2137    
2138    def is_filter(self):
2139        """
2140        Check whether a dataset is a filter dataset.
2141
2142        :return bool:  True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown).
2143        """
2144        own_processor = self.get_own_processor()
2145        if own_processor is None:
2146            # Deprecated datasets do not have a processor
2147            return None
2148        return own_processor.is_filter()
2149
2150    def get_media_type(self):
2151        """
2152        Gets the media type of the dataset file.
2153
2154        :return str: media type, e.g., "text"
2155        """
2156        own_processor = self.get_own_processor()
2157        if hasattr(self, "media_type"):
2158            # media type can be defined explicitly in the dataset; this is the priority
2159            return self.media_type
2160        elif own_processor is not None:
2161            # or media type can be defined in the processor
2162            # some processors can set different media types for different datasets (e.g., import_media)
2163            if hasattr(own_processor, "media_type"):
2164                return own_processor.media_type
2165
2166        # Default to text
2167        return self.parameters.get("media_type", "text")
2168
2169    def get_metadata(self):
2170        """
2171        Get dataset metadata
2172
2173        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
2174        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
2175        update its database (and ensure compatibility with the exporting version of 4CAT).
2176        """
2177        metadata = self.db.fetchone(
2178            "SELECT * FROM datasets WHERE key = %s", (self.key,)
2179        )
2180
2181        # get 4CAT version (presumably to ensure export is compatible with import)
2182        metadata["current_4CAT_version"] = get_software_version()
2183        return metadata
2184
2185    def get_result_url(self):
2186        """
2187        Gets the 4CAT frontend URL of a dataset file.
2188
2189        Uses the FlaskConfig attributes (i.e., SERVER_NAME and
2190        SERVER_HTTPS) plus hardcoded '/result/'.
2191        TODO: create more dynamic method of obtaining url.
2192        """
2193        filename = self.get_results_path().name
2194
2195        # we cheat a little here by using the modules' config reader, but these
2196        # will never be context-dependent values anyway
2197        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
2198                      self.modules.config.get("flask.server_name") + '/result/' + filename
2199        return url_to_file
2200
2201    def warn_unmappable_item(
2202            self, item_count, processor=None, error_message=None, warn_admins=True
2203    ):
2204        """
2205        Log an item that is unable to be mapped and warn administrators.
2206
2207        :param int item_count:  Item index
2208        :param Processor processor:  Processor calling this function
2209        """
2210        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
2211
2212        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
2213        closest_dataset = (
2214            processor.dataset
2215            if processor is not None and processor.dataset is not None
2216            else self
2217        )
2218        # Log error to dataset log
2219        closest_dataset.log(dataset_error_message)
2220
2221        if warn_admins:
2222            if processor is not None:
2223                processor.log.warning(
2224                    f"Processor {processor.type} was unable to map one or more items for dataset {closest_dataset.key}."
2225                )
2226            elif hasattr(self.db, "log"):
2227                # borrow the database's log handler
2228                self.db.log.warning(
2229                    f"Unable to map one or more items for dataset {closest_dataset.key}."
2230                )
2231            else:
2232                # No other log available
2233                raise DataSetException(
2234                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
2235                )
2236
2237    # Annotation functions (most of it is handled in Annotations)
2238    def has_annotations(self) -> bool:
2239        """
2240        Whether this dataset has annotations
2241        """
2242
2243        annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,))
2244
2245        return True if annotation else False
2246
2247    def num_annotations(self) -> int:
2248        """
2249        Get the number of annotations for this dataset
2250        """
2251        return self.db.fetchone(
2252            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
2253        )["count"]
2254
2255    def get_annotation(self, data: dict):
2256        """
2257        Retrieves a specific annotation if it exists.
2258
2259        :param dict data:  A dictionary identifying the annotation to retrieve.
2260            To get a specific annotation, include either an `id` field or both
2261            `field_id` and `item_id` fields.
2262
2263        :return Annotation:  Annotation object, or `None` if the required fields are missing.
2264        """
2265
2266        if "id" not in data and ("field_id" not in data or "item_id" not in data):
2267            return None
2268
2269        if "dataset" not in data:
2270            data["dataset"] = self.key
2271
2272        return Annotation(data=data, db=self.db)
2273
2274    def get_annotations(self) -> list:
2275        """
2276        Retrieves all annotations for this dataset.
2277
2278        :return list:  List of Annotation objects.
2279        """
2280
2281        return Annotation.get_annotations_for_dataset(self.db, self.key)
2282
2283    def get_annotations_for_item(self, item_id: str | list, before=0) -> list:
2284        """
2285        Retrieves all annotations from this dataset for a specific item (e.g. social media post).
2286        :param str|list item_id:  ID(s) of the item(s) to retrieve annotations for
2287        :param int before:   Upper timestamp bound for the annotations to return.
2288        """
2289        return Annotation.get_annotations_for_dataset(
2290            self.db, self.key, item_id=item_id, before=before
2291        )
2292
2293    def has_annotation_fields(self) -> bool:
2294        """
2295        Returns True if there are annotation fields saved to the dataset table
2296        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
2297        """
2298
2299        return True if self.annotation_fields else False
2300
2301    def get_annotation_field_labels(self) -> list:
2302        """
2303        Retrieves the saved annotation field labels for this dataset.
2304        These are stored in the `annotation_fields` column of the datasets table.
2305
2306        :return list: List of annotation field labels.
2307        """
2308
2309        annotation_fields = self.annotation_fields
2310
2311        if not annotation_fields:
2312            return []
2313
2314        labels = [v["label"] for v in annotation_fields.values()]
2315
2316        return labels
2317
2318    def save_annotations(self, annotations: list) -> int:
2319        """
2320        Takes a list of annotations and saves them to the annotations table.
2321        If a field is not yet present in the `annotation_fields` column in
2322        the datasets table, it also adds it there.
2323
2324        :param list annotations:  List of dictionaries with annotation items. Each
2325            dictionary must have `item_id`, `field_id`, and `label` keys.
2326            `item_id` identifies the specific item being annotated (e.g. a social media post),
2327            `field_id` refers to the annotation field, and
2328            `label` is a human-readable description of this annotation.
2329            E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd",
2330            "value": "Yes"}]
2331
2332        :return int:  How many annotations were saved.
2333
2334        """
2335
2336        if not annotations:
2337            return 0
2338
2339        count = 0
2340        annotation_fields = self.annotation_fields
2341
2342        # Add some dataset data to annotations, if not present
2343        for annotation_data in annotations:
2344            # Check if the required fields are present
2345            if not annotation_data.get("item_id"):
2346                raise AnnotationException(
2347                    "Can't save annotations; annotation must have an `item_id` referencing "
2348                    "the item it annotated, got %s" % annotation_data
2349                )
2350            if not annotation_data.get("field_id"):
2351                raise AnnotationException(
2352                    "Can't save annotations; annotation must have a `field_id` field, "
2353                    "got %s" % annotation_data
2354                )
2355            if not annotation_data.get("label") or not isinstance(
2356                    annotation_data["label"], str
2357            ):
2358                raise AnnotationException(
2359                    "Can't save annotations; annotation must have a `label` field, "
2360                    "got %s" % annotation_data
2361                )
2362
2363            # Set dataset key
2364            if not annotation_data.get("dataset"):
2365                annotation_data["dataset"] = self.key
2366
2367            # Set default author to this dataset owner
2368            # If this annotation is made by a processor, it will have the processor name
2369            if not annotation_data.get("author"):
2370                annotation_data["author"] = self.get_owners()[0]
2371
2372            # Create Annotation object, which also saves it to the database
2373            # If this dataset/item_id/field_id combination already exists, this retrieves the
2374            # existing data and updates it with new values.
2375            Annotation(data=annotation_data, db=self.db)
2376            count += 1
2377
2378        # Save annotation fields if things changed
2379        if annotation_fields != self.annotation_fields:
2380            self.save_annotation_fields(annotation_fields)
2381
2382        return count
2383
2384    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2385        """
2386        Save annotation field data to the datasets table (in the `annotation_fields` column).
2387        If changes to the annotation fields affect existing annotations,
2388        this function will also call `update_annotations_via_fields()` to change them.
2389
2390        :param dict new_fields:  New annotation fields, with a field ID as key.
2391
2392        :param bool add:  Whether we're merely adding new fields or replacing
2393            the whole batch. If add is False, `new_fields` should contain
2394            all fields.
2395
2396        :return int:  The number of annotation fields saved.
2397
2398        """
2399
2400        # Get existing annotation fields to see if stuff changed.
2401        old_fields = self.annotation_fields
2402        changes = False
2403
2404        # Annotation field must be valid JSON.
2405        try:
2406            json.dumps(new_fields)
2407        except ValueError:
2408            raise AnnotationException(
2409                "Can't save annotation fields: not valid JSON (%s)" % new_fields
2410            )
2411
2412        # No duplicate IDs
2413        if len(new_fields) != len(set(new_fields)):
2414            raise AnnotationException(
2415                "Can't save annotation fields: field IDs must be unique"
2416            )
2417
2418        # Annotation fields must at minimum have `type` and `label` keys.
2419        for field_id, annotation_field in new_fields.items():
2420            if not isinstance(field_id, str):
2421                raise AnnotationException(
2422                    "Can't save annotation fields: field ID %s is not a valid string"
2423                    % field_id
2424                )
2425            if "label" not in annotation_field:
2426                raise AnnotationException(
2427                    "Can't save annotation fields: field %s must have a label"
2428                    % field_id
2429                )
2430            if "type" not in annotation_field:
2431                raise AnnotationException(
2432                    "Can't save annotation fields: field %s must have a type"
2433                    % field_id
2434                )
2435
2436        # Check if fields are removed
2437        if not add and old_fields:
2438            for field_id in old_fields.keys():
2439                if field_id not in new_fields:
2440                    changes = True
2441
2442        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
2443        # dataset
2444        for field_id in new_fields.keys():
2445            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
2446                old_fields[field_id]["label"] = new_fields[field_id][
2447                    "label"
2448                ]  # Only labels could've been changed
2449                new_fields[field_id] = old_fields[field_id]
2450
2451        # If we're just adding fields, add them to the old fields.
2452        # If the field already exists, overwrite the old field.
2453        if add and old_fields:
2454            all_fields = old_fields
2455            for field_id, annotation_field in new_fields.items():
2456                all_fields[field_id] = annotation_field
2457            new_fields = all_fields
2458
2459        # We're saving the new annotation fields as-is.
2460        # Ordering of fields is preserved this way.
2461        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
2462        self.annotation_fields = new_fields
2463
2464        # If anything changed with the annotation fields, possibly update
2465        # existing annotations (e.g. to delete them or change their labels).
2466        if changes:
2467            Annotation.update_annotations_via_fields(
2468                self.key, old_fields, new_fields, self.db
2469            )
2470
2471        return len(new_fields)
2472
2473    def get_annotation_metadata(self) -> dict:
2474        """
2475        Retrieves all the data for this dataset from the annotations table.
2476        """
2477
2478        annotation_data = self.db.fetchall(
2479            "SELECT * FROM annotations WHERE dataset = %s", (self.key,)
2480        )
2481        return annotation_data
2482
2483    def __getattr__(self, attr):
2484        """
2485        Getter so we don't have to use .data all the time
2486
2487        :param attr:  Data key to get
2488        :return:  Value
2489        """
2490
2491        if attr in dir(self):
2492            # an explicitly defined attribute should always be called in favour
2493            # of this passthrough
2494            attribute = getattr(self, attr)
2495            return attribute
2496        elif attr in self.data:
2497            return self.data[attr]
2498        else:
2499            raise AttributeError("DataSet instance has no attribute %s" % attr)
2500
2501    def __setattr__(self, attr, value):
2502        """
2503        Setter so we can flexibly update the database
2504
2505        Also updates internal data stores (.data etc). If the attribute is
2506        unknown, it is stored within the 'parameters' attribute.
2507
2508        :param str attr:  Attribute to update
2509        :param value:  New value
2510        """
2511
2512        # don't override behaviour for *actual* class attributes
2513        if attr in dir(self):
2514            super().__setattr__(attr, value)
2515            return
2516
2517        if attr not in self.data:
2518            self.parameters[attr] = value
2519            attr = "parameters"
2520            value = self.parameters
2521
2522        if attr == "parameters":
2523            value = json.dumps(value)
2524
2525        self.db.update("datasets", where={"key": self.key}, data={attr: value})
2526
2527        self.data[attr] = value
2528
2529        if attr == "parameters":
2530            self.parameters = json.loads(value)

Provide interface to safely register and run operations on a dataset

A dataset is a collection of:

  • A unique identifier
  • A set of parameters that demarcate the data contained within
  • The data

The data is usually stored in a file on the disk; the parameters are stored in a database. The handling of the data, et cetera, is done by other workers; this class defines method to create and manipulate the dataset's properties.

DataSet( parameters=None, key=None, job=None, data=None, db=None, parent='', extension=None, type=None, is_private=True, owner='anonymous', modules=None)
 63    def __init__(
 64            self,
 65            parameters=None,
 66            key=None,
 67            job=None,
 68            data=None,
 69            db=None,
 70            parent="",
 71            extension=None,
 72            type=None,
 73            is_private=True,
 74            owner="anonymous",
 75            modules=None
 76    ):
 77        """
 78        Create new dataset object
 79
 80        If the dataset is not in the database yet, it is added.
 81
 82        :param dict parameters:  Only when creating a new dataset. Dataset
 83        parameters, free-form dictionary.
 84        :param str key: Dataset key. If given, dataset with this key is loaded.
 85        :param int job: Job ID. If given, dataset corresponding to job is
 86        loaded.
 87        :param dict data: Dataset data, corresponding to a row in the datasets
 88        database table. If not given, retrieved from database depending on key.
 89        :param db:  Database connection
 90        :param str parent:  Only when creating a new dataset. Parent dataset
 91        key to which the one being created is a child.
 92        :param str extension: Only when creating a new dataset. Extension of
 93        dataset result file.
 94        :param str type: Only when creating a new dataset. Type of the dataset,
 95        corresponding to the type property of a processor class.
 96        :param bool is_private: Only when creating a new dataset. Whether the
 97        dataset is private or public.
 98        :param str owner: Only when creating a new dataset. The user name of
 99        the dataset's creator.
100        :param modules: Module cache. If not given, will be loaded when needed
101        (expensive). Used to figure out what processors are compatible with
102        this dataset.
103        """
104        self.db = db
105
106        # Ensure mutable attributes are set in __init__ as they are unique to each DataSet
107        self.data = {}
108        self.parameters = {}
109        self.available_processors = {}
110        self._genealogy = None
111        self.disposable_files = []
112        self.modules = modules
113
114        if key is not None:
115            self.key = key
116            current = self.db.fetchone(
117                "SELECT * FROM datasets WHERE key = %s", (self.key,)
118            )
119            if not current:
120                raise DataSetNotFoundException(
121                    "DataSet() requires a valid dataset key for its 'key' argument, \"%s\" given"
122                    % key
123                )
124
125        elif job is not None:
126            current = self.db.fetchone("SELECT * FROM datasets WHERE (parameters::json->>'job')::text = %s", (str(job),))
127            if not current:
128                raise DataSetNotFoundException("DataSet() requires a valid job ID for its 'job' argument")
129
130            self.key = current["key"]
131        elif data is not None:
132            current = data
133            if (
134                    "query" not in data
135                    or "key" not in data
136                    or "parameters" not in data
137                    or "key_parent" not in data
138            ):
139                raise DataSetException(
140                    "DataSet() requires a complete dataset record for its 'data' argument"
141                )
142
143            self.key = current["key"]
144        else:
145            if parameters is None:
146                raise DataSetException(
147                    "DataSet() requires either 'key', or 'parameters' to be given"
148                )
149
150            if not type:
151                raise DataSetException("Datasets must have their type set explicitly")
152
153            query = self.get_label(parameters, default=type)
154            self.key = self.get_key(query, parameters, parent)
155            current = self.db.fetchone(
156                "SELECT * FROM datasets WHERE key = %s AND query = %s",
157                (self.key, query),
158            )
159
160        if current:
161            self.data = current
162            self.parameters = json.loads(self.data["parameters"])
163            self.annotation_fields = json.loads(self.data["annotation_fields"]) \
164                if self.data.get("annotation_fields") else {}
165            self.is_new = False
166        else:
167            self.data = {"type": type}  # get_own_processor needs this
168            own_processor = self.get_own_processor()
169            version = get_software_commit(own_processor)
170            self.data = {
171                "key": self.key,
172                "query": self.get_label(parameters, default=type),
173                "parameters": json.dumps(parameters),
174                "result_file": "",
175                "creator": owner,
176                "status": "",
177                "type": type,
178                "timestamp": int(time.time()),
179                "is_finished": False,
180                "is_private": is_private,
181                "software_version": version[0],
182                "software_source": version[1],
183                "software_file": "",
184                "num_rows": 0,
185                "progress": 0.0,
186                "key_parent": parent,
187                "annotation_fields": "{}"
188            }
189            self.parameters = parameters
190            self.annotation_fields = {}
191
192            self.db.insert("datasets", data=self.data)
193            self.refresh_owners()
194            self.add_owner(owner)
195
196            # Find desired extension from processor if not explicitly set
197            if extension is None:
198                if own_processor:
199                    extension = own_processor.get_extension(
200                        parent_dataset=DataSet(key=parent, db=db, modules=self.modules)
201                        if parent
202                        else None
203                    )
204                # Still no extension, default to 'csv'
205                if not extension:
206                    extension = "csv"
207
208            # Reserve filename and update data['result_file']
209            self.reserve_result_file(parameters, extension)
210
211        self.refresh_owners()

Create new dataset object

If the dataset is not in the database yet, it is added.

Parameters
  • dict parameters: Only when creating a new dataset. Dataset parameters, free-form dictionary.
  • str key: Dataset key. If given, dataset with this key is loaded.
  • int job: Job ID. If given, dataset corresponding to job is loaded.
  • dict data: Dataset data, corresponding to a row in the datasets database table. If not given, retrieved from database depending on key.
  • db: Database connection
  • str parent: Only when creating a new dataset. Parent dataset key to which the one being created is a child.
  • str extension: Only when creating a new dataset. Extension of dataset result file.
  • str type: Only when creating a new dataset. Type of the dataset, corresponding to the type property of a processor class.
  • bool is_private: Only when creating a new dataset. Whether the dataset is private or public.
  • str owner: Only when creating a new dataset. The user name of the dataset's creator.
  • modules: Module cache. If not given, will be loaded when needed (expensive). Used to figure out what processors are compatible with this dataset.
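
For example, loading an existing dataset by its key could look like the minimal sketch below. It assumes that `db` (a database connection) and `modules` (a module cache) are already available from the surrounding 4CAT context, and the key shown is a placeholder.

    from common.lib.dataset import DataSet
    from common.lib.exceptions import DataSetNotFoundException

    # db and modules are assumed to exist in the calling context
    try:
        dataset = DataSet(key="abcdef1234567890", db=db, modules=modules)
    except DataSetNotFoundException:
        dataset = None  # no dataset with this key exists

    if dataset:
        print(dataset.get_label(), dataset.get_status())
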
data = None
key = ''
available_processors = None
preset_parent = None
parameters = None
modules = None
annotation_fields = None
owners = None
tagged_owners = None
db = None
folder = None
is_new = True
no_status_updates = False
disposable_files = None
def check_dataset_finished(self):
213    def check_dataset_finished(self):
214        """
215        Checks if the dataset is finished. Returns the path to the results file if it
216        is not empty, or 'empty' when there were no matches.
217
218        Only returns a path if the dataset is complete. In other words, if this
219        method returns a path, a file with the complete results for this dataset
220        will exist at that location.
221
222        :return: A path to the results file, 'empty', or `None`
223        """
224        if self.data["is_finished"] and self.data["num_rows"] > 0:
225            return self.get_results_path()
226        elif self.data["is_finished"] and self.data["num_rows"] == 0:
227            return 'empty'
228        else:
229            return None

Checks if the dataset is finished. Returns the path to the results file if it is not empty, or 'empty' when there were no matches.

Only returns a path if the dataset is complete. In other words, if this method returns a path, a file with the complete results for this dataset will exist at that location.

Returns

A path to the results file, 'empty', or None
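
A minimal sketch of handling the three possible return values, assuming `dataset` is an existing DataSet instance:

    result = dataset.check_dataset_finished()
    if result is None:
        print("dataset is still being processed")
    elif result == "empty":
        print("dataset finished, but contains no items")
    else:
        print("results file available at", result)
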

def get_results_path(self):
231    def get_results_path(self):
232        """
233        Get path to results file
234
235        Always returns a path that will at some point contain the dataset
236        data, but may not do so yet. Use this to get the location to write
237        generated results to.
238
239        :return Path:  A path to the results file
240        """
241        # alas we need to instantiate a config reader here - no way around it
242        if not self.folder:
243            self.folder = self.modules.config.get('PATH_DATA')
244        return self.folder.joinpath(self.data["result_file"])

Get path to results file

Always returns a path that will at some point contain the dataset data, but may not do so yet. Use this to get the location to write generated results to.

Returns

A path to the results file
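
As an illustration, a worker could write generated results to this path roughly as follows. This is a sketch only; `dataset` and `rows` are assumed to exist, and actual 4CAT processors usually write their results through their own helper methods.

    import csv

    # write rows (a list of dicts) to the dataset's reserved results file
    with dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=["id", "body"])
        writer.writeheader()
        for row in rows:
            writer.writerow(row)
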

def get_results_folder_path(self):
246    def get_results_folder_path(self):
247        """
248        Get path to folder containing accompanying results
249
250        Returns a path that may not yet be created
251
252        :return Path:  A path to the results folder
253        """
254        return self.get_results_path().parent.joinpath("folder_" + self.key)

Get path to folder containing accompanying results

Returns a path that may not yet be created

Returns

A path to the results folder

def get_log_path(self):
256    def get_log_path(self):
257        """
258        Get path to dataset log file
259
260        Each dataset has a single log file that documents its creation. This
261        method returns the path to that file. It is identical to the path of
262        the dataset result file, with 'log' as its extension instead.
263
264        :return Path:  A path to the log file
265        """
266        return self.get_results_path().with_suffix(".log")

Get path to dataset log file

Each dataset has a single log file that documents its creation. This method returns the path to that file. It is identical to the path of the dataset result file, with 'log' as its extension instead.

Returns

A path to the log file

def clear_log(self):
268    def clear_log(self):
269        """
270        Clears the dataset log file
271
272        If the log file does not exist, it is created empty. The log file will
273        have the same file name as the dataset result file, with the 'log'
274        extension.
275        """
276        log_path = self.get_log_path()
277        with log_path.open("w"):
278            pass

Clears the dataset log file

If the log file does not exist, it is created empty. The log file will have the same file name as the dataset result file, with the 'log' extension.

def log(self, log):
280    def log(self, log):
281        """
282        Write log message to file
283
284        Writes the log message to the log file on a new line, including a
285        timestamp at the start of the line. Note that this assumes the log file
286        already exists - it should have been created/cleared with clear_log()
287        prior to calling this.
288
289        :param str log:  Log message to write
290        """
291        log_path = self.get_log_path()
292        with log_path.open("a", encoding="utf-8") as outfile:
293            outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))

Write log message to file

Writes the log message to the log file on a new line, including a timestamp at the start of the line. Note that this assumes the log file already exists - it should have been created/cleared with clear_log() prior to calling this.

Parameters
  • str log: Log message to write
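
Putting clear_log() and log() together, as a sketch assuming `dataset` is a DataSet whose result file path has already been reserved:

    dataset.clear_log()                    # create or truncate the log file
    dataset.log("Starting data collection")
    dataset.log("Collected 1,000 items")   # each line is prefixed with a timestamp

    # the log lives next to the result file, with a .log extension
    print(dataset.get_log_path().read_text(encoding="utf-8"))
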
def iterate_items( self, processor=None, warn_unmappable=True, map_missing='default', get_annotations=True, max_unmappable=None, offset=0, *args, **kwargs):
449    def iterate_items(
450            self, processor=None, warn_unmappable=True, map_missing="default", get_annotations=True, max_unmappable=None,
451            offset=0, *args, **kwargs
452    ):
453        """
454        Generate mapped dataset items
455
456        Wrapper for _iterate_items that returns a DatasetItem, which can be
457        accessed as a dict returning the original item or (if a mapper is
458        available) the mapped item. Mapped or original versions of the item can
459        also be accessed via the `original` and `mapped_object` properties of
460        the DatasetItem.
461
462        Processors can define a method called `map_item` that can be used to map
463        an item from the dataset file before it is processed any further. This is
464        slower than storing the data file in the right format to begin with but
465        not all data sources allow for easy 'flat' mapping of items, e.g. tweets
466        are nested objects when retrieved from the twitter API that are easier
467        to store as a JSON file than as a flat CSV file, and it would be a shame
468        to throw away that data.
469
470        Note the two parameters warn_unmappable and map_missing. Items can be
471        unmappable in that their structure is too different to coerce into a
472        neat dictionary of the structure the data source expects. This makes it
473        'unmappable' and warn_unmappable determines what happens in this case.
474        It can also be of the right structure, but with some fields missing or
475        incomplete. map_missing determines what happens in that case. The
476        latter is for example possible when importing data via Zeeschuimer,
477        which produces unstably-structured data captured from social media
478        sites.
479
480        :param BasicProcessor processor:  A reference to the processor
481        iterating the dataset.
482        :param bool warn_unmappable:  If an item is not mappable, skip the item
483        and log a warning
484        :param max_unmappable:  Skip at most this many unmappable items; if
485        more are encountered, stop iterating. `None` to never stop.
486        :param map_missing: Indicates what to do with mapped items for which
487        some fields could not be mapped. Defaults to 'default'. Must be one of:
488        - 'default': fill missing fields with the default passed by map_item
489        - 'abort': raise a MappedItemIncompleteException if a field is missing
490        - a callback: replace missing field with the return value of the
491          callback. The mapped item's data dictionary is passed to the callback
492          as the first argument and the name of the missing field as the second.
493        - a dictionary with a key for each possible missing field: replace missing
494          field with a strategy for that field ('default', 'abort', or a callback)
495        :param get_annotations: Whether to also fetch annotations from the database.
496          This can be disabled to help speed up iteration.
497        :param offset:  The number of items to skip before yielding.
498        :param bool immediately_delete:  Only used when iterating a file
499          archive. Defaults to `True`, if set to `False`, files are not deleted
500          from the staging area after the iteration, so they can be re-used.
501        :param staging_area:  Only used when iterating a file archive. Where to
502          store the files while they're being worked with. If omitted, a
503          temporary folder is created and marked for deletion after all files
504          have been yielded.
505        :param list filename_filter:  Only used when iterating a file archive.
506          Whitelist of filenames to iterate, others are skipped. If empty, do
507          not filter.
508        :return generator:  A generator that yields DatasetItems
509        """
510        unmapped_items = 0
511
512        # Collect item_mapper for use with filter
513        item_mapper = False
514        own_processor = self.get_own_processor()
515        if own_processor and own_processor.map_item_method_available(dataset=self):
516            item_mapper = True
517
518        # Annotations are dynamically added, and we're handling them as 'extra' map_item fields.
519        # If we're getting annotations, we're caching items so we don't need to retrieve annotations one-by-one.
520        get_annotations = True if self.annotation_fields and get_annotations else False
521        if get_annotations:
522            annotation_fields = self.annotation_fields.copy()
523            item_batch_size = 500
524            dataset_item_cache = []
525            annotations_before = int(time.time())
526
527            # Append a number to annotation labels if there's duplicate ones
528            annotation_labels = {}
529            for (annotation_field_id, annotation_field_items,) in annotation_fields.items():
530                unique_label = annotation_field_items["label"]
531                counter = 1
532                while unique_label in annotation_labels.values():
533                    counter += 1
534                    unique_label = f"{annotation_field_items['label']}_{counter}"
535                annotation_labels[annotation_field_id] = unique_label
536
537        # missing field strategy can be for all fields at once, or per field
538        # if it is per field, it is a dictionary with field names and their strategy
539        # if it is for all fields, it may be a callback, 'abort', or 'default'
540        default_strategy = "default"
541        if type(map_missing) is not dict:
542            default_strategy = map_missing
543            map_missing = {}
544
545        iterator = self._iterate_items if self.get_extension() != "zip" else self._iterate_archive_contents
546
547        # Loop through items
548        for i, item in enumerate(iterator(processor=processor, offset=offset, *args, **kwargs)):
549            # Save original to yield
550            original_item = item.copy()
551
552            # Map item
553            if item_mapper:
554                try:
555                    mapped_item = own_processor.get_mapped_item(item)
556                except MapItemException as e:
557                    if warn_unmappable:
558                        self.warn_unmappable_item(
559                            i, processor, e, warn_admins=unmapped_items is False
560                        )
561
562                    unmapped_items += 1
563                    if max_unmappable and unmapped_items > max_unmappable:
564                        break
565                    else:
566                        continue
567
568                # check if fields have been marked as 'missing' in the
569                # underlying data, and treat according to the chosen strategy
570                if mapped_item.get_missing_fields():
571                    for missing_field in mapped_item.get_missing_fields():
572                        strategy = map_missing.get(missing_field, default_strategy)
573
574                        if callable(strategy):
575                            # delegate handling to a callback
576                            mapped_item.data[missing_field] = strategy(
577                                mapped_item.data, missing_field
578                            )
579                        elif strategy == "abort":
580                            # raise an exception to be handled at the processor level
581                            raise MappedItemIncompleteException(
582                                f"Cannot process item, field {missing_field} missing in source data."
583                            )
584                        elif strategy == "default":
585                            # use whatever was passed to the object constructor
586                            mapped_item.data[missing_field] = mapped_item.data[
587                                missing_field
588                            ].value
589                        else:
590                            raise ValueError(
591                                "map_missing must be 'abort', 'default', or a callback."
592                            )
593            else:
594                mapped_item = original_item
595
596            # yield a DatasetItem, which is a dict with some special properties
597            dataset_item = DatasetItem(
598                mapper=item_mapper,
599                original=original_item,
600                mapped_object=mapped_item,
601                data_file=original_item["path"] if "path" in original_item and issubclass(type(original_item["path"]), os.PathLike) else None,
602                **(
603                    mapped_item.get_item_data()
604                    if type(mapped_item) is MappedItem
605                    else mapped_item
606                ),
607            )
608
609            # If we're getting annotations, yield in items batches so we don't need to get annotations per item.
610            if get_annotations:
611                dataset_item_cache.append(dataset_item)
612
613                # When we reach the batch limit or the end of the dataset,
614                # get the annotations for cached items and yield the entire thing.
615                if len(dataset_item_cache) >= item_batch_size or i == (self.num_rows - 1):
616
617                    item_ids = [dataset_item.get("id") for dataset_item in dataset_item_cache]
618
619                    # Dict with item ids for fast lookup
620                    annotations_dict = collections.defaultdict(dict)
621                    annotations = self.get_annotations_for_item(item_ids, before=annotations_before)
622                    for item_annotation in annotations:
623                        item_id = item_annotation.item_id
624                        if item_annotation:
625                            annotations_dict[item_id][item_annotation.field_id] = item_annotation.value
626
627                    # Process each dataset item
628                    for dataset_item in dataset_item_cache:
629                        item_id = dataset_item.get("id")
630                        item_annotations = annotations_dict.get(item_id, {})
631
632                        for annotation_field_id, annotation_field_items in annotation_fields.items():
633                            # Get annotation value
634                            value = item_annotations.get(annotation_field_id, "")
635
636                            # Convert list to string if needed
637                            if isinstance(value, list):
638                                value = ",".join(value)
639                            elif value != "":
640                                value = str(value)  # Ensure string type
641                            else:
642                                value = ""
643
644                            dataset_item[annotation_labels[annotation_field_id]] = value
645
646                        yield dataset_item
647
648                    dataset_item_cache = []
649
650            else:
651                yield dataset_item

Generate mapped dataset items

Wrapper for _iterate_items that returns a DatasetItem, which can be accessed as a dict returning the original item or (if a mapper is available) the mapped item. Mapped or original versions of the item can also be accessed via the original and mapped_object properties of the DatasetItem.

Processors can define a method called map_item that can be used to map an item from the dataset file before it is processed any further. This is slower than storing the data file in the right format to begin with but not all data sources allow for easy 'flat' mapping of items, e.g. tweets are nested objects when retrieved from the twitter API that are easier to store as a JSON file than as a flat CSV file, and it would be a shame to throw away that data.

Note the two parameters warn_unmappable and map_missing. Items can be unmappable in that their structure is too different to coerce into a neat dictionary of the structure the data source expects. This makes it 'unmappable' and warn_unmappable determines what happens in this case. It can also be of the right structure, but with some fields missing or incomplete. map_missing determines what happens in that case. The latter is for example possible when importing data via Zeeschuimer, which produces unstably-structured data captured from social media sites.

Parameters
  • BasicProcessor processor: A reference to the processor iterating the dataset.
  • bool warn_unmappable: If an item is not mappable, skip the item and log a warning
  • max_unmappable: Skip at most this many unmappable items; if more are encountered, stop iterating. None to never stop.
  • map_missing: Indicates what to do with mapped items for which some fields could not be mapped. Defaults to 'default'. Must be one of:
    • 'default': fill missing fields with the default passed by map_item
    • 'abort': raise a MappedItemIncompleteException if a field is missing
    • a callback: replace missing field with the return value of the callback. The mapped item's data dictionary is passed to the callback as the first argument and the name of the missing field as the second.
    • a dictionary with a key for each possible missing field: replace missing field with a strategy for that field ('default', 'abort', or a callback)
  • get_annotations: Whether to also fetch annotations from the database. This can be disabled to help speed up iteration.
  • offset: The number of items to skip before yielding.
  • bool immediately_delete: Only used when iterating a file archive. Defaults to True, if set to False, files are not deleted from the staging area after the iteration, so they can be re-used.
  • staging_area: Only used when iterating a file archive. Where to store the files while they're being worked with. If omitted, a temporary folder is created and marked for deletion after all files have been yielded.
  • list filename_filter: Only used when iterating a file archive. Whitelist of filenames to iterate, others are skipped. If empty, do not filter.
Returns

A generator that yields DatasetItems

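A sketch of iterating with per-field missing-value strategies, assuming `dataset` has items with (hypothetical) `body` and `views` fields:

    def zero_out(item_data, missing_field):
        # callback strategy: receives the mapped item's data and the missing field name
        return 0

    for item in dataset.iterate_items(
        warn_unmappable=True,
        max_unmappable=50,
        map_missing={"views": zero_out, "body": "default"},
    ):
        # item behaves like a dict of mapped fields...
        print(item["body"])
        # ...while the raw, unmapped data remains available too
        print(item.original)
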
def sort_and_iterate_items(self, sort='', reverse=False, chunk_size=50000, **kwargs) -> dict:
654    def sort_and_iterate_items(
655            self, sort="", reverse=False, chunk_size=50000, **kwargs
656    ) -> dict:
657        """
658        Loop through items in a dataset, sorted by a given key.
659
660        This is a wrapper function for `iterate_items()` with the
661        added functionality of sorting a dataset.
662
663        :param sort:  The item key that determines the sort order.
664        :param reverse:  Whether to sort by largest values first.
665        :param chunk_size:  The number of items to sort per chunk when sorting large datasets.
666
667        :return dict:  Yields sorted items
668        """
669
670        def sort_items(items_to_sort, sort_key, reverse, convert_sort_to_float=False):
671            """
672            Sort items based on the given key and order.
673
674            :param items_to_sort:  The items to sort
675            :param sort_key:  The key to sort by
676            :param reverse:  Whether to sort in reverse order
677            :return:  Sorted items
678            """
679            if reverse is False and (sort_key == "dataset-order" or sort_key == ""):
680                # Sort by dataset order
681                yield from items_to_sort
682            elif sort_key == "dataset-order" and reverse:
683                # Sort by dataset order in reverse
684                yield from reversed(list(items_to_sort))
685            else:
686                # Sort on the basis of a column value
687                if not convert_sort_to_float:
688                    yield from sorted(
689                        items_to_sort,
690                        key=lambda x: x.get(sort_key, ""),
691                        reverse=reverse,
692                    )
693                else:
694                    # Dataset fields can contain integers and empty strings.
695                    # Since these cannot be compared, we will convert every
696                    # empty string to 0.
697                    yield from sorted(
698                        items_to_sort,
699                        key=lambda x: convert_to_float(x.get(sort_key, ""), force=True),
700                        reverse=reverse,
701                    )
702
703        if self.num_rows < chunk_size:
704            try:
705                # First try to force-sort float values. If this doesn't work, it'll be alphabetical.
706                yield from sort_items(self.iterate_items(**kwargs), sort, reverse, convert_sort_to_float=True)
707            except (TypeError, ValueError):
708                yield from sort_items(
709                    self.iterate_items(**kwargs),
710                    sort,
711                    reverse,
712                    convert_sort_to_float=False
713                )
714
715        else:
716            # For large datasets, we will use chunk sorting
717            staging_area = self.get_staging_area()
718            buffer = []
719            chunk_files = []
720            convert_sort_to_float = True
721            fieldnames = self.get_columns()
722
723            def write_chunk(buffer, chunk_index):
724                """
725                Write a chunk of data to a temporary file
726
727                :param buffer:  The buffer containing the chunk of data
728                :param chunk_index:  The index of the chunk
729                :return:  The path to the temporary file
730                """
731                temp_file = staging_area.joinpath(f"chunk_{chunk_index}.csv")
732                with temp_file.open("w", encoding="utf-8") as chunk_file:
733                    writer = csv.DictWriter(chunk_file, fieldnames=fieldnames)
734                    writer.writeheader()
735                    writer.writerows(buffer)
736                return temp_file
737
738            # Divide the dataset into sorted chunks
739            for item in self.iterate_items(**kwargs):
740                buffer.append(item)
741                if len(buffer) >= chunk_size:
742                    try:
743                        buffer = list(
744                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
745                        )
746                    except (TypeError, ValueError):
747                        convert_sort_to_float = False
748                        buffer = list(
749                            sort_items(buffer, sort, reverse, convert_sort_to_float=convert_sort_to_float)
750                        )
751
752                    chunk_files.append(write_chunk(buffer, len(chunk_files)))
753                    buffer.clear()
754
755            # Sort and write any remaining items in the buffer
756            if buffer:
757                buffer = list(sort_items(buffer, sort, reverse, convert_sort_to_float))
758                chunk_files.append(write_chunk(buffer, len(chunk_files)))
759                buffer.clear()
760
761            # Merge sorted chunks into the final sorted file
762            sorted_file = staging_area.joinpath("sorted_" + self.key + ".csv")
763            with sorted_file.open("w", encoding="utf-8") as outfile:
764                writer = csv.DictWriter(outfile, fieldnames=self.get_columns())
765                writer.writeheader()
766
767                # Open all chunk files for reading
768                chunk_readers = [
769                    csv.DictReader(chunk.open("r", encoding="utf-8"))
770                    for chunk in chunk_files
771                ]
772                heap = []
773
774                # Initialize the heap with the first row from each chunk
775                for i, reader in enumerate(chunk_readers):
776                    try:
777                        row = next(reader)
778                        if sort == "dataset-order" and reverse:
779                            # Use a reverse index for "dataset-order" and reverse=True
780                            sort_key = -i
781                        elif convert_sort_to_float:
782                            # Negate numeric keys for reverse sorting
783                            sort_key = (
784                                -convert_to_float(row.get(sort, ""))
785                                if reverse
786                                else convert_to_float(row.get(sort, ""))
787                            )
788                        else:
789                            if reverse:
790                                # For reverse string sorting, invert string comparison by creating a tuple
791                                # with an inverted string - this makes Python's tuple comparison work in reverse
792                                sort_key = (
793                                    tuple(-ord(c) for c in row.get(sort, "")),
794                                    -i,
795                                )
796                            else:
797                                sort_key = (row.get(sort, ""), i)
798                        heap.append((sort_key, i, row))
799                    except StopIteration:
800                        pass
801
802                # Use a heap to merge sorted chunks
803                import heapq
804
805                heapq.heapify(heap)
806                while heap:
807                    _, chunk_index, smallest_row = heapq.heappop(heap)
808                    writer.writerow(smallest_row)
809                    try:
810                        next_row = next(chunk_readers[chunk_index])
811                        if sort == "dataset-order" and reverse:
812                            # Use a reverse index for "dataset-order" and reverse=True
813                            sort_key = -chunk_index
814                        elif convert_sort_to_float:
815                            sort_key = (
816                                -convert_to_float(next_row.get(sort, ""))
817                                if reverse
818                                else convert_to_float(next_row.get(sort, ""))
819                            )
820                        else:
821                            # Use the same inverted comparison for string values
822                            if reverse:
823                                sort_key = (
824                                    tuple(-ord(c) for c in next_row.get(sort, "")),
825                                    -chunk_index,
826                                )
827                            else:
828                                sort_key = (next_row.get(sort, ""), chunk_index)
829                        heapq.heappush(heap, (sort_key, chunk_index, next_row))
830                    except StopIteration:
831                        pass
832
833            # Read the sorted file and yield each item
834            with sorted_file.open("r", encoding="utf-8") as infile:
835                reader = csv.DictReader(infile)
836                for item in reader:
837                    yield item
838
839            # Remove the temporary files
840            if staging_area.is_dir():
841                shutil.rmtree(staging_area)

Loop through items in a dataset, sorted by a given key.

This is a wrapper function for iterate_items() with the added functionality of sorting a dataset.

Parameters
  • sort: The item key that determines the sort order.
  • reverse: Whether to sort by largest values first.
  • chunk_size: The number of items to sort per chunk when sorting large datasets.

Returns

Yields sorted items

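A usage sketch, assuming the dataset has a (hypothetical) `timestamp` column to sort on:

    # newest items first; large datasets are transparently sorted in chunks on disk
    for item in dataset.sort_and_iterate_items(sort="timestamp", reverse=True):
        print(item.get("timestamp"), item.get("id"))
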
def get_staging_area(self):
843    def get_staging_area(self):
844        """
845        Get path to a temporary folder in which files can be stored before
846        finishing
847
848        The folder is created by this method and is guaranteed not to have
849        existed before. It may be used as a staging area for the dataset data
850        while it is being processed.
851
852        :return Path:  Path to folder
853        """
854        results_file = self.get_results_path()
855
856        results_dir_base = results_file.parent
857        results_dir = results_file.name.replace(".", "") + "-staging"
858        results_path = results_dir_base.joinpath(results_dir)
859        index = 1
860        while results_path.exists():
861            results_path = results_dir_base.joinpath(results_dir + "-" + str(index))
862            index += 1
863
864        # create temporary folder
865        results_path.mkdir()
866
867        # Storing the staging area with the dataset so that it can be removed later
868        self.disposable_files.append(results_path)
869
870        return results_path

Get path to a temporary folder in which files can be stored before finishing

The folder is created by this method and is guaranteed not to have existed before. It may be used as a staging area for the dataset data while it is being processed.

Returns

Path to folder

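A sketch of how a processor might use a staging area, assuming `dataset` is the dataset being written to; because the folder is registered as a disposable file, it can later be cleaned up with remove_disposable_files() (documented below):

    staging_area = dataset.get_staging_area()   # freshly created, guaranteed empty

    # write intermediate files here while processing
    (staging_area / "intermediate.json").write_text("{}", encoding="utf-8")

    # ... write the final output to dataset.get_results_path() ...

    dataset.remove_disposable_files()           # removes the staging area again
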
def remove_disposable_files(self):
872    def remove_disposable_files(self):
873        """
874        Remove any disposable files and folders, such as staging areas
875
876        Called from BasicProcessor after processing a dataset finishes.
877        """
878        # Remove DataSet staging areas
879        if self.disposable_files:
880            for disposable_file in self.disposable_files:
881                if disposable_file.exists():
882                    shutil.rmtree(disposable_file)

Remove any disposable files and folders, such as staging areas

Called from BasicProcessor after processing a dataset finishes.

def finish(self, num_rows=0):
884    def finish(self, num_rows=0):
885        """
886        Declare the dataset finished
887        """
888        if self.data["is_finished"]:
889            raise RuntimeError("Cannot finish a finished dataset again")
890
891        self.db.update(
892            "datasets",
893            where={"key": self.data["key"]},
894            data={
895                "is_finished": True,
896                "num_rows": num_rows,
897                "progress": 1.0,
898                "timestamp_finished": int(time.time()),
899            },
900        )
901        self.data["is_finished"] = True
902        self.data["num_rows"] = num_rows

Declare the dataset finished

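A minimal sketch of finishing a dataset after writing its results, assuming `dataset` is an unfinished DataSet with a CSV result file:

    import csv

    rows = [{"id": "1", "body": "example item"}]   # hypothetical result rows

    with dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=["id", "body"])
        writer.writeheader()
        writer.writerows(rows)

    dataset.finish(num_rows=len(rows))   # raises RuntimeError if already finished
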
def copy(self, shallow=True):
904    def copy(self, shallow=True):
905        """
906        Copies the dataset, making a new version with a unique key
907
908
909        :param bool shallow:  Shallow copy: does not copy the result file, but
910        instead refers to the same file as the original dataset did
911        :return Dataset:  Copied dataset
912        """
913        parameters = self.parameters.copy()
914
915        # a key is partially based on the parameters. so by setting these extra
916        # attributes, we also ensure a unique key will be generated for the
917        # copy
918        # possibly todo: don't use time for uniqueness (but one shouldn't be
919        # copying a dataset multiple times per microsecond, that's not what
920        # this is for)
921        parameters["copied_from"] = self.key
922        parameters["copied_at"] = time.time()
923
924        copy = DataSet(
925            parameters=parameters,
926            db=self.db,
927            extension=self.result_file.split(".")[-1],
928            type=self.type,
929            modules=self.modules
930        )
931
932        for field in self.data:
933            if field in ("id", "key", "timestamp", "job", "parameters", "result_file"):
934                continue
935            copy.__setattr__(field, self.data[field])
936
937        if shallow:
938            # use the same result file
939            copy.result_file = self.result_file
940        else:
941            # copy to new file with new key
942            shutil.copy(self.get_results_path(), copy.get_results_path())
943
944        if self.is_finished():
945            copy.finish(self.num_rows)
946
947        # make sure ownership is also copied
948        copy.copy_ownership_from(self)
949
950        return copy

Copies the dataset, making a new version with a unique key

Parameters
  • bool shallow: Shallow copy: does not copy the result file, but instead refers to the same file as the original dataset did
Returns

Copied dataset

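A sketch contrasting shallow and full copies, assuming `dataset` is a finished DataSet:

    shallow_copy = dataset.copy()              # new key, same result file on disk
    full_copy = dataset.copy(shallow=False)    # new key and a duplicated result file

    # per the shallow-copy behaviour described above
    assert shallow_copy.key != dataset.key
    assert shallow_copy.get_results_path() == dataset.get_results_path()
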
def delete(self, commit=True, queue=None):
 952    def delete(self, commit=True, queue=None):
 953        """
 954        Delete the dataset, and all its children
 955
 956        Deletes both database records and result files. Note that manipulating
 957        a dataset object after it has been deleted is undefined behaviour.
 958
 959        :param bool commit:  Commit SQL DELETE query?
 960        """
 961        # first, recursively delete children
 962        children = self.db.fetchall(
 963            "SELECT * FROM datasets WHERE key_parent = %s", (self.key,)
 964        )
 965        for child in children:
 966            try:
 967                child = DataSet(key=child["key"], db=self.db, modules=self.modules)
 968                child.delete(commit=commit)
 969            except DataSetException:
 970                # dataset already deleted - race condition?
 971                pass
 972
 973        # delete any queued jobs for this dataset
 974        try:
 975            job = Job.get_by_remote_ID(self.key, self.db, self.type)
 976            if job.is_claimed:
 977                # tell API to stop any jobs running for this dataset
 978                # level 2 = cancel job
 979                # we're not interested in the result - if the API is available,
 980                # it will do its thing, if it's not the backend is probably not
 981                # running so the job also doesn't need to be interrupted
 982                call_api(
 983                    "cancel-job",
 984                    {"remote_id": self.key, "jobtype": self.type, "level": 2},
 985                    False,
 986                )
 987
 988            # this deletes the job from the database
 989            job.finish(True)
 990
 991        except JobNotFoundException:
 992            pass
 993
 994        # delete this dataset's own annotations
 995        self.db.delete("annotations", where={"dataset": self.key}, commit=commit)
 996        # delete annotations that have been generated as part of this dataset
 997        self.db.delete("annotations", where={"from_dataset": self.key}, commit=commit)
 998        # delete annotation fields on parent dataset(s) stemming from this dataset
 999        for related_dataset in self.get_genealogy(update_cache=True):
1000            field_deleted = False
1001            annotation_fields = related_dataset.annotation_fields
1002            if annotation_fields:
1003                for field_id in list(annotation_fields.keys()):
1004                    if annotation_fields[field_id].get("from_dataset", "") == self.key:
1005                        del annotation_fields[field_id]
1006                        field_deleted = True
1007            if field_deleted:
1008                related_dataset.save_annotation_fields(annotation_fields)
1009
1010        # delete dataset from database
1011        self.db.delete("datasets", where={"key": self.key}, commit=commit)
1012        self.db.delete("datasets_owners", where={"key": self.key}, commit=commit)
1013        self.db.delete("users_favourites", where={"key": self.key}, commit=commit)
1014
1015        # delete from drive
1016        try:
1017            if self.get_results_path().exists():
1018                self.get_results_path().unlink()
1019            if self.get_results_path().with_suffix(".log").exists():
1020                self.get_results_path().with_suffix(".log").unlink()
1021            if self.get_results_folder_path().exists():
1022                shutil.rmtree(self.get_results_folder_path())
1023
1024        except FileNotFoundError:
1025            # already deleted, apparently
1026            pass
1027        except PermissionError as e:
1028            self.db.log.error(
1029                f"Could not delete all dataset {self.key} files; they may need to be deleted manually: {e}"
1030            )

Delete the dataset, and all its children

Deletes both database records and result files. Note that manipulating a dataset object after it has been deleted is undefined behaviour.

Parameters
  • bool commit: Commit SQL DELETE query?
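
A sketch of deleting a dataset tree, assuming `dataset` is a DataSet that is no longer needed:

    # removes this dataset and all its children: database records, result files,
    # log files, queued jobs and any annotations they produced
    dataset.delete()

    # commit=False defers the SQL DELETEs, e.g. when batching several deletions
    # some_other_dataset.delete(commit=False)
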
def update_children(self, **kwargs):
1032    def update_children(self, **kwargs):
1033        """
1034        Update an attribute for all child datasets
1035
1036        Can be used to e.g. change the owner, version, finished status for all
1037        datasets in a tree
1038
1039        :param kwargs:  Parameters corresponding to known dataset attributes
1040        """
1041        for child in self.get_children(update=True):
1042            for attr, value in kwargs.items():
1043                child.__setattr__(attr, value)
1044
1045            child.update_children(**kwargs)

Update an attribute for all child datasets

Can be used to e.g. change the owner, version, finished status for all datasets in a tree

Parameters
  • kwargs: Parameters corresponding to known dataset attributes
def is_finished(self):
1047    def is_finished(self):
1048        """
1049        Check if dataset is finished
1050        :return bool:  Whether the dataset is finished
1051        """
1052        return bool(self.data["is_finished"])

Check if dataset is finished

Returns

Whether the dataset is finished

def is_rankable(self, multiple_items=True):
1054    def is_rankable(self, multiple_items=True):
1055        """
1056        Determine if a dataset is rankable
1057
1058        Rankable means that it is a CSV file with 'date' and 'value' columns
1059        as well as one or more item label columns
1060
1061        :param bool multiple_items:  Consider datasets with multiple items per
1062        row (e.g. word_1, word_2, etc)?
1063
1064        :return bool:  Whether the dataset is rankable or not
1065        """
1066        if (
1067                self.get_results_path().suffix != ".csv"
1068                or not self.get_results_path().exists()
1069        ):
1070            return False
1071
1072        column_options = {"date", "value", "item"}
1073        if multiple_items:
1074            column_options.add("word_1")
1075
1076        with self.get_results_path().open(encoding="utf-8") as infile:
1077            reader = csv.DictReader(infile)
1078            try:
1079                return len(set(reader.fieldnames) & column_options) >= 3
1080            except (TypeError, ValueError):
1081                return False

Determine if a dataset is rankable

Rankable means that it is a CSV file with 'date' and 'value' columns as well as one or more item label columns

Parameters
  • bool multiple_items: Consider datasets with multiple items per row (e.g. word_1, word_2, etc)?
Returns

Whether the dataset is rankable or not

def is_accessible_by(self, username, role='owner'):
1083    def is_accessible_by(self, username, role="owner"):
1084        """
1085        Check if dataset has given user as owner
1086
1087        :param str|User username: Username to check for
1088        :return bool:  Whether the dataset is accessible by the given user
1089        """
1090        if type(username) is not str:
1091            if hasattr(username, "get_id"):
1092                username = username.get_id()
1093            else:
1094                raise TypeError("User must be a str or User object")
1095
1096        # 'normal' owners
1097        if username in [
1098            owner
1099            for owner, meta in self.owners.items()
1100            if (role is None or meta["role"] == role)
1101        ]:
1102            return True
1103
1104        # owners that are owner by being part of a tag
1105        if username in itertools.chain(
1106                *[
1107                    tagged_owners
1108                    for tag, tagged_owners in self.tagged_owners.items()
1109                    if (role is None or self.owners[f"tag:{tag}"]["role"] == role)
1110                ]
1111        ):
1112            return True
1113
1114        return False

Check if dataset has given user as owner

Parameters
  • str|User username: Username to check for
Returns

Whether the dataset is accessible by the given user

def get_owners_users(self, role='owner'):
1116    def get_owners_users(self, role="owner"):
1117        """
1118        Get list of dataset owners
1119
1120        This returns a list of *users* that are considered owners. Tags are
1121        transparently replaced with the users with that tag.
1122
1123        :param str|None role:  Role to check for. If `None`, all owners are
1124        returned regardless of role.
1125
1126        :return set:  De-duplicated owner list
1127        """
1128        # 'normal' owners
1129        owners = [
1130            owner
1131            for owner, meta in self.owners.items()
1132            if (role is None or meta["role"] == role) and not owner.startswith("tag:")
1133        ]
1134
1135        # owners that are owner by being part of a tag
1136        owners.extend(
1137            itertools.chain(
1138                *[
1139                    tagged_owners
1140                    for tag, tagged_owners in self.tagged_owners.items()
1141                    if role is None or self.owners[f"tag:{tag}"]["role"] == role
1142                ]
1143            )
1144        )
1145
1146        # de-duplicate before returning
1147        return set(owners)

Get list of dataset owners

This returns a list of users that are considered owners. Tags are transparently replaced with the users with that tag.

Parameters
  • str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns

De-duplicated owner list

def get_owners(self, role='owner'):
1149    def get_owners(self, role="owner"):
1150        """
1151        Get list of dataset owners
1152
1153        This returns a list of all owners, and does not transparently resolve
1154        tags (like `get_owners_users` does).
1155
1156        :param str|None role:  Role to check for. If `None`, all owners are
1157        returned regardless of role.
1158
1159        :return set:  De-duplicated owner list
1160        """
1161        return [
1162            owner
1163            for owner, meta in self.owners.items()
1164            if (role is None or meta["role"] == role)
1165        ]

Get list of dataset owners

This returns a list of all owners, and does not transparently resolve tags (like get_owners_users does).

Parameters
  • str|None role: Role to check for. If None, all owners are returned regardless of role.
Returns

De-duplicated owner list

def add_owner(self, username, role='owner'):
1167    def add_owner(self, username, role="owner"):
1168        """
1169        Set dataset owner
1170
1171        If the user is already an owner, but with a different role, the role is
1172        updated. If the user is already an owner with the same role, nothing happens.
1173
1174        :param str|User username:  Username to set as owner
1175        :param str|None role:  Role to add user with.
1176        """
1177        if type(username) is not str:
1178            if hasattr(username, "get_id"):
1179                username = username.get_id()
1180            else:
1181                raise TypeError("User must be a str or User object")
1182
1183        if username not in self.owners:
1184            self.owners[username] = {"name": username, "key": self.key, "role": role}
1185            self.db.insert("datasets_owners", data=self.owners[username], safe=True)
1186
1187        elif username in self.owners and self.owners[username]["role"] != role:
1188            self.db.update(
1189                "datasets_owners",
1190                data={"role": role},
1191                where={"name": username, "key": self.key},
1192            )
1193            self.owners[username]["role"] = role
1194
1195        if username.startswith("tag:"):
1196            # this is a bit more complicated than just adding to the list of
1197            # owners, so do a full refresh
1198            self.refresh_owners()
1199
1200        # make sure children's owners remain in sync
1201        for child in self.get_children(update=True):
1202            child.add_owner(username, role)
1203            # not recursive, since we're calling it from recursive code!
1204            child.copy_ownership_from(self, recursive=False)

Set dataset owner

If the user is already an owner, but with a different role, the role is updated. If the user is already an owner with the same role, nothing happens.

Parameters
  • str|User username: Username to set as owner
  • str|None role: Role to add user with.
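
A sketch combining the ownership helpers, assuming `dataset` exists and that a (hypothetical) user tag 'students' is in use:

    dataset.add_owner("alice")                         # full owner
    dataset.add_owner("bob", role="viewer")            # read-only access
    dataset.add_owner("tag:students", role="viewer")   # everyone carrying the 'students' tag

    dataset.is_accessible_by("alice")                  # True (checks the 'owner' role)
    dataset.is_accessible_by("bob", role="viewer")     # True

    dataset.get_owners(role="viewer")        # e.g. ['bob', 'tag:students'] - tags not resolved
    dataset.get_owners_users(role="viewer")  # 'bob' plus every user carrying the tag
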
def remove_owner(self, username):
1206    def remove_owner(self, username):
1207        """
1208        Remove dataset owner
1209
1210        If no owner is set, the dataset is assigned to the anonymous user.
1211        If the user is not an owner, nothing happens.
1212
1213        :param str|User username:  Username to set as owner
1214        """
1215        if type(username) is not str:
1216            if hasattr(username, "get_id"):
1217                username = username.get_id()
1218            else:
1219                raise TypeError("User must be a str or User object")
1220
1221        if username in self.owners:
1222            del self.owners[username]
1223            self.db.delete("datasets_owners", where={"name": username, "key": self.key})
1224
1225            if not self.owners:
1226                self.add_owner("anonymous")
1227
1228        if username in self.tagged_owners:
1229            del self.tagged_owners[username]
1230
1231        # make sure children's owners remain in sync
1232        for child in self.get_children(update=True):
1233            child.remove_owner(username)
1234            # not recursive, since we're calling it from recursive code!
1235            child.copy_ownership_from(self, recursive=False)

Remove dataset owner

If no owner is set, the dataset is assigned to the anonymous user. If the user is not an owner, nothing happens.

Parameters
  • str|User username: Username to set as owner
def refresh_owners(self):
1237    def refresh_owners(self):
1238        """
1239        Update internal owner cache
1240
1241        This makes sure that the list of *users* and *tags* which can access the
1242        dataset is up to date.
1243        """
1244        self.owners = {
1245            owner["name"]: owner
1246            for owner in self.db.fetchall(
1247                "SELECT * FROM datasets_owners WHERE key = %s", (self.key,)
1248            )
1249        }
1250
1251        # determine which users (if any) are owners of the dataset by having a
1252        # tag that is listed as an owner
1253        owner_tags = [name[4:] for name in self.owners if name.startswith("tag:")]
1254        if owner_tags:
1255            tagged_owners = self.db.fetchall(
1256                "SELECT name, tags FROM users WHERE tags ?| %s ", (owner_tags,)
1257            )
1258            self.tagged_owners = {
1259                owner_tag: [
1260                    user["name"] for user in tagged_owners if owner_tag in user["tags"]
1261                ]
1262                for owner_tag in owner_tags
1263            }
1264        else:
1265            self.tagged_owners = {}

Update internal owner cache

This makes sure that the list of users and tags which can access the dataset is up to date.

def copy_ownership_from(self, dataset, recursive=True):
1267    def copy_ownership_from(self, dataset, recursive=True):
1268        """
1269        Copy ownership
1270
1271        This is useful to e.g. make sure a dataset's ownership stays in sync
1272        with its parent
1273
1274        :param Dataset dataset:  Parent to copy from
1275        :return:
1276        """
1277        self.db.delete("datasets_owners", where={"key": self.key}, commit=False)
1278
1279        for role in ("owner", "viewer"):
1280            owners = dataset.get_owners(role=role)
1281            for owner in owners:
1282                self.db.insert(
1283                    "datasets_owners",
1284                    data={"key": self.key, "name": owner, "role": role},
1285                    commit=False,
1286                    safe=True,
1287                )
1288
1289        self.db.commit()
1290        if recursive:
1291            for child in self.get_children(update=True):
1292                child.copy_ownership_from(self, recursive=recursive)

Copy ownership

This is useful to e.g. make sure a dataset's ownership stays in sync with its parent

Parameters
  • Dataset dataset: Parent to copy from
Returns
def get_parameters(self):
1294    def get_parameters(self):
1295        """
1296        Get dataset parameters
1297
1298        The dataset parameters are stored as JSON in the database - parse them
1299        and return the resulting object
1300
1301        :return:  Dataset parameters as originally stored
1302        """
1303        try:
1304            return json.loads(self.data["parameters"])
1305        except json.JSONDecodeError:
1306            return {}

Get dataset parameters

The dataset parameters are stored as JSON in the database - parse them and return the resulting object

Returns

Dataset parameters as originally stored

def get_columns(self):
1308    def get_columns(self):
1309        """
1310        Returns the dataset columns.
1311
1312        Useful for processor input forms. Can deal with both CSV and NDJSON
1313        files, the latter only if a `map_item` function is available in the
1314        processor that generated it. While in other cases one could use the
1315        keys of the JSON object, this is not always possible in follow-up code
1316        that uses the 'column' names, so for consistency this function acts as
1317        if no column can be parsed if no `map_item` function exists.
1318
1319        :return list:  List of dataset columns; empty list if unable to parse
1320        """
1321        if not self.get_results_path().exists():
1322            # no file to get columns from
1323            return []
1324
1325        if (self.get_results_path().suffix.lower() == ".csv") or (
1326                self.get_results_path().suffix.lower() == ".ndjson"
1327                and self.get_own_processor() is not None
1328                and self.get_own_processor().map_item_method_available(dataset=self)
1329        ):
1330            items = self.iterate_items(warn_unmappable=False, get_annotations=False, max_unmappable=100)
1331            try:
1332                keys = list(next(items).keys())
1333                if self.annotation_fields:
1334                    for annotation_field in self.annotation_fields.values():
1335                        annotation_column = annotation_field["label"]
1336                        label_count = 1
1337                        while annotation_column in keys:
1338                            label_count += 1
1339                            annotation_column = (
1340                                f"{annotation_field['label']}_{label_count}"
1341                            )
1342                        keys.append(annotation_column)
1343                columns = keys
1344            except (StopIteration, NotImplementedError):
1345                # No items or otherwise unable to iterate
1346                columns = []
1347            finally:
1348                del items
1349        else:
1350            # Filetype not CSV or an NDJSON with `map_item`
1351            columns = []
1352
1353        return columns

Returns the dataset columns.

Useful for processor input forms. Can deal with both CSV and NDJSON files, the latter only if a map_item function is available in the processor that generated it. While in other cases one could use the keys of the JSON object, this is not always possible in follow-up code that uses the 'column' names, so for consistency this function acts as if no column can be parsed if no map_item function exists.

Returns

List of dataset columns; empty list if unable to parse

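A sketch of how calling code might use get_columns(), assuming `dataset` is the dataset a processor runs on:

    columns = dataset.get_columns()
    if not columns:
        # not a CSV, and not an NDJSON with a map_item method available
        print("No columns could be determined for this dataset")
    else:
        print(f"Available columns: {', '.join(columns)}")
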
def update_label(self, label):
1355    def update_label(self, label):
1356        """
1357        Update label for this dataset
1358
1359        :param str label:  	New label
1360        :return str: 		The new label, as returned by get_label
1361        """
1362        self.parameters["label"] = label
1363
1364        self.db.update(
1365            "datasets",
1366            data={"parameters": json.dumps(self.parameters)},
1367            where={"key": self.key},
1368        )
1369        return self.get_label()

Update label for this dataset

Parameters
  • str label: New label
Returns

The new label, as returned by get_label

def get_label(self, parameters=None, default='Query'):
1371    def get_label(self, parameters=None, default="Query"):
1372        """
1373        Generate a readable label for the dataset
1374
1375        :param dict parameters:  Parameters of the dataset
1376        :param str default:  Label to use if it cannot be inferred from the
1377        parameters
1378
1379        :return str:  Label
1380        """
1381        if not parameters:
1382            parameters = self.parameters
1383
1384        if parameters.get("label"):
1385            return parameters["label"]
1386        elif parameters.get("body_query") and parameters["body_query"] != "empty":
1387            return parameters["body_query"]
1388        elif parameters.get("body_match") and parameters["body_match"] != "empty":
1389            return parameters["body_match"]
1390        elif parameters.get("subject_query") and parameters["subject_query"] != "empty":
1391            return parameters["subject_query"]
1392        elif parameters.get("subject_match") and parameters["subject_match"] != "empty":
1393            return parameters["subject_match"]
1394        elif parameters.get("query"):
1395            label = parameters["query"]
1396            # Some legacy datasets have lists as query data
1397            if isinstance(label, list):
1398                label = ", ".join(label)
1399
1400            label = label if len(label) < 30 else label[:25] + "..."
1401            label = label.strip().replace("\n", ", ")
1402            return label
1403        elif parameters.get("country_flag") and parameters["country_flag"] != "all":
1404            return "Flag: %s" % parameters["country_flag"]
1405        elif parameters.get("country_name") and parameters["country_name"] != "all":
1406            return "Country: %s" % parameters["country_name"]
1407        elif parameters.get("filename"):
1408            return parameters["filename"]
1409        elif parameters.get("board") and "datasource" in parameters:
1410            return parameters["datasource"] + "/" + parameters["board"]
1411        elif (
1412                "datasource" in parameters
1413                and parameters["datasource"] in self.modules.datasources
1414        ):
1415            return (
1416                    self.modules.datasources[parameters["datasource"]]["name"] + " Dataset"
1417            )
1418        else:
1419            return default

Generate a readable label for the dataset

Parameters
  • dict parameters: Parameters of the dataset
  • str default: Label to use if it cannot be inferred from the parameters
Returns

Label

def change_datasource(self, datasource):
1421    def change_datasource(self, datasource):
1422        """
1423        Change the datasource type for this dataset
1424
1425        :param str datasource:  New datasource type
1426        :return str:  The new datasource type
1427        """
1428
1429        self.parameters["datasource"] = datasource
1430
1431        self.db.update(
1432            "datasets",
1433            data={"parameters": json.dumps(self.parameters)},
1434            where={"key": self.key},
1435        )
1436        return datasource

Change the datasource type for this dataset

Parameters
  • str datasource: New datasource type
Returns

The new datasource type

def reserve_result_file(self, parameters=None, extension='csv'):
1438    def reserve_result_file(self, parameters=None, extension="csv"):
1439        """
1440        Generate a unique path to the results file for this dataset
1441
1442        This generates a file name for the data file of this dataset, and makes sure
1443        no file exists or will exist at that location other than the file we
1444        expect (i.e. the data for this particular dataset).
1445
1446        :param str extension: File extension, "csv" by default
1447        :param parameters:  Dataset parameters
1448        :return bool:  Whether the file path was successfully reserved
1449        """
1450        if self.data["is_finished"]:
1451            raise RuntimeError("Cannot reserve results file for a finished dataset")
1452
1453        # Use 'random' for random post queries
1454        if "random_amount" in parameters and int(parameters["random_amount"]) > 0:
1455            file = "random-" + str(parameters["random_amount"]) + "-" + self.data["key"]
1456        # Use country code for country flag queries
1457        elif "country_flag" in parameters and parameters["country_flag"] != "all":
1458            file = (
1459                    "countryflag-"
1460                    + str(parameters["country_flag"])
1461                    + "-"
1462                    + self.data["key"]
1463            )
1464        # Use the query string for all other queries
1465        else:
1466            query_bit = self.data["query"].replace(" ", "-").lower()
1467            query_bit = re.sub(r"[^a-z0-9\-]", "", query_bit)
1468            query_bit = query_bit[:100]  # Crop to avoid OSError
1469            file = query_bit + "-" + self.data["key"]
1470            file = re.sub(r"[-]+", "-", file)
1471
1472        self.data["result_file"] = file + "." + extension.lower()
1473        index = 1
1474        while self.get_results_path().is_file():
1475            self.data["result_file"] = file + "-" + str(index) + "." + extension.lower()
1476            index += 1
1477
1478        updated = self.db.update("datasets", where={"query": self.data["query"], "key": self.data["key"]},
1479                                 data={"result_file": self.data["result_file"]})
1480        return updated > 0

Generate a unique path to the results file for this dataset

This generates a file name for the data file of this dataset, and makes sure no file exists or will exist at that location other than the file we expect (i.e. the data for this particular dataset).

Parameters
  • str extension: File extension, "csv" by default
  • parameters: Dataset parameters
Returns

Whether the file path was successfully reserved
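A minimal usage sketch (hedged: the dataset object, the parameter values and the "ndjson" extension below are illustrative, not taken from the source):

    # reserve a results file before any data is written; parameter values are placeholders
    parameters = {"random_amount": 0, "country_flag": "all"}
    if dataset.reserve_result_file(parameters=parameters, extension="ndjson"):
        print("Writing results to", dataset.get_results_path())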

def get_key(self, query, parameters, parent='', time_offset=0):
1482    def get_key(self, query, parameters, parent="", time_offset=0):
1483        """
1484        Generate a unique key for this dataset that can be used to identify it
1485
1486        The key is a hash of a combination of the query string and parameters.
1487        You never need to call this, really: it's used internally.
1488
1489        :param str query:  Query string
1490        :param parameters:  Dataset parameters
1491        :param parent: Parent dataset's key (if applicable)
1492        :param time_offset:  Offset to add to the time component of the dataset
1493        key. This can be used to ensure a unique key even if the parameters and
1494        timing is otherwise identical to an existing dataset's
1495
1496        :return str:  Dataset key
1497        """
1498        # Return a hash based on parameters
1499        # we're going to use the hash of the parameters to uniquely identify
1500        # the dataset, so make sure it's always in the same order, or we might
1501        # end up creating multiple keys for the same dataset if python
1502        # decides to return the dict in a different order
1503        param_key = collections.OrderedDict()
1504        for key in sorted(parameters):
1505            param_key[key] = parameters[key]
1506
1507        # we additionally use the current time as a salt - this should usually
1508        # ensure a unique key for the dataset. if for some reason there is a
1509        # hash collision, a new key is generated below with a random time offset
1510        param_key["_salt"] = int(time.time()) + time_offset
1511
1512        parent_key = str(parent) if parent else ""
1513        plain_key = repr(param_key) + str(query) + parent_key
1514        hashed_key = hash_to_md5(plain_key)
1515
1516        if self.db.fetchone("SELECT key FROM datasets WHERE key = %s", (hashed_key,)):
1517            # key exists, generate a new one
1518            return self.get_key(
1519                query, parameters, parent, time_offset=random.randint(1, 10)
1520            )
1521        else:
1522            return hashed_key

Generate a unique key for this dataset that can be used to identify it

The key is a hash of a combination of the query string and parameters. You never need to call this, really: it's used internally.

Parameters
  • str query: Query string
  • parameters: Dataset parameters
  • parent: Parent dataset's key (if applicable)
  • time_offset: Offset to add to the time component of the dataset key. This can be used to ensure a unique key even if the parameters and timing is otherwise identical to an existing dataset's
Returns

Dataset key
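Although the docstring notes this is mostly used internally, a hedged sketch illustrates the derivation described above (the query and parameter values are placeholders):

    # the key is an MD5 hash over the sorted parameters, the query, the parent key
    # and a time-based salt; a collision triggers a retry with a random time_offset
    key = dataset.get_key(
        query="example query",
        parameters={"datasource": "fourchan", "board": "v"},
        parent=""
    )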

def set_key(self, key):
1524    def set_key(self, key):
1525        """
1526        Change dataset key
1527
1528        In principle, keys should never be changed. But there are rare cases
1529        where it is useful to do so, in particular when importing a dataset
1530        from another 4CAT instance; in that case it makes sense to try and
1531        ensure that the key is the same as it was before. This function sets
1532        the dataset key and updates any dataset references to it.
1533
1534        :param str key:  Key to set
1535        :return str:  Key that was set. If the desired key already exists, the
1536        original key is kept.
1537        """
1538        key_exists = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (key,))
1539        if key_exists or not key:
1540            return self.key
1541
1542        old_key = self.key
1543        self.db.update("datasets", data={"key": key}, where={"key": old_key})
1544
1545        # update references
1546        self.db.update(
1547            "datasets", data={"key_parent": key}, where={"key_parent": old_key}
1548        )
1549        self.db.update("datasets_owners", data={"key": key}, where={"key": old_key})
1550        self.db.update("jobs", data={"remote_id": key}, where={"remote_id": old_key})
1551        self.db.update("users_favourites", data={"key": key}, where={"key": old_key})
1552
1553        # for good measure
1554        self.db.commit()
1555        self.key = key
1556
1557        return self.key

Change dataset key

In principle, keys should never be changed. But there are rare cases where it is useful to do so, in particular when importing a dataset from another 4CAT instance; in that case it makes sense to try and ensure that the key is the same as it was before. This function sets the dataset key and updates any dataset references to it.

Parameters
  • str key: Key to set
Returns

Key that was set. If the desired key already exists, the original key is kept.

def get_status(self):
1559    def get_status(self):
1560        """
1561        Get Dataset status
1562
1563        :return string: Dataset status
1564        """
1565        return self.data["status"]

Get Dataset status

Returns

Dataset status

def update_status(self, status, is_final=False):
1567    def update_status(self, status, is_final=False):
1568        """
1569        Update dataset status
1570
1571        The status is a string that may be displayed to a user to keep them
1572        updated and informed about the progress of a dataset. No memory is kept
1573        of earlier dataset statuses; the current status is overwritten when
1574        updated.
1575
1576        Statuses are also written to the dataset log file.
1577
1578        :param string status:  Dataset status
1579        :param bool is_final:  If this is `True`, subsequent calls to this
1580        method while the object is instantiated will not update the dataset
1581        status.
1582        :return bool:  Status update successful?
1583        """
1584        if self.no_status_updates:
1585            return
1586
1587        # for presets, copy the updated status to the preset(s) this is part of
1588        if self.preset_parent is None:
1589            self.preset_parent = [
1590                                     parent
1591                                     for parent in self.get_genealogy()
1592                                     if parent.type.find("preset-") == 0 and parent.key != self.key
1593                                 ][:1]
1594
1595        if self.preset_parent:
1596            for preset_parent in self.preset_parent:
1597                if not preset_parent.is_finished():
1598                    preset_parent.update_status(status)
1599
1600        self.data["status"] = status
1601        updated = self.db.update(
1602            "datasets", where={"key": self.data["key"]}, data={"status": status}
1603        )
1604
1605        if is_final:
1606            self.no_status_updates = True
1607
1608        self.log(status)
1609
1610        return updated > 0

Update dataset status

The status is a string that may be displayed to a user to keep them updated and informed about the progress of a dataset. No memory is kept of earlier dataset statuses; the current status is overwritten when updated.

Statuses are also written to the dataset log file.

Parameters
  • string status: Dataset status
  • bool is_final: If this is True, subsequent calls to this method while the object is instantiated will not update the dataset status.
Returns

Status update successful?
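A hedged usage sketch, as a processor might call it (the status strings are illustrative):

    # intermediate status; overwrites whatever status was set before
    dataset.update_status("Processing items (50/200)")

    # final status: later update_status() calls on this object are ignored
    dataset.update_status("Dataset contains no items for this query", is_final=True)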

def update_progress(self, progress):
1612    def update_progress(self, progress):
1613        """
1614        Update dataset progress
1615
1616        The progress can be used to indicate to a user how close the dataset
1617        is to completion.
1618
1619        :param float progress:  Between 0 and 1.
1620        :return bool:  Update successful?
1621        """
1622        progress = min(1, max(0, progress))  # clamp
1623        if type(progress) is int:
1624            progress = float(progress)
1625
1626        self.data["progress"] = progress
1627        updated = self.db.update(
1628            "datasets", where={"key": self.data["key"]}, data={"progress": progress}
1629        )
1630        return updated > 0

Update dataset progress

The progress can be used to indicate to a user how close the dataset is to completion.

Parameters
  • float progress: Between 0 and 1.
Returns

Update successful?
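For example (a hedged sketch; the items and the per-item work are placeholders):

    def process_item(item):
        pass  # placeholder for whatever a processor does per item

    items = range(200)
    for i, item in enumerate(items):
        process_item(item)
        if i % 50 == 0:
            dataset.update_progress(i / len(items))
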
def get_progress(self):
1632    def get_progress(self):
1633        """
1634        Get dataset progress
1635
1636        :return float:  Progress, between 0 and 1
1637        """
1638        return self.data["progress"]

Get dataset progress

Returns

Progress, between 0 and 1

def finish_with_error(self, error):
1640    def finish_with_error(self, error):
1641        """
1642        Set error as final status, and finish with 0 results
1643
1644        This is a convenience function to avoid having to repeat
1645        "update_status" and "finish" a lot.
1646
1647        :param str error:  Error message for final dataset status.
1648        :return:
1649        """
1650        self.update_status(error, is_final=True)
1651        self.finish(0)
1652
1653        return None

Set error as final status, and finish with 0 results

This is a convenience function to avoid having to repeat "update_status" and "finish" a lot.

Parameters
  • str error: Error message for final dataset status.
Returns
def update_version(self, version):
1655    def update_version(self, version):
1656        """
1657        Update software version used for this dataset
1658
1659        This can be used to verify the code that was used to process this dataset.
1660
1661        :param string version:  Version identifier
1662        :return bool:  Update successful?
1663        """
1664        try:
1665            # this fails if the processor type is unknown
1666            # edge case, but let's not crash...
1667            processor_path = self.modules.processors.get(self.data["type"]).filepath
1668        except AttributeError:
1669            processor_path = ""
1670
1671        updated = self.db.update(
1672            "datasets",
1673            where={"key": self.data["key"]},
1674            data={
1675                "software_version": version[0],
1676                "software_source": version[1],
1677                "software_file": processor_path,
1678            },
1679        )
1680
1681        return updated > 0

Update software version used for this dataset

This can be used to verify the code that was used to process this dataset.

Parameters
  • string version: Version identifier
Returns

Update successful?

def delete_parameter(self, parameter, instant=True):
1683    def delete_parameter(self, parameter, instant=True):
1684        """
1685        Delete a parameter from the dataset metadata
1686
1687        :param string parameter:  Parameter to delete
1688        :param bool instant:  Also delete parameters in this instance object?
1689        :return bool:  Update successful?
1690        """
1691        parameters = self.parameters.copy()
1692        if parameter in parameters:
1693            del parameters[parameter]
1694        else:
1695            return False
1696
1697        updated = self.db.update(
1698            "datasets",
1699            where={"key": self.data["key"]},
1700            data={"parameters": json.dumps(parameters)},
1701        )
1702
1703        if instant:
1704            self.parameters = parameters
1705
1706        return updated > 0

Delete a parameter from the dataset metadata

Parameters
  • string parameter: Parameter to delete
  • bool instant: Also delete parameters in this instance object?
Returns

Update successful?

def get_version_url(self, file):
1708    def get_version_url(self, file):
1709        """
1710        Get a versioned GitHub URL for the version this dataset was processed with
1711
1712        :param file:  File to link within the repository
1713        :return:  URL, or an empty string
1714        """
1715        if not self.data["software_source"]:
1716            return ""
1717
1718        filepath = self.data.get("software_file", "")
1719        if filepath.startswith("/config/extensions/"):
1720            # go to root of extension
1721            filepath = "/" + "/".join(filepath.split("/")[3:])
1722
1723        return (
1724                self.data["software_source"]
1725                + "/blob/"
1726                + self.data["software_version"]
1727                + filepath
1728        )

Get a versioned GitHub URL for the version this dataset was processed with

Parameters
  • file: File to link within the repository
Returns

URL, or an empty string

def top_parent(self):
1730    def top_parent(self):
1731        """
1732        Get root dataset
1733
1734        Traverses the tree of datasets this one is part of until it finds one
1735        with no source_dataset dataset, then returns that dataset.
1736
1737        :return Dataset: Root (top-level) dataset
1738        """
1739        genealogy = self.get_genealogy()
1740        return genealogy[0]

Get root dataset

Traverses the tree of datasets this one is part of until it finds one with no source_dataset dataset, then returns that dataset.

Returns

Root (top-level) dataset

def get_genealogy(self, update_cache=False):
1742    def get_genealogy(self, update_cache=False):
1743        """
1744        Get genealogy of this dataset
1745
1746        Creates a list of DataSet objects, with the first one being the
1747        'top' dataset, and each subsequent one being a child of the previous
1748        one, ending with the current dataset.
1749
1750        :param bool update_cache:  Update the cached genealogy if True, else return cached value
1751        :return list:  Dataset genealogy, oldest dataset first
1752        """
1753        if not self._genealogy or update_cache:
1754            key_parent = self.key_parent
1755            genealogy = []
1756
1757            while key_parent:
1758                try:
1759                    parent = DataSet(key=key_parent, db=self.db, modules=self.modules)
1760                except DataSetException:
1761                    break
1762
1763                genealogy.append(parent)
1764                if parent.key_parent:
1765                    key_parent = parent.key_parent
1766                else:
1767                    break
1768
1769            genealogy.reverse()
1770
1771            # add self to the end
1772            genealogy.append(self)
1773            # cache the result
1774            self._genealogy = genealogy
1775
1776        # return a copy to prevent external modification
1777        return list(self._genealogy)

Get genealogy of this dataset

Creates a list of DataSet objects, with the first one being the 'top' dataset, and each subsequent one being a child of the previous one, ending with the current dataset.

Parameters
  • bool update_cache: Update the cached genealogy if True, else return cached value
Returns

Dataset genealogy, oldest dataset first
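For instance, a hedged sketch of walking the genealogy (oldest dataset first, ending with the dataset itself):

    for ancestor in dataset.get_genealogy():
        print(ancestor.key, ancestor.type)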

def get_children(self, update=False):
1779    def get_children(self, update=False):
1780        """
1781        Get children of this dataset
1782
1783        :param bool update:  Update the list of children from database if True, else return cached value
1784        :return list:  List of child datasets
1785        """
1786        if self._children is not None and not update:
1787            return self._children
1788
1789        analyses = self.db.fetchall(
1790            "SELECT * FROM datasets WHERE key_parent = %s ORDER BY timestamp ASC",
1791            (self.key,),
1792        )
1793        self._children = [
1794            DataSet(data=analysis, db=self.db, modules=self.modules)
1795            for analysis in analyses
1796        ]
1797        return self._children

Get children of this dataset

Parameters
  • bool update: Update the list of children from database if True, else return cached value
Returns

List of child datasets

def get_all_children(self, recursive=True, update=True):
1799    def get_all_children(self, recursive=True, update=True):
1800        """
1801        Get all children of this dataset
1802
1803        Results are returned as a non-hierarchical list, i.e. the result does
1804        not reflect the actual dataset hierarchy (but all datasets in the
1805        result will have the original dataset as an ancestor somewhere)
1806
1807        :return list:  List of DataSets
1808        """
1809        children = self.get_children(update=update)
1810        results = children.copy()
1811        if recursive:
1812            for child in children:
1813                results += child.get_all_children(recursive=recursive, update=update)
1814
1815        return results

Get all children of this dataset

Results are returned as a non-hierarchical list, i.e. the result does not reflect the actual dataset hierarchy (but all datasets in the result will have the original dataset as an ancestor somewhere)

Returns

List of DataSets

def nearest(self, type_filter):
1817    def nearest(self, type_filter):
1818        """
1819        Return nearest dataset that matches the given type
1820
1821        Starting with this dataset, traverse the hierarchy upwards and return
1822        whichever dataset matches the given type.
1823
1824        :param str type_filter:  Type filter. Can contain wildcards and is matched
1825        using `fnmatch.fnmatch`.
1826        :return:  Nearest matching dataset, or `None` if none match.
1827        """
1828        genealogy = self.get_genealogy()
1829        for dataset in reversed(genealogy):
1830            if fnmatch.fnmatch(dataset.type, type_filter):
1831                return dataset
1832
1833        return None

Return nearest dataset that matches the given type

Starting with this dataset, traverse the hierarchy upwards and return whichever dataset matches the given type.

Parameters
  • str type_filter: Type filter. Can contain wildcards and is matched using fnmatch.fnmatch.
Returns

Nearest matching dataset, or None if none match.
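A hedged usage sketch; the wildcard pattern follows the "-search" type suffix used by collector processors elsewhere in this class:

    # find the closest ancestor (or the dataset itself) created by a search worker
    source = dataset.nearest("*-search")
    if source is not None:
        print("Collected by", source.type)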

def get_breadcrumbs(self):
1835    def get_breadcrumbs(self):
1836        """
1837        Get breadcrumbs navlink for use in permalinks
1838
1839        Returns a string representing this dataset's genealogy that may be used
1840        to uniquely identify it.
1841
1842        :return str: Nav link
1843        """
1844        if not self.key_parent:
1845            return self.key
1846
1847        genealogy = self.get_genealogy()
1848        return ",".join([d.key for d in genealogy])

Get breadcrumbs navlink for use in permalinks

Returns a string representing this dataset's genealogy that may be used to uniquely identify it.

Returns

Nav link

def get_compatible_processors(self, config=None):
1850    def get_compatible_processors(self, config=None):
1851        """
1852        Get list of processors compatible with this dataset
1853
1854        Checks whether this dataset type is one that is listed as being accepted
1855        by the processor, for each known type: if the processor does not
1856        specify accepted types (via the `is_compatible_with` method), it is
1857        assumed it accepts any top-level datasets
1858
1859        :param ConfigManager|None config:  Configuration reader to determine
1860        compatibility through. This may not be the same reader the dataset was
1861        instantiated with, e.g. when checking whether some other user should
1862        be able to run processors on this dataset.
1863        :return dict:  Compatible processors, `name => class` mapping
1864        """
1865        processors = self.modules.processors
1866
1867        available = {}
1868        for processor_type, processor in processors.items():
1869            if processor.is_from_collector():
1870                continue
1871
1872            own_processor = self.get_own_processor()
1873            if own_processor and own_processor.exclude_followup_processors(
1874                    processor_type
1875            ):
1876                continue
1877
1878            # consider a processor compatible if its is_compatible_with
1879            # method returns True *or* if it has no explicit compatibility
1880            # check and this dataset is top-level (i.e. has no parent)
1881            if (not hasattr(processor, "is_compatible_with") and not self.key_parent) \
1882                    or (hasattr(processor, "is_compatible_with") and processor.is_compatible_with(self, config=config)):
1883                available[processor_type] = processor
1884
1885        return available

Get list of processors compatible with this dataset

Checks whether this dataset type is one that is listed as being accepted by the processor, for each known type: if the processor does not specify accepted types (via the is_compatible_with method), it is assumed it accepts any top-level datasets

Parameters
  • ConfigManager|None config: Configuration reader to determine compatibility through. This may not be the same reader the dataset was instantiated with, e.g. when checking whether some other user should be able to run processors on this dataset.
Returns

Compatible processors, name => class mapping

def get_place_in_queue(self, update=False):
1887    def get_place_in_queue(self, update=False):
1888        """
1889        Determine dataset's position in queue
1890
1891        If the dataset is already finished, the position is -1. Else, the
1892        position is the number of datasets to be completed before this one will
1893        be processed. A position of 0 would mean that the dataset is currently
1894        being executed, or that the backend is not running.
1895
1896        :param bool update:  Update the queue position from database if True, else return cached value
1897        :return int:  Queue position
1898        """
1899        if self.is_finished() or not self.data.get("job"):
1900            self._queue_position = -1
1901            return self._queue_position
1902        elif not update and self._queue_position is not None:
1903            # Use cached value
1904            return self._queue_position
1905        else:
1906            # Collect queue position from database via the job
1907            try:
1908                job = Job.get_by_ID(self.data["job"], self.db)
1909                self._queue_position = job.get_place_in_queue()
1910            except JobNotFoundException:
1911                self._queue_position = -1
1912
1913            return self._queue_position

Determine dataset's position in queue

If the dataset is already finished, the position is -1. Else, the position is the number of datasets to be completed before this one will be processed. A position of 0 would mean that the dataset is currently being executed, or that the backend is not running.

Parameters
  • bool update: Update the queue position from database if True, else return cached value
Returns

Queue position

def get_own_processor(self):
1915    def get_own_processor(self):
1916        """
1917        Get the processor class that produced this dataset
1918
1919        :return:  Processor class, or `None` if not available.
1920        """
1921        processor_type = self.parameters.get("type", self.data.get("type"))
1922
1923        return self.modules.processors.get(processor_type)

Get the processor class that produced this dataset

Returns

Processor class, or None if not available.

def get_available_processors(self, config=None, exclude_hidden=False):
1925    def get_available_processors(self, config=None, exclude_hidden=False):
1926        """
1927        Get list of processors that may be run for this dataset
1928
1929        Returns all compatible processors except for those that are already
1930        queued or finished and have no options. Processors that have been
1931        run but have options are included so they may be run again with a
1932        different configuration
1933
1934        :param ConfigManager|None config:  Configuration reader to determine
1935        compatibility through. This may not be the same reader the dataset was
1936        instantiated with, e.g. when checking whether some other user should
1937        be able to run processors on this dataset.
1938        :param bool exclude_hidden:  Exclude processors that should not be displayed
1939        in the UI? If `False`, all processors are returned.
1940
1941        :return dict:  Available processors, `name => properties` mapping
1942        """
1943        if self.available_processors:
1944            # Update to reflect exclude_hidden parameter which may be different from last call
1945            # TODO: could children also have been created? Possible bug, but I have not seen anything affected by this
1946            return {
1947                processor_type: processor
1948                for processor_type, processor in self.available_processors.items()
1949                if not exclude_hidden or not processor.is_hidden
1950            }
1951
1952        processors = self.get_compatible_processors(config=config)
1953
1954        for analysis in self.get_children(update=True):
1955            if analysis.type not in processors:
1956                continue
1957
1958            if not processors[analysis.type].get_options(config=config):
1959                # No variable options; this processor has been run so remove
1960                del processors[analysis.type]
1961                continue
1962
1963            if exclude_hidden and processors[analysis.type].is_hidden:
1964                del processors[analysis.type]
1965
1966        self.available_processors = processors
1967        return processors

Get list of processors that may be run for this dataset

Returns all compatible processors except for those that are already queued or finished and have no options. Processors that have been run but have options are included so they may be run again with a different configuration

Parameters
  • ConfigManager|None config: Configuration reader to determine compatibility through. This may not be the same reader the dataset was instantiated with, e.g. when checking whether some other user should be able to run processors on this dataset.
  • bool exclude_hidden: Exclude processors that should not be displayed in the UI? If False, all processors are returned.
Returns

Available processors, name => properties mapping
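A hedged sketch of listing what could still be queued for a dataset, skipping processors flagged as hidden:

    available = dataset.get_available_processors(exclude_hidden=True)
    for processor_type in available:
        print(processor_type)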

def get_parent(self):
2010    def get_parent(self):
2011        """
2012        Get parent dataset
2013
2014        :return DataSet:  Parent dataset, or `None` if not applicable
2015        """
2016        return (
2017            DataSet(key=self.key_parent, db=self.db, modules=self.modules)
2018            if self.key_parent
2019            else None
2020        )

Get parent dataset

Returns

Parent dataset, or None if not applicable

def detach(self):
2022    def detach(self):
2023        """
2024        Makes the dataset standalone, i.e. not having any source_dataset dataset
2025        """
2026        self.link_parent("")

Makes the dataset standalone, i.e. not having any source_dataset dataset

def is_dataset(self):
2028    def is_dataset(self):
2029        """
2030        Easy way to confirm this is a dataset.
2031        Used for checking processor and dataset compatibility,
2032        which needs to handle both processors and datasets.
2033        """
2034        return True

Easy way to confirm this is a dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.

def is_top_dataset(self):
2036    def is_top_dataset(self):
2037        """
2038        Easy way to confirm this is a top dataset.
2039        Used for checking processor and dataset compatibility,
2040        which needs to handle both processors and datasets.
2041        """
2042        if self.key_parent:
2043            return False
2044        return True

Easy way to confirm this is a top dataset. Used for checking processor and dataset compatibility, which needs to handle both processors and datasets.

def is_expiring(self, config):
2046    def is_expiring(self, config):
2047        """
2048        Determine if dataset is set to expire
2049
2050        Similar to `is_expired`, but checks if the dataset will be deleted in
2051        the future, not if it should be deleted right now.
2052
2053        :param ConfigManager config:  Configuration reader (context-aware)
2054        :return bool|int:  `False`, or the expiration date as a Unix timestamp.
2055        """
2056        # has someone opted out of deleting this?
2057        if self.parameters.get("keep"):
2058            return False
2059
2060        # is this dataset explicitly marked as expiring after a certain time?
2061        if self.parameters.get("expires-after"):
2062            return self.parameters.get("expires-after")
2063
2064        # is the data source configured to have its datasets expire?
2065        expiration = config.get("datasources.expiration", {})
2066        if not expiration.get(self.parameters.get("datasource")):
2067            return False
2068
2069        # is there a timeout for this data source?
2070        if expiration.get(self.parameters.get("datasource")).get("timeout"):
2071            return self.timestamp + expiration.get(
2072                self.parameters.get("datasource")
2073            ).get("timeout")
2074
2075        return False

Determine if dataset is set to expire

Similar to is_expired, but checks if the dataset will be deleted in the future, not if it should be deleted right now.

Parameters
  • ConfigManager config: Configuration reader (context-aware)
Returns

False, or the expiration date as a Unix timestamp.
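The per-datasource timeout consulted above comes from the datasources.expiration setting; a hedged illustration of its shape and of checking a dataset against it (the datasource name and timeout value are assumptions, not real defaults):

    # illustrative shape of the datasources.expiration setting read via config.get():
    # {"fourchan": {"timeout": 7 * 86400}}  # timeout in seconds
    expires_at = dataset.is_expiring(config)
    if expires_at:
        print("Dataset is scheduled to expire at timestamp", expires_at)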

def is_expired(self, config):
2077    def is_expired(self, config):
2078        """
2079        Determine if dataset should be deleted
2080
2081        Datasets can be set to expire, but when they should be deleted depends
2082        on a number of factors. This checks them all.
2083
2084        :param ConfigManager config:  Configuration reader (context-aware)
2085        :return bool:
2086        """
2087        # has someone opted out of deleting this?
2088        if not self.is_expiring(config):
2089            return False
2090
2091        # is this dataset explicitly marked as expiring after a certain time?
2092        future = (
2093                time.time() + 3600
2094        )  # ensure we don't delete datasets with invalid expiration times
2095        if (
2096                self.parameters.get("expires-after")
2097                and convert_to_int(self.parameters["expires-after"], future) < time.time()
2098        ):
2099            return True
2100
2101        # is the data source configured to have its datasets expire?
2102        expiration = config.get("datasources.expiration", {})
2103        if not expiration.get(self.parameters.get("datasource")):
2104            return False
2105
2106        # is the dataset older than the set timeout?
2107        if expiration.get(self.parameters.get("datasource")).get("timeout"):
2108            return (
2109                    self.timestamp
2110                    + expiration[self.parameters.get("datasource")]["timeout"]
2111                    < time.time()
2112            )
2113
2114        return False

Determine if dataset should be deleted

Datasets can be set to expire, but when they should be deleted depends on a number of factors. This checks them all.

Parameters
  • ConfigManager config: Configuration reader (context-aware)
Returns
def is_from_collector(self):
2116    def is_from_collector(self):
2117        """
2118        Check if this dataset was made by a processor that collects data, i.e.
2119        a search or import worker.
2120
2121        :return bool:
2122        """
2123        return self.type.endswith("-search") or self.type.endswith("-import")

Check if this dataset was made by a processor that collects data, i.e. a search or import worker.

Returns
def get_extension(self):
2125    def get_extension(self):
2126        """
2127        Gets the file extension this dataset produces.
2128        Also checks whether the results file exists.
2129        Used for checking processor and dataset compatibility.
2130
2131        :return str extension:  Extension, e.g. `csv`, or `False` if the results file does not exist
2132        """
2133        if self.get_results_path().exists():
2134            return self.get_results_path().suffix[1:]
2135
2136        return False

Gets the file extension this dataset produces. Also checks whether the results file exists. Used for checking processor and dataset compatibility.

Returns

Extension, e.g. csv, or False if the results file does not exist

def is_filter(self):
2138    def is_filter(self):
2139        """
2140        Check whether a dataset is a filter dataset.
2141
2142        :return bool:  True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown).
2143        """
2144        own_processor = self.get_own_processor()
2145        if own_processor is None:
2146            # Deprecated datasets do not have a processor
2147            return None
2148        return own_processor.is_filter()

Check whether a dataset is a filter dataset.

Returns

True if the dataset is a filter dataset, False otherwise. None if deprecated (i.e., filter status unknown).

def get_media_type(self):
2150    def get_media_type(self):
2151        """
2152        Gets the media type of the dataset file.
2153
2154        :return str: media type, e.g., "text"
2155        """
2156        own_processor = self.get_own_processor()
2157        if hasattr(self, "media_type"):
2158            # media type can be defined explicitly in the dataset; this is the priority
2159            return self.media_type
2160        elif own_processor is not None:
2161            # or media type can be defined in the processor
2162            # some processors can set different media types for different datasets (e.g., import_media)
2163            if hasattr(own_processor, "media_type"):
2164                return own_processor.media_type
2165
2166        # Default to text
2167        return self.parameters.get("media_type", "text")

Gets the media type of the dataset file.

Returns

media type, e.g., "text"

def get_metadata(self):
2169    def get_metadata(self):
2170        """
2171        Get dataset metadata
2172
2173        This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended
2174        as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to
2175        update its database (and ensure compatibility with the exporting version of 4CAT).
2176        """
2177        metadata = self.db.fetchone(
2178            "SELECT * FROM datasets WHERE key = %s", (self.key,)
2179        )
2180
2181        # get 4CAT version (presumably to ensure export is compatible with import)
2182        metadata["current_4CAT_version"] = get_software_version()
2183        return metadata

Get dataset metadata

This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to update its database (and ensure compatibility with the exporting version of 4CAT).
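A hedged sketch of using this when exporting a dataset (the file name is illustrative; default=str is used in case some column values are not directly JSON-serialisable):

    import json

    with open("dataset-metadata.json", "w") as outfile:
        json.dump(dataset.get_metadata(), outfile, default=str)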

def get_result_url(self):
2185    def get_result_url(self):
2186        """
2187        Gets the 4CAT frontend URL of a dataset file.
2188
2189        Uses the FlaskConfig attributes (i.e., SERVER_NAME and
2190        SERVER_HTTPS) plus hardcoded '/result/'.
2191        TODO: create more dynamic method of obtaining url.
2192        """
2193        filename = self.get_results_path().name
2194
2195        # we cheat a little here by using the modules' config reader, but these
2196        # will never be context-dependent values anyway
2197        url_to_file = ('https://' if self.modules.config.get("flask.https") else 'http://') + \
2198                      self.modules.config.get("flask.server_name") + '/result/' + filename
2199        return url_to_file

Gets the 4CAT frontend URL of a dataset file.

Uses the FlaskConfig attributes (i.e., SERVER_NAME and SERVER_HTTPS) plus hardcoded '/result/'. TODO: create more dynamic method of obtaining url.

def warn_unmappable_item( self, item_count, processor=None, error_message=None, warn_admins=True):
2201    def warn_unmappable_item(
2202            self, item_count, processor=None, error_message=None, warn_admins=True
2203    ):
2204        """
2205        Log an item that is unable to be mapped and warn administrators.
2206
2207        :param int item_count:			Item index
2208        :param Processor processor:		Processor calling this function
2209        """
2210        dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"
2211
2212        # Use processing dataset if available, otherwise use original dataset (which likely already has this error message)
2213        closest_dataset = (
2214            processor.dataset
2215            if processor is not None and processor.dataset is not None
2216            else self
2217        )
2218        # Log error to dataset log
2219        closest_dataset.log(dataset_error_message)
2220
2221        if warn_admins:
2222            if processor is not None:
2223                processor.log.warning(
2224                    f"Processor {processor.type} unable to map one or more items for dataset {closest_dataset.key}."
2225                )
2226            elif hasattr(self.db, "log"):
2227                # borrow the database's log handler
2228                self.db.log.warning(
2229                    f"Unable to map one or more items for dataset {closest_dataset.key}."
2230                )
2231            else:
2232                # No other log available
2233                raise DataSetException(
2234                    f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
2235                )

Log an item that is unable to be mapped and warn administrators.

Parameters
  • int item_count: Item index
  • Processor processor: Processor calling this function
def has_annotations(self) -> bool:
2238    def has_annotations(self) -> bool:
2239        """
2240        Whether this dataset has annotations
2241        """
2242
2243        annotation = self.db.fetchone("SELECT * FROM annotations WHERE dataset = %s LIMIT 1", (self.key,))
2244
2245        return True if annotation else False

Whether this dataset has annotations

def num_annotations(self) -> int:
2247    def num_annotations(self) -> int:
2248        """
2249        Get the amount of annotations
2250        """
2251        return self.db.fetchone(
2252            "SELECT COUNT(*) FROM annotations WHERE dataset = %s", (self.key,)
2253        )["count"]

Get the amount of annotations

def get_annotation(self, data: dict):
2255    def get_annotation(self, data: dict):
2256        """
2257        Retrieves a specific annotation if it exists.
2258
2259        :param data:		A dictionary with which to retrieve the annotation.
2260                                                To get a specific annotation, include either an `id` field or
2261                                                both `field_id` and `item_id` fields.
2262
2263        :return Annotation:	Annotation object, or `None` if the required fields are missing.
2264        """
2265
2266        if "id" not in data and ("field_id" not in data or "item_id" not in data):
2267            return None
2268
2269        if "dataset" not in data:
2270            data["dataset"] = self.key
2271
2272        return Annotation(data=data, db=self.db)

Retrieves a specific annotation if it exists.

Parameters
  • data: A dictionary with which to retrieve the annotation. To get a specific annotation, include either an id field or both field_id and item_id fields.
Returns

Annotation object, or None if the required fields are missing.

def get_annotations(self) -> list:
2274    def get_annotations(self) -> list:
2275        """
2276        Retrieves all annotations for this dataset.
2277
2278        :return list:	List of Annotation objects.
2279        """
2280
2281        return Annotation.get_annotations_for_dataset(self.db, self.key)

Retrieves all annotations for this dataset.

Returns

List of Annotation objects.

def get_annotations_for_item(self, item_id: str | list, before=0) -> list:
2283    def get_annotations_for_item(self, item_id: str | list, before=0) -> list:
2284        """
2285        Retrieves all annotations from this dataset for a specific item (e.g. social media post).
2286        :param str item_id:  The ID of the annotation item
2287        :param int before:   The upper timestamp range for annotations.
2288        """
2289        return Annotation.get_annotations_for_dataset(
2290            self.db, self.key, item_id=item_id, before=before
2291        )

Retrieves all annotations from this dataset for a specific item (e.g. social media post).

Parameters
  • str item_id: The ID of the annotation item
  • int before: The upper timestamp range for annotations.
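A hedged usage sketch; the item ID is illustrative and the printed attributes are assumed to mirror the annotation dictionaries used by save_annotations():

    for annotation in dataset.get_annotations_for_item("12345"):
        # field_id and value are assumed attribute names on the Annotation object
        print(annotation.field_id, annotation.value)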
def has_annotation_fields(self) -> bool:
2293    def has_annotation_fields(self) -> bool:
2294        """
2295        Returns True if there are annotation fields saved to the datasets table.
2296        Annotation fields are metadata that describe a type of annotation (with info on `id`, `type`, etc.).
2297        """
2298
2299        return True if self.annotation_fields else False

Returns True if there are annotation fields saved to the datasets table. Annotation fields are metadata that describe a type of annotation (with info on id, type, etc.).

def get_annotation_field_labels(self) -> list:
2301    def get_annotation_field_labels(self) -> list:
2302        """
2303        Retrieves the saved annotation field labels for this dataset.
2304        These are stored in the `annotation_fields` column of the datasets table.
2305
2306        :return list: List of annotation field labels.
2307        """
2308
2309        annotation_fields = self.annotation_fields
2310
2311        if not annotation_fields:
2312            return []
2313
2314        labels = [v["label"] for v in annotation_fields.values()]
2315
2316        return labels

Retrieves the saved annotation field labels for this dataset. These are stored in the annotation_fields column of the datasets table.

Returns

List of annotation field labels.

def save_annotations(self, annotations: list) -> int:
2318    def save_annotations(self, annotations: list) -> int:
2319        """
2320        Takes a list of annotations and saves them to the annotations table.
2321        If a field is not yet present in the `annotation_fields` column in
2322        the datasets table, it also adds it there.
2323
2324        :param list annotations:		List of dictionaries with annotation items. Must have `item_id`, `field_id`,
2325                                                                        and `label`.
2326                                                                        `item_id` is for the specific item being annotated (e.g. a social media post)
2327                                                                        `field_id` refers to the annotation field.
2328                                                                        `label` is a human-readable description of this annotation.
2329                                                                        E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd",
2330                                                                         "value": "Yes"}]
2331
2332        :returns int:					How many annotations were saved.
2333
2334        """
2335
2336        if not annotations:
2337            return 0
2338
2339        count = 0
2340        annotation_fields = self.annotation_fields
2341
2342        # Add some dataset data to annotations, if not present
2343        for annotation_data in annotations:
2344            # Check if the required fields are present
2345            if not annotation_data.get("item_id"):
2346                raise AnnotationException(
2347                    "Can't save annotations; annotation must have an `item_id` referencing "
2348                    "the item it annotated, got %s" % annotation_data
2349                )
2350            if not annotation_data.get("field_id"):
2351                raise AnnotationException(
2352                    "Can't save annotations; annotation must have a `field_id` field, "
2353                    "got %s" % annotation_data
2354                )
2355            if not annotation_data.get("label") or not isinstance(
2356                    annotation_data["label"], str
2357            ):
2358                raise AnnotationException(
2359                    "Can't save annotations; annotation must have a `label` field, "
2360                    "got %s" % annotation_data
2361                )
2362
2363            # Set dataset key
2364            if not annotation_data.get("dataset"):
2365                annotation_data["dataset"] = self.key
2366
2367            # Set default author to this dataset owner
2368            # If this annotation is made by a processor, it will have the processor name
2369            if not annotation_data.get("author"):
2370                annotation_data["author"] = self.get_owners()[0]
2371
2372            # Create Annotation object, which also saves it to the database
2373            # If this dataset/item_id/field_id combination already exists, this retrieves the
2374            # existing data and updates it with new values.
2375            Annotation(data=annotation_data, db=self.db)
2376            count += 1
2377
2378        # Save annotation fields if things changed
2379        if annotation_fields != self.annotation_fields:
2380            self.save_annotation_fields(annotation_fields)
2381
2382        return count

Takes a list of annotations and saves them to the annotations table. If a field is not yet present in the annotation_fields column in the datasets table, it also adds it there.

Parameters
  • list annotations: List of dictionaries with annotation items. Must have item_id, field_id, and label. item_id identifies the specific item being annotated (e.g. a social media post), field_id refers to the annotation field, and label is a human-readable description of this annotation. E.g.: [{"item_id": "12345", "label": "Valid", "field_id": "123asd", "value": "Yes"}]
Returns

How many annotations were saved.
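A hedged usage sketch mirroring the dictionary format documented above (IDs and values are illustrative):

    saved = dataset.save_annotations([
        {"item_id": "12345", "field_id": "123asd", "label": "Valid", "value": "Yes"},
        {"item_id": "67890", "field_id": "123asd", "label": "Valid", "value": "No"},
    ])
    print("%i annotations saved" % saved)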

def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2384    def save_annotation_fields(self, new_fields: dict, add=False) -> int:
2385        """
2386        Save annotation field data to the datasets table (in the `annotation_fields` column).
2387        If changes to the annotation fields affect existing annotations,
2388        this function will also call `update_annotations_via_fields()` to change them.
2389
2390        :param dict new_fields:  		New annotation fields, with a field ID as key.
2391
2392        :param bool add:				Whether we're merely adding new fields
2393                                                                        or replacing the whole batch. If add is False,
2394                                                                        `new_fields` should contain all fields.
2395
2396        :return int:					The number of annotation fields saved.
2397
2398        """
2399
2400        # Get existing annotation fields to see if stuff changed.
2401        old_fields = self.annotation_fields
2402        changes = False
2403
2404        # Annotation fields must be serialisable as JSON.
2405        try:
2406            json.dumps(new_fields)
2407        except (TypeError, ValueError):
2408            raise AnnotationException(
2409                "Can't save annotation fields: not valid JSON (%s)" % new_fields
2410            )
2411
2412        # No duplicate IDs
2413        if len(new_fields) != len(set(new_fields)):
2414            raise AnnotationException(
2415                "Can't save annotation fields: field IDs must be unique"
2416            )
2417
2418        # Annotation fields must at minimum have `type` and `label` keys.
2419        for field_id, annotation_field in new_fields.items():
2420            if not isinstance(field_id, str):
2421                raise AnnotationException(
2422                    "Can't save annotation fields: field ID %s is not a valid string"
2423                    % field_id
2424                )
2425            if "label" not in annotation_field:
2426                raise AnnotationException(
2427                    "Can't save annotation fields: field %s must have a label"
2428                    % field_id
2429                )
2430            if "type" not in annotation_field:
2431                raise AnnotationException(
2432                    "Can't save annotation fields: field %s must have a type"
2433                    % field_id
2434                )
2435
2436        # Check if fields are removed
2437        if not add and old_fields:
2438            for field_id in old_fields.keys():
2439                if field_id not in new_fields:
2440                    changes = True
2441
2442        # Make sure to do nothing to processor-generated annotations; these must remain 'traceable' to their origin
2443        # dataset
2444        for field_id in new_fields.keys():
2445            if field_id in old_fields and old_fields[field_id].get("from_dataset"):
2446                old_fields[field_id]["label"] = new_fields[field_id][
2447                    "label"
2448                ]  # Only labels could've been changed
2449                new_fields[field_id] = old_fields[field_id]
2450
2451        # If we're just adding fields, add them to the old fields.
2452        # If the field already exists, overwrite the old field.
2453        if add and old_fields:
2454            all_fields = old_fields
2455            for field_id, annotation_field in new_fields.items():
2456                all_fields[field_id] = annotation_field
2457            new_fields = all_fields
2458
2459        # We're saving the new annotation fields as-is.
2460        # Ordering of fields is preserved this way.
2461        self.db.update("datasets", where={"key": self.key}, data={"annotation_fields": json.dumps(new_fields)})
2462        self.annotation_fields = new_fields
2463
2464        # If anything changed with the annotation fields, possibly update
2465        # existing annotations (e.g. to delete them or change their labels).
2466        if changes:
2467            Annotation.update_annotations_via_fields(
2468                self.key, old_fields, new_fields, self.db
2469            )
2470
2471        return len(new_fields)

Save annotation field data to the datasets table (in the annotation_fields column). If changes to the annotation fields affect existing annotations, this function will also call update_annotations_via_fields() to change them.

Parameters
  • dict new_fields: New annotation fields, with a field ID as key.
  • bool add: Whether we're merely adding new fields or replacing the whole batch. If add is False, new_fields should contain all fields.
Returns

The number of annotation fields saved.
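A hedged sketch of registering a single field without replacing existing ones; only label and type are required by the checks above, and the "text" type shown is an assumption:

    dataset.save_annotation_fields({
        "123asd": {"label": "Valid", "type": "text"}
    }, add=True)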
def get_annotation_metadata(self) -> dict:
2473    def get_annotation_metadata(self) -> dict:
2474        """
2475        Retrieves all the data for this dataset from the annotations table.
2476        """
2477
2478        annotation_data = self.db.fetchall(
2479            "SELECT * FROM annotations WHERE dataset = %s", (self.key,)
2480        )
2481        return annotation_data

Retrieves all the data for this dataset from the annotations table.