Edit on GitHub

backend.lib.processor

Basic post-processor worker - should be inherited by workers to post-process results

   1"""
   2Basic post-processor worker - should be inherited by workers to post-process results
   3"""
   4import traceback
   5import zipfile
   6import typing
   7import shutil
   8import abc
   9import csv
  10import os
  11import re
  12import time
  13
  14from pathlib import PurePath
  15
  16from backend.lib.worker import BasicWorker
  17from common.lib.dataset import DataSet
  18from common.lib.fourcat_module import FourcatModule
  19from common.lib.helpers import get_software_commit, remove_nuls, send_email, hash_to_md5
  20from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException,
  21                                   DataSetException, MapItemException, AnnotationException)
  22from common.config_manager import ConfigWrapper
  23from common.lib.user import User
  24
  25csv.field_size_limit(1024 * 1024 * 1024)
  26
  27
  28class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
  29    """
  30    Abstract processor class
  31
  32    A processor takes a finished dataset as input and processes its result in
  33    some way, with another dataset set as output. The input thus is a file, and
  34    the output (usually) as well. In other words, the result of a processor can
  35    be used as input for another processor (though whether and when this is
  36    useful is another question).
  37
  38    To determine whether a processor can process a given dataset, you can
    define an `is_compatible_with(cls, module=None, config=None) -> bool` class
  40    method which takes a dataset as argument and returns a bool that determines
  41    if this processor is considered compatible with that dataset. For example:
  42
  43    .. code-block:: python
  44
  45        @classmethod
  46        def is_compatible_with(cls, module=None, config=None):
  47            return module.type == "linguistic-features"
  48
  49
  50    """
  51
  52    #: Database handler to interface with the 4CAT database
  53    db = None
  54
  55    #: Job object that requests the execution of this processor
  56    job = None
  57
  58    #: The dataset object that the processor is *creating*.
  59    dataset = None
  60
  61    #: Owner (username) of the dataset
  62    owner = None
  63
  64    #: The dataset object that the processor is *processing*.
  65    source_dataset = None
  66
  67    #: The file that is being processed
  68    source_file = None
  69
  70    #: Processor description, which will be displayed in the web interface
  71    description = "No description available"
  72
  73    #: Category identifier, used to group processors in the web interface
  74    category = "Other"
  75
  76    #: Extension of the file created by the processor
  77    extension = "csv"
  78
  79    #: 4CAT settings from the perspective of the dataset's owner
  80    config = None
  81
  82    #: Is this processor running 'within' a preset processor?
  83    is_running_in_preset = False
  84
  85    #: Is this processor hidden in the front-end, and only used internally/in presets?
  86    is_hidden = False
  87
  88    #: This will be defined automatically upon loading the processor. There is
  89    #: no need to override manually
  90    filepath = None
  91
  92    #: This will be a list; files added to it are deleted after the processor
  93    #: terminates, even on failure, if they still exist at that point. Add
  94    #: path objects or dataset objects; for datasets, the
  95    #: `remove_disposable_files()` method will be called.
  96    for_cleanup = None
  97
  98    def work(self):
  99        """
 100        Process a dataset
 101
 102        Loads dataset metadata, sets up the scaffolding for performing some kind
 103        of processing on that dataset, and then processes it. Afterwards, clean
 104        up.
 105        """
 106        try:
 107            # a dataset can have multiple owners, but the creator is the user
 108            # that actually queued the processor, so their config is relevant
 109            self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
 110            self.owner = self.dataset.creator
 111        except DataSetException:
 112            # query has been deleted in the meantime. finish without error,
 113            # as deleting it will have been a conscious choice by a user
 114            self.job.finish()
 115            return
 116
 117        # set up config reader wrapping the worker's config manager, which is
 118        # in turn the one passed to it by the WorkerManager, which is the one
 119        # originally loaded in bootstrap
 120        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))
 121
 122        if self.dataset.data.get("key_parent", None):
 123            # search workers never have parents (for now), so we don't need to
 124            # find out what the source_dataset dataset is if it's a search worker
 125            try:
 126                self.source_dataset = self.dataset.get_parent()
 127
 128                # for presets, transparently use the *top* dataset as a source_dataset
 129                # since that is where any underlying processors should get
 130                # their data from. However, this should only be done as long as the
 131                # preset is not finished yet, because after that there may be processors
 132                # that run on the final preset result
 133                while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
 134                    self.is_running_in_preset = True
 135                    self.source_dataset = self.source_dataset.get_parent()
 136                    if self.source_dataset is None:
 137                        # this means there is no dataset that is *not* a preset anywhere
 138                        # above this dataset. This should never occur, but if it does, we
 139                        # cannot continue
 140                        self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset",
 141                                       (self.type, self.dataset.key))
 142                        self.job.finish()
 143                        return
 144
 145            except DataSetException:
 146                # we need to know what the source_dataset dataset was to properly handle the
 147                # analysis
 148                self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
 149                    self.type, self.dataset.key))
 150                self.job.finish()
 151                return
 152
 153            if not self.source_dataset.is_finished() and not self.is_running_in_preset:
 154                # not finished yet - retry after a while
 155                # exception for presets, since these *should* be unfinished
 156                # until underlying processors are done
 157                self.job.release(delay=30)
 158                return
 159
 160            self.source_file = self.source_dataset.get_results_path()
 161            if not self.source_file.exists():
 162                self.dataset.update_status("Finished, no input data found.")
 163
 164        self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
 165
 166        processor_name = self.title if hasattr(self, "title") else self.type
 167        self.dataset.clear_log()
 168        self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
 169
 170        # start log file
 171        self.dataset.update_status("Processing data")
 172        self.dataset.update_version(get_software_commit(self))
 173
 174        # we may create temporary files with the processor; anything in here
 175        # will be deleted after the processor ends (or crashes!). dataset
 176        # objects will have their cleanup methods called
 177        self.for_cleanup = [self.dataset]
 178        if self.source_dataset is not None:
 179            # Add source dataset to cleanup list to remove disposable files
 180            self.for_cleanup.append(self.source_dataset)
 181
 182        # get parameters
 183        # if possible, fill defaults where parameters are not provided
 184        given_parameters = self.dataset.parameters.copy()
 185        all_parameters = self.get_options(self.dataset, config=self.config)
 186        self.parameters = {
 187            param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
 188            for param in [*all_parameters.keys(), *given_parameters.keys()]
 189        }
 190
 191        # now the parameters have been loaded into memory, clear any sensitive
 192        # ones. This has a side-effect that a processor may not run again
 193        # without starting from scratch, but this is the price of progress
 194        options = self.get_options(self.dataset.get_parent(), config=self.config)
 195        for option, option_settings in options.items():
 196            if option_settings.get("sensitive"):
 197                self.dataset.delete_parameter(option)
 198
 199        if self.interrupted:
 200            self.dataset.log("Processing interrupted, trying again later")
 201            return self.abort()
 202
 203        if not self.dataset.is_finished():
 204            try:
 205                self.process()
 206                self.after_process()
 207                
 208                # processors should usually finish their jobs by themselves, but if
 209                # the worker finished without errors, the job can be finished in
 210                # any case
 211                if not self.job.is_finished:
 212                    self.job.finish()
 213            except WorkerInterruptedException as e:
 214                self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
 215                self.abort()
 216            except Exception as e:
 217                self.dataset.log("Processor crashed (%s), trying again later" % str(e))
 218                stack = traceback.extract_tb(e.__traceback__)
 219                frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
 220                location = "->".join(frames)
 221
 222                # Not all datasets have source_dataset keys
 223                if len(self.dataset.get_genealogy()) > 1:
 224                    parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
 225                else:
 226                    parent_key = ""
 227                
 228                # Clean up partially created datasets/files
 229                self.clean_up_on_error()
 230
 231                raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
 232                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack)
 233
 234            finally:
 235                # clean up files that have been created and marked as disposable
 236                for item in self.for_cleanup:
 237                    if type(item) is DataSet:
 238                        item.remove_disposable_files()
 239                    elif item.exists():
 240                        shutil.rmtree(item, ignore_errors=True)
 241        else:
 242            # dataset already finished, job shouldn't be open anymore
 243            self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
 244            self.job.data["jobtype"], self.job.data["remote_id"]))
 245            self.job.finish()
 246
 247    def after_process(self):
 248        """
 249        Run after processing the dataset
 250
 251        This method cleans up temporary files, and if needed, handles logistics
 252        concerning the result file, e.g. running a pre-defined processor on the
 253        result, copying it to another dataset, and so on.
 254        """
 255        if self.dataset.data["num_rows"] > 0:
 256            self.dataset.update_status("Dataset completed.")
 257
 258        if not self.dataset.is_finished():
 259            self.dataset.finish()
 260
 261        # see if we have anything else lined up to run next
 262        for next in self.parameters.get("next", []):
 263            can_run_next = True
 264            next_parameters = next.get("parameters", {})
 265            next_type = next.get("type", "")
 266            try:
 267                available_processors = self.dataset.get_available_processors(config=self.config)
 268            except ValueError:
 269                self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
 270                break
 271
 272
 273            # run it only if the post-processor is actually available for this query
 274            if self.dataset.data["num_rows"] <= 0:
 275                can_run_next = False
 276                self.log.info(
 277                    "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (
 278                        next_type, self.dataset.key))
 279
 280            elif next_type in available_processors:
 281                next_analysis = DataSet(
 282                    parameters=next_parameters,
 283                    type=next_type,
 284                    db=self.db,
 285                    parent=self.dataset.key,
 286                    extension=available_processors[next_type].extension,
 287                    is_private=self.dataset.is_private,
 288                    owner=self.dataset.creator,
 289                    modules=self.modules
 290                )
 291                # copy ownership from parent dataset
 292                next_analysis.copy_ownership_from(self.dataset)
 293                # add to queue
 294                self.queue.add_job(self.modules.processors[next_type], dataset=next_analysis)
 295            else:
 296                can_run_next = False
 297                self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (
 298                self.dataset.key, self.type, next_type))
 299
 300            if not can_run_next:
 301                # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
 302                # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
 303                if "attach_to" in self.parameters:
 304                    # Probably should not happen, but for some reason a mid processor has been designated as the processor
 305                    # the parent should attach to
 306                    pass
 307                else:
 308                    # Check for "attach_to" parameter in descendents
 309                    while True:
 310                        if "attach_to" in next_parameters:
 311                            self.parameters["attach_to"] = next_parameters["attach_to"]
 312                            break
 313                        else:
 314                            if "next" in next_parameters:
 315                                next_parameters = next_parameters["next"][0]["parameters"]
 316                            else:
 317                                # No more descendents
 318                                # Should not happen; we cannot find the source dataset
 319                                self.log.warning(
 320                                    "Cannot find preset's source dataset for dataset %s" % self.dataset.key)
 321                                break
 322
 323        # see if we need to register the result somewhere
 324        if "copy_to" in self.parameters:
 325            # copy the results to an arbitrary place that was passed
 326            if self.dataset.get_results_path().exists():
 327                shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
 328            else:
 329                # if copy_to was passed, that means it's important that this
 330                # file exists somewhere, so we create it as an empty file
 331                with open(self.parameters["copy_to"], "w") as empty_file:
 332                    empty_file.write("")
 333
 334        # see if this query chain is to be attached to another query
 335        # if so, the full genealogy of this query (minus the original dataset)
 336        # is attached to the given query - this is mostly useful for presets,
 337        # where a chain of processors can be marked as 'underlying' a preset
 338        if "attach_to" in self.parameters:
 339            try:
 340                # copy metadata and results to the surrogate
 341                surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules)
 342
 343                if self.dataset.get_results_path().exists():
 344                    # Update the surrogate's results file suffix to match this dataset's suffix
 345                    surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(
 346                        self.dataset.get_results_path().suffix)
 347                    shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
 348
 349                try:
 350                    surrogate.finish(self.dataset.data["num_rows"])
 351                except RuntimeError:
 352                    # already finished, could happen (though it shouldn't)
 353                    pass
 354
 355                surrogate.update_status(self.dataset.get_status())
 356
 357            except DataSetException:
 358                # dataset with key to attach to doesn't exist...
 359                self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have "
 360                                 "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"]))
 361
 362        self.job.finish()
 363
 364        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
 365            owner = self.dataset.get_parameters().get("email-complete", False)
 366            # Check that username is email address
 367            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
 368                from email.mime.multipart import MIMEMultipart
 369                from email.mime.text import MIMEText
 370                from smtplib import SMTPException
 371                import socket
 372                import html2text
 373
 374        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
 375            owner = self.dataset.get_parameters().get("email-complete", False)
 376            # Check that username is email address
 377            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
 378                from email.mime.multipart import MIMEMultipart
 379                from email.mime.text import MIMEText
 380                from smtplib import SMTPException
 381                import socket
 382                import html2text
 383
 384                self.log.debug("Sending email to %s" % owner)
 385                dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
 386                sender = self.config.get('mail.noreply')
 387                message = MIMEMultipart("alternative")
 388                message["From"] = sender
 389                message["To"] = owner
 390                message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
 391                mail = """
 392                    <p>Hello %s,</p>
 393                    <p>4CAT has finished collecting your %s dataset labeled: %s</p>
 394                    <p>You can view your dataset via the following link:</p>
 395                    <p><a href="%s">%s</a></p> 
 396                    <p>Sincerely,</p>
 397                    <p>Your friendly neighborhood 4CAT admin</p>
 398                    """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
 399                html_parser = html2text.HTML2Text()
 400                message.attach(MIMEText(html_parser.handle(mail), "plain"))
 401                message.attach(MIMEText(mail, "html"))
 402                try:
 403                    send_email([owner], message, self.config)
 404                except (SMTPException, ConnectionRefusedError, socket.timeout):
 405                    self.log.error("Error sending email to %s" % owner)
 406
 407    def clean_up_on_error(self):
 408        try:
 409            # ensure proxied requests are stopped
 410            self.flush_proxied_requests()
 411            # delete annotations that have been generated as part of this processor
 412            self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True)
 413            # Remove the results file that was created
 414            if self.dataset.get_results_path().exists():
 415                self.dataset.get_results_path().unlink()
 416            if self.dataset.get_results_folder_path().exists():
 417                shutil.rmtree(self.dataset.get_results_folder_path())
 418
 419        except Exception as e:
 420            self.log.error("Error during processor cleanup after error: %s" % str(e))
 421
 422    def abort(self):
 423        """
 424        Abort dataset creation and clean up so it may be attempted again later
 425        """
 426        self.clean_up_on_error()
 427
 428        # we release instead of finish, since interrupting is just that - the
 429        # job should resume at a later point. Delay resuming by 10 seconds to
 430        # give 4CAT the time to do whatever it wants (though usually this isn't
 431        # needed since restarting also stops the spawning of new workers)
 432        if self.interrupted == self.INTERRUPT_RETRY:
 433            # retry later - wait at least 10 seconds to give the backend time to shut down
 434            self.job.release(delay=10)
 435        elif self.interrupted == self.INTERRUPT_CANCEL:
 436            # cancel job
 437            self.job.finish()
 438
 439    def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
 440        """
 441        Request an iterable of URLs and return results
 442
 443        This method takes an iterable yielding URLs and yields the result for
 444        a GET request for that URL in return. This is done through the worker
 445        manager's DelegatedRequestHandler, which can run multiple requests in
 446        parallel and divide them over the proxies configured in 4CAT (if any).
 447        Proxy cooloff and queueing is shared with other processors, so that a
 448        processor will never accidentally request from the same site as another
 449        processor, potentially triggering rate limits.
 450
 451        :param urls:  Something that can be iterated over and yields URLs
 452        :param kwargs:  Other keyword arguments are passed on to `add_urls`
 453        and eventually to `requests.get()`.
 454        :param bool preserve_order:  Return items in the original order. Use
 455        `False` to potentially speed up processing, if order is not important.
 456        :return:  A generator yielding request results, i.e. tuples of a
 457        URL and a `requests` response objects
 458        """
 459        queue_name = self._proxy_queue_name()
 460        delegator = self.manager.proxy_delegator
 461
 462        delegator.refresh_settings(self.config)
 463
 464        # 50 is an arbitrary batch size - but we top up every 0.05s, so
 465        # that should be sufficient
 466        batch_size = 50
 467
 468        # we need an iterable, so we can use next() and StopIteration
 469        urls = iter(urls)
 470
 471        have_urls = True
 472        while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls:
 473            if queue_length < batch_size and have_urls:
 474                batch = []
 475                while len(batch) < (batch_size - queue_length):
 476                    try:
 477                        batch.append(next(urls))
 478                    except StopIteration:
 479                        have_urls = False
 480                        break
 481
 482                delegator.add_urls(batch, queue_name, **kwargs)
 483
 484            time.sleep(0.05)  # arbitrary...
 485            for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
 486                # result may also be a FailedProxiedRequest!
 487                # up to the processor to decide how to deal with it
 488                yield url, result
 489
 490    def push_proxied_request(self, url, position=-1, **kwargs):
 491        """
 492        Add a single URL to the proxied requests queue
 493
 494        :param str url:  URL to add
 495        :param position:  Position to add to queue; can be used to add priority
 496        requests, adds to end of queue by default
 497        :param kwargs:
 498        """
 499        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)
 500
 501    def flush_proxied_requests(self):
 502        """
 503        Get rid of remaining proxied requests
 504
 505        Can be used if enough results are available and any remaining ones need
 506        to be stopped ASAP and are otherwise unneeded.
 507
 508        Blocking!
 509        """
 510        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())
 511
 512    def _proxy_queue_name(self):
 513        """
 514        Get proxy queue name
 515
 516        For internal use.
 517
 518        :return str:
 519        """
 520        return f"{self.type}-{self.dataset.key}"
 521
 522    def unpack_archive_contents(self, path, staging_area=None):
 523        """
 524        Unpack all files in an archive to a staging area
 525
 526        With every iteration, the processor's 'interrupted' flag is checked,
 527        and if set a ProcessorInterruptedException is raised, which by default
 528        is caught and subsequently stops execution gracefully.
 529
 530        Files are unzipped to a staging area. The staging area is *not*
 531        cleaned up automatically.
 532
 533        :param Path path:     Path to zip file to read
 534        :param Path staging_area:  Where to store the files while they're
 535          being worked with. If omitted, a temporary folder is created and
 536          deleted after use
 537        :param int max_number_files:  Maximum number of files to unpack. If None, all files unpacked
 538        :return Path:  A path to the staging area
 539        """
 540
 541        if not path.exists():
 542            return
 543
 544        if not staging_area:
 545            staging_area = self.dataset.get_staging_area()
 546
 547        if not staging_area.exists() or not staging_area.is_dir():
 548            raise RuntimeError("Staging area %s is not a valid folder")
 549
 550        paths = []
 551        with zipfile.ZipFile(path, "r") as archive_file:
 552            archive_contents = sorted(archive_file.namelist())
 553
 554            for archived_file in archive_contents:
 555                if self.interrupted:
 556                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
 557
 558                file_name = archived_file.split("/")[-1]
 559                temp_file = staging_area.joinpath(file_name)
 560                archive_file.extract(archived_file, staging_area)
 561                paths.append(temp_file)
 562
 563        return staging_area
 564
 565    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
 566        """
 567        Extract a file from an archive by name
 568
 569        :param str filename:  Name of file to extract
 570        :param Path archive_path:  Path to zip file to read
 571        :param Path staging_area:  Where to store the files while they're
 572                  being worked with. If omitted, a temporary folder is created
 573        :return Path:  A path to the extracted file
 574        """
 575        if not archive_path.exists():
 576            return
 577
 578        if not staging_area:
 579            staging_area = self.dataset.get_staging_area()
 580
 581        if not staging_area.exists() or not staging_area.is_dir():
 582            raise RuntimeError("Staging area %s is not a valid folder")
 583
 584        with zipfile.ZipFile(archive_path, "r") as archive_file:
 585            if filename not in archive_file.namelist():
 586                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
 587            else:
 588                archive_file.extract(filename, staging_area)
 589                return staging_area.joinpath(filename)
 590
 591    def write_csv_items_and_finish(self, data):
 592        """
 593        Write data as csv to results file and finish dataset
 594
 595        Determines result file path using dataset's path determination helper
 596        methods. After writing results, the dataset is marked finished. Will
 597        raise a ProcessorInterruptedException if the interrupted flag for this
 598        processor is set while iterating.
 599
 600        :param data: A list or tuple of dictionaries, all with the same keys
 601        """
 602        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
 603            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
 604
 605        if not data:
 606            raise ValueError("write_csv_items requires a dictionary with at least one item")
 607
 608        self.dataset.update_status("Writing results file")
 609        writer = False
 610        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
 611            for row in data:
 612                if self.interrupted:
 613                    raise ProcessorInterruptedException("Interrupted while writing results file")
 614
 615                row = remove_nuls(row)
 616                if not writer:
 617                    writer = csv.DictWriter(results, fieldnames=row.keys())
 618                    writer.writeheader()
 619
 620                writer.writerow(row)
 621
 622        self.dataset.update_status("Finished")
 623        self.dataset.finish(len(data))
 624
 625    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
 626        """
 627        Archive a bunch of files into a zip archive and finish processing
 628
 629        :param list|Path files: If a list, all files will be added to the
 630          archive and deleted afterwards. If a folder, all files in the folder
 631          will be added and the folder will be deleted afterwards.
 632        :param int num_items: Items in the dataset. If None, the amount of
 633          files added to the archive will be used.
 634        :param int compression:  Type of compression to use. By default, files
 635          are not compressed, to speed up unarchiving.
 636        :param bool finish:  Finish the dataset/job afterwards or not?
 637        """
 638        is_folder = False
 639        if issubclass(type(files), PurePath):
 640            is_folder = files
 641            if not files.exists() or not files.is_dir():
 642                raise RuntimeError("Folder %s is not a folder that can be archived" % files)
 643
 644            files = files.glob("*")
 645
 646        # create zip of archive and delete temporary files and folder
 647        self.dataset.update_status("Compressing results into archive")
 648        done = 0
 649        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
 650            for output_path in files:
 651                zip.write(output_path, output_path.name)
 652                output_path.unlink()
 653                done += 1
 654
 655        # delete temporary folder
 656        if is_folder:
 657            shutil.rmtree(is_folder)
 658
 659        self.dataset.update_status("Finished")
 660        if num_items is None:
 661            num_items = done
 662
 663        if finish:
 664            self.dataset.finish(num_items)
 665
 666    def create_standalone(self, item_ids=None):
 667        """
 668        Copy this dataset and make that copy standalone.
 669
 670        This has the benefit of allowing for all analyses that can be run on
 671        full datasets on the new, filtered copy as well.
 672        
 673        This also transfers annotations and annotation fields.
 674
 675        :param list item_ids:   The item_ids that are copied-over. Used to check what annotations need to be copied.
 676
 677        :return DataSet:  The new standalone dataset
 678        """
 679
 680        top_parent = self.source_dataset
 681
 682        finished = self.dataset.check_dataset_finished()
 683        if finished == 'empty':
 684            # No data to process, so we can't create a standalone dataset
 685            return
 686        elif finished is None:
 687            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
 688            pass
 689
 690        standalone = self.dataset.copy(shallow=False)
 691        standalone.body_match = "(Filtered) " + top_parent.query
 692        standalone.datasource = top_parent.parameters.get("datasource", "custom")
 693
 694        if top_parent.annotation_fields and top_parent.num_annotations() > 0:
 695            # Get column names dynamically
 696            annotation_cols = self.db.fetchone("SELECT * FROM annotations LIMIT 1")
 697            annotation_cols = list(annotation_cols.keys())
 698            annotation_cols.remove("id")  # Set by the DB
 699            cols_str = ",".join(annotation_cols)
 700
 701            cols_list = ["a." + col for col in annotation_cols if col != "dataset"]
 702            query = f"INSERT INTO annotations ({cols_str}) OVERRIDING USER VALUE " \
 703                    f"SELECT %s, {', '.join(cols_list)} " \
 704                    f"FROM annotations AS a WHERE a.dataset = %s"
 705
 706            # Copy over all annotations if no item_ids are given
 707            if not item_ids or top_parent.num_rows == standalone.num_rows:
 708                self.db.execute(query, replacements=(standalone.key, top_parent.key))
 709            else:
 710                query += " AND a.item_id = ANY(%s)"
 711                self.db.execute(query, replacements=(standalone.key, top_parent.key, item_ids))
 712
 713        # Copy over annotation fields and update annotations with new field IDs
 714        if top_parent.annotation_fields:
 715            # New field IDs based on the new dataset key
 716            annotation_fields = {
 717                hash_to_md5(old_field_id + standalone.key): field_values
 718                for old_field_id, field_values in top_parent.annotation_fields.items()
 719            }
 720            standalone.annotation_fields = {}  # Reset to insert everything without checking for changes
 721            standalone.save_annotation_fields(annotation_fields)  # Save to db
 722
 723            # Also update field IDs in annotations
 724            for i, old_field_id in enumerate(top_parent.annotation_fields.keys()):
 725                self.db.update(
 726                    "annotations",
 727                    where={"field_id": old_field_id, "dataset": standalone.key},
 728                    data={"field_id": hash_to_md5(old_field_id + standalone.key)
 729                })
 730
 731        try:
 732            standalone.board = top_parent.board
 733        except AttributeError:
 734            standalone.board = self.type
 735
 736        standalone.type = top_parent.type
 737
 738        standalone.detach()
 739        standalone.delete_parameter("key_parent")
 740
 741        self.dataset.copied_to = standalone.key
 742
 743        # we don't need this file anymore - it has been copied to the new
 744        # standalone dataset, and this one is not accessible via the interface
 745        # except as a link to the copied standalone dataset
 746        os.unlink(self.dataset.get_results_path())
 747
 748        # Copy the log
 749        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
 750
 751        return standalone
 752
 753    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
 754        """
 755        Saves annotations made by this processor on the basis of another dataset.
 756        Also adds some data regarding this processor: set `author` and `label` to processor name,
 757        and add parameters to `metadata` (unless explicitly indicated).
 758
 759        :param annotations:				List of dictionaries with annotation items. Must have `item_id` and `value`.
 760                                        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
 761        :param source_dataset:			The dataset that these annotations will be saved on. If None, will use the
 762                                        top parent.
 763        :param bool hide_in_explorer:	Whether this annotation is included in the Explorer. 'Hidden' annotations
 764                                        are still shown in `iterate_items()`).
 765
 766        :returns int:					How many annotations were saved.
 767
 768        """
 769
 770        if not annotations:
 771            return 0
 772
 773        # Default to parent dataset
 774        if not source_dataset:
 775            source_dataset = self.source_dataset.top_parent()
 776
 777        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
 778        annotation_fields = source_dataset.annotation_fields
 779
 780        # Keep track of what fields we've already seen, so we don't need to hash every time.
 781        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
 782                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}
 783
 784        annotations_saved = 0
 785        failed = 0
 786
 787        # Loop through all annotations. This may be batched.
 788        for annotation in annotations:
 789
 790            # item_id always needs to be present
 791            if not annotation.get("item_id"):
 792                failed += 1
 793                continue
 794
 795            # Keep track of what dataset generated this annotation
 796            annotation["from_dataset"] = self.dataset.key
 797            # Set the author to this processor's name
 798            if not annotation.get("author"):
 799                annotation["author"] = self.name
 800            if not annotation.get("author_original"):
 801                annotation["author_original"] = self.name
 802            annotation["by_processor"] = True
 803
 804            # Only use a default label if no custom one is given
 805            if not annotation.get("label"):
 806                annotation["label"] = self.name
 807
 808            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
 809            # We need to do this within this loop because this function may be called in batches and with different
 810            # annotation types.
 811            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
 812                # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key.
 813                # This should create unique fields, even if there's multiple annotation types for one processor.
 814                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
 815                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
 816                annotation_fields[field_id] = {
 817                    "label": annotation["label"],
 818                    "type": annotation["type"] if annotation.get("type") else "text",
 819                    "from_dataset": annotation["from_dataset"],
 820                    "hide_in_explorer": hide_in_explorer
 821                }
 822            else:
 823                # Else just get the field ID
 824                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]
 825
 826            # Add field ID to the annotation
 827            annotation["field_id"] = field_id
 828
 829        try:
 830            annotations_saved = source_dataset.save_annotations(annotations)
 831        except AnnotationException as e:
 832            self.source_dataset.update_status(str(e))
 833        if failed:
 834            self.dataset.update_status("Could not save all annotations, make sure that all items have an `id` value.")
 835
 836        source_dataset.save_annotation_fields(annotation_fields)
 837
 838        return annotations_saved
 839
 840    @classmethod
 841    def map_item_method_available(cls, dataset):
 842        """
 843        Check if this processor can use map_item
 844
 845        Checks if map_item method exists and is compatible with dataset. If
 846        dataset has a different extension than the default for this processor,
 847        or if the dataset has no extension, this means we cannot be sure the
 848        data is in the right format to be mapped, so `False` is returned in
 849        that case even if a map_item() method is available.
 850
 851        :param BasicProcessor processor:    The BasicProcessor subclass object
 852        with which to use map_item
 853        :param DataSet dataset:                The DataSet object with which to
 854        use map_item
 855        """
 856        # only run item mapper if extension of processor == extension of
 857        # data file, for the scenario where a csv file was uploaded and
 858        # converted to an ndjson-based data source, for example
 859        # todo: this is kind of ugly, and a better fix may be possible
 860        dataset_extension = dataset.get_extension()
 861        if not dataset_extension:
 862            # DataSet results file does not exist or has no extension, use expected extension
 863            if hasattr(dataset, "extension"):
 864                dataset_extension = dataset.extension
 865            else:
 866                # No known DataSet extension; cannot determine if map_item method compatible
 867                return False
 868
 869        return hasattr(cls, "map_item") and cls.extension == dataset_extension
 870
 871    @classmethod
 872    def get_mapped_item(cls, item):
 873        """
 874        Get the mapped item using a processors map_item method.
 875
 876        Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
 877        """
 878        try:
 879            mapped_item = cls.map_item(item)
 880        except (KeyError, IndexError) as e:
 881            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
 882
 883        if not mapped_item:
 884            raise MapItemException("Unable to map item!")
 885
 886        return mapped_item
 887
 888    @classmethod
 889    def is_filter(cls):
 890        """
 891        Is this processor a filter?
 892
 893        Filters do not produce their own dataset but replace the source_dataset dataset
 894        instead.
 895
 896        :todo: Make this a bit more robust than sniffing the processor category
 897        :return bool:
 898        """
 899        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
 900
 901    @classmethod
 902    def get_options(cls, parent_dataset=None, config=None) -> dict:
 903        """
 904        Get processor options
 905
 906        This method by default returns the class's "options" attribute, or an
 907        empty dictionary. It can be redefined by processors that need more
 908        fine-grained options, e.g. in cases where the availability of options
 909        is partially determined by the parent dataset's parameters.
 910
 911        :param parent_dataset DataSet:  An object representing the dataset that
 912            the processor would be or was run on. Can be used, in conjunction with
 913            config, to show some options only to privileged users.
 914        :param config ConfigManager|None config:  Configuration reader (context-aware)
 915        :return dict:   Options for this processor
 916        """
 917
 918        return cls.options if hasattr(cls, "options") else {}
 919
 920    @classmethod
 921    def get_status(cls):
 922        """
 923        Get processor status
 924
 925        :return list:    Statuses of this processor
 926        """
 927        return cls.status if hasattr(cls, "status") else None
 928
 929    @classmethod
 930    def is_top_dataset(cls):
 931        """
 932        Confirm this is *not* a top dataset, but a processor.
 933
 934        Used for processor compatibility checks.
 935
 936        :return bool:  Always `False`, because this is a processor.
 937        """
 938        return False
 939
 940    @classmethod
 941    def is_from_collector(cls):
 942        """
 943        Check if this processor is one that collects data, i.e. a search or
 944        import worker.
 945
 946        :return bool:
 947        """
 948        return cls.type.endswith("-search") or cls.type.endswith("-import")
 949
 950    @classmethod
 951    def get_extension(self, parent_dataset=None):
 952        """
 953        Return the extension of the processor's dataset
 954
 955        Used for processor compatibility checks.
 956
 957        :param DataSet parent_dataset:  An object representing the dataset that
 958          the processor would be run on
 959        :return str|None:  Dataset extension (without leading `.`) or `None`.
 960        """
 961        if self.is_filter():
 962            if parent_dataset is not None:
 963                # Filters should use the same extension as the parent dataset
 964                return parent_dataset.get_extension()
 965            else:
 966                # No dataset provided, unable to determine extension of parent dataset
 967                # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead
 968                return None
 969        elif self.extension:
 970            # Use explicitly defined extension in class (Processor class defaults to "csv")
 971            return self.extension
 972        else:
 973            # A non filter processor updated the base Processor extension to None/False?
 974            return None
 975
 976    @classmethod
 977    def is_rankable(cls, multiple_items=True):
 978        """
 979        Used for processor compatibility
 980
 981        :param bool multiple_items:  Consider datasets with multiple items per
 982          item (e.g. word_1, word_2, etc)? Included for compatibility
 983        """
 984        return False
 985
 986    @classmethod
 987    def exclude_followup_processors(cls, processor_type=None):
 988        """
 989        Used for processor compatibility
 990
 991        To be defined by the child processor if it should exclude certain follow-up processors.
 992        e.g.:
 993
 994        def exclude_followup_processors(cls, processor_type):
 995            if processor_type in ["undesirable-followup-processor"]:
 996                return True
 997            return False
 998
 999        :param str processor_type:  Processor type to exclude
1000        :return bool:  True if processor should be excluded, False otherwise
1001        """
1002        return False
1003
1004    @abc.abstractmethod
1005    def process(self):
1006        """
1007        Process data
1008
1009        To be defined by the child processor.
1010        """
1011        pass
1012
1013    @staticmethod
1014    def is_4cat_processor():
1015        """
1016        Is this a 4CAT processor?
1017
1018        This is used to determine whether a class is a 4CAT
1019        processor.
1020
1021        :return:  True
1022        """
1023        return True
class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
    """
    Abstract processor class

    A processor takes a finished dataset as input and processes its result in
    some way, with another dataset set as output. The input thus is a file, and
    the output (usually) as well. In other words, the result of a processor can
    be used as input for another processor (though whether and when this is
    useful is another question).

    To determine whether a processor can process a given dataset, you can
    define an `is_compatible_with(cls, module=None, config=None) -> bool` class
    method which takes a dataset as argument and returns a bool that determines
    if this processor is considered compatible with that dataset. For example:

    .. code-block:: python

        @classmethod
        def is_compatible_with(cls, module=None, config=None):
            return module.type == "linguistic-features"
    """

    #: Database handler to interface with the 4CAT database
    db = None

    #: Job object that requests the execution of this processor
    job = None

    #: The dataset object that the processor is *creating*.
    dataset = None

    #: Owner (username) of the dataset
    owner = None

    #: The dataset object that the processor is *processing*.
    source_dataset = None

    #: The file that is being processed
    source_file = None

    #: Processor description, which will be displayed in the web interface
    description = "No description available"

    #: Category identifier, used to group processors in the web interface
    category = "Other"

    #: Extension of the file created by the processor
    extension = "csv"

    #: 4CAT settings from the perspective of the dataset's owner
    config = None

    #: Is this processor running 'within' a preset processor?
    is_running_in_preset = False

    #: Is this processor hidden in the front-end, and only used internally/in presets?
    is_hidden = False

    #: This will be defined automatically upon loading the processor. There is
    #: no need to override manually
    filepath = None

    #: This will be a list; files added to it are deleted after the processor
    #: terminates, even on failure, if they still exist at that point. Add
    #: path objects or dataset objects; for datasets, the
    #: `remove_disposable_files()` method will be called.
    for_cleanup = None
  98
  99    def work(self):
 100        """
 101        Process a dataset
 102
 103        Loads dataset metadata, sets up the scaffolding for performing some kind
 104        of processing on that dataset, and then processes it. Afterwards, clean
 105        up.
 106        """
 107        try:
 108            # a dataset can have multiple owners, but the creator is the user
 109            # that actually queued the processor, so their config is relevant
 110            self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
 111            self.owner = self.dataset.creator
 112        except DataSetException:
 113            # query has been deleted in the meantime. finish without error,
 114            # as deleting it will have been a conscious choice by a user
 115            self.job.finish()
 116            return
 117
 118        # set up config reader wrapping the worker's config manager, which is
 119        # in turn the one passed to it by the WorkerManager, which is the one
 120        # originally loaded in bootstrap
 121        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))
 122
 123        if self.dataset.data.get("key_parent", None):
 124            # search workers never have parents (for now), so we don't need to
 125            # find out what the source_dataset dataset is if it's a search worker
 126            try:
 127                self.source_dataset = self.dataset.get_parent()
 128
 129                # for presets, transparently use the *top* dataset as a source_dataset
 130                # since that is where any underlying processors should get
 131                # their data from. However, this should only be done as long as the
 132                # preset is not finished yet, because after that there may be processors
 133                # that run on the final preset result
 134                while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
 135                    self.is_running_in_preset = True
 136                    self.source_dataset = self.source_dataset.get_parent()
 137                    if self.source_dataset is None:
 138                        # this means there is no dataset that is *not* a preset anywhere
 139                        # above this dataset. This should never occur, but if it does, we
 140                        # cannot continue
 141                        self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset",
 142                                       (self.type, self.dataset.key))
 143                        self.job.finish()
 144                        return
 145
 146            except DataSetException:
 147                # we need to know what the source_dataset dataset was to properly handle the
 148                # analysis
 149                self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
 150                    self.type, self.dataset.key))
 151                self.job.finish()
 152                return
 153
 154            if not self.source_dataset.is_finished() and not self.is_running_in_preset:
 155                # not finished yet - retry after a while
 156                # exception for presets, since these *should* be unfinished
 157                # until underlying processors are done
 158                self.job.release(delay=30)
 159                return
 160
 161            self.source_file = self.source_dataset.get_results_path()
 162            if not self.source_file.exists():
 163                self.dataset.update_status("Finished, no input data found.")
 164
 165        self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
 166
 167        processor_name = self.title if hasattr(self, "title") else self.type
 168        self.dataset.clear_log()
 169        self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
 170
 171        # start log file
 172        self.dataset.update_status("Processing data")
 173        self.dataset.update_version(get_software_commit(self))
 174
 175        # we may create temporary files with the processor; anything in here
 176        # will be deleted after the processor ends (or crashes!). dataset
 177        # objects will have their cleanup methods called
 178        self.for_cleanup = [self.dataset]
 179        if self.source_dataset is not None:
 180            # Add source dataset to cleanup list to remove disposable files
 181            self.for_cleanup.append(self.source_dataset)
 182
 183        # get parameters
 184        # if possible, fill defaults where parameters are not provided
 185        given_parameters = self.dataset.parameters.copy()
 186        all_parameters = self.get_options(self.dataset, config=self.config)
 187        self.parameters = {
 188            param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
 189            for param in [*all_parameters.keys(), *given_parameters.keys()]
 190        }
 191
 192        # now the parameters have been loaded into memory, clear any sensitive
 193        # ones. This has a side-effect that a processor may not run again
 194        # without starting from scratch, but this is the price of progress
 195        options = self.get_options(self.dataset.get_parent(), config=self.config)
 196        for option, option_settings in options.items():
 197            if option_settings.get("sensitive"):
 198                self.dataset.delete_parameter(option)
 199
 200        if self.interrupted:
 201            self.dataset.log("Processing interrupted, trying again later")
 202            return self.abort()
 203
 204        if not self.dataset.is_finished():
 205            try:
 206                self.process()
 207                self.after_process()
 208                
 209                # processors should usually finish their jobs by themselves, but if
 210                # the worker finished without errors, the job can be finished in
 211                # any case
 212                if not self.job.is_finished:
 213                    self.job.finish()
 214            except WorkerInterruptedException as e:
 215                self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
 216                self.abort()
 217            except Exception as e:
 218                self.dataset.log("Processor crashed (%s), trying again later" % str(e))
 219                stack = traceback.extract_tb(e.__traceback__)
 220                frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
 221                location = "->".join(frames)
 222
 223                # Not all datasets have source_dataset keys
 224                if len(self.dataset.get_genealogy()) > 1:
 225                    parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
 226                else:
 227                    parent_key = ""
 228                
 229                # Clean up partially created datasets/files
 230                self.clean_up_on_error()
 231
 232                raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
 233                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack)
 234
 235            finally:
 236                # clean up files that have been created and marked as disposable
 237                for item in self.for_cleanup:
 238                    if type(item) is DataSet:
 239                        item.remove_disposable_files()
 240                    elif item.exists():
 241                        shutil.rmtree(item, ignore_errors=True)
 242        else:
 243            # dataset already finished, job shouldn't be open anymore
 244            self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
 245            self.job.data["jobtype"], self.job.data["remote_id"]))
 246            self.job.finish()
 247
 248    def after_process(self):
 249        """
 250        Run after processing the dataset
 251
 252        This method cleans up temporary files, and if needed, handles logistics
 253        concerning the result file, e.g. running a pre-defined processor on the
 254        result, copying it to another dataset, and so on.
 255        """
 256        if self.dataset.data["num_rows"] > 0:
 257            self.dataset.update_status("Dataset completed.")
 258
 259        if not self.dataset.is_finished():
 260            self.dataset.finish()
 261
 262        # see if we have anything else lined up to run next
 263        for next in self.parameters.get("next", []):
 264            can_run_next = True
 265            next_parameters = next.get("parameters", {})
 266            next_type = next.get("type", "")
 267            try:
 268                available_processors = self.dataset.get_available_processors(config=self.config)
 269            except ValueError:
 270                self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
 271                break
 272
 273
 274            # run it only if the post-processor is actually available for this query
 275            if self.dataset.data["num_rows"] <= 0:
 276                can_run_next = False
 277                self.log.info(
 278                    "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (
 279                        next_type, self.dataset.key))
 280
 281            elif next_type in available_processors:
 282                next_analysis = DataSet(
 283                    parameters=next_parameters,
 284                    type=next_type,
 285                    db=self.db,
 286                    parent=self.dataset.key,
 287                    extension=available_processors[next_type].extension,
 288                    is_private=self.dataset.is_private,
 289                    owner=self.dataset.creator,
 290                    modules=self.modules
 291                )
 292                # copy ownership from parent dataset
 293                next_analysis.copy_ownership_from(self.dataset)
 294                # add to queue
 295                self.queue.add_job(self.modules.processors[next_type], dataset=next_analysis)
 296            else:
 297                can_run_next = False
 298                self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (
 299                self.dataset.key, self.type, next_type))
 300
 301            if not can_run_next:
 302                # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
 303                # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
 304                if "attach_to" in self.parameters:
 305                    # Probably should not happen, but for some reason a mid processor has been designated as the processor
 306                    # the parent should attach to
 307                    pass
 308                else:
 309                    # Check for "attach_to" parameter in descendents
 310                    while True:
 311                        if "attach_to" in next_parameters:
 312                            self.parameters["attach_to"] = next_parameters["attach_to"]
 313                            break
 314                        else:
 315                            if "next" in next_parameters:
 316                                next_parameters = next_parameters["next"][0]["parameters"]
 317                            else:
 318                                # No more descendents
 319                                # Should not happen; we cannot find the source dataset
 320                                self.log.warning(
 321                                    "Cannot find preset's source dataset for dataset %s" % self.dataset.key)
 322                                break
 323
 324        # see if we need to register the result somewhere
 325        if "copy_to" in self.parameters:
 326            # copy the results to an arbitrary place that was passed
 327            if self.dataset.get_results_path().exists():
 328                shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
 329            else:
 330                # if copy_to was passed, that means it's important that this
 331                # file exists somewhere, so we create it as an empty file
 332                with open(self.parameters["copy_to"], "w") as empty_file:
 333                    empty_file.write("")
 334
 335        # see if this query chain is to be attached to another query
 336        # if so, the full genealogy of this query (minus the original dataset)
 337        # is attached to the given query - this is mostly useful for presets,
 338        # where a chain of processors can be marked as 'underlying' a preset
 339        if "attach_to" in self.parameters:
 340            try:
 341                # copy metadata and results to the surrogate
 342                surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules)
 343
 344                if self.dataset.get_results_path().exists():
 345                    # Update the surrogate's results file suffix to match this dataset's suffix
 346                    surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(
 347                        self.dataset.get_results_path().suffix)
 348                    shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
 349
 350                try:
 351                    surrogate.finish(self.dataset.data["num_rows"])
 352                except RuntimeError:
 353                    # already finished, could happen (though it shouldn't)
 354                    pass
 355
 356                surrogate.update_status(self.dataset.get_status())
 357
 358            except DataSetException:
 359                # dataset with key to attach to doesn't exist...
 360                self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have "
 361                                 "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"]))
 362
 363        self.job.finish()
 364
 365        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
 366            owner = self.dataset.get_parameters().get("email-complete", False)
 367            # Check that username is email address
 368            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
 369                from email.mime.multipart import MIMEMultipart
 370                from email.mime.text import MIMEText
 371                from smtplib import SMTPException
 372                import socket
 373                import html2text
 374
 375        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
 376            owner = self.dataset.get_parameters().get("email-complete", False)
 377            # Check that username is email address
 378            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
 379                from email.mime.multipart import MIMEMultipart
 380                from email.mime.text import MIMEText
 381                from smtplib import SMTPException
 382                import socket
 383                import html2text
 384
 385                self.log.debug("Sending email to %s" % owner)
 386                dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
 387                sender = self.config.get('mail.noreply')
 388                message = MIMEMultipart("alternative")
 389                message["From"] = sender
 390                message["To"] = owner
 391                message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
 392                mail = """
 393                    <p>Hello %s,</p>
 394                    <p>4CAT has finished collecting your %s dataset labeled: %s</p>
 395                    <p>You can view your dataset via the following link:</p>
 396                    <p><a href="%s">%s</a></p> 
 397                    <p>Sincerely,</p>
 398                    <p>Your friendly neighborhood 4CAT admin</p>
 399                    """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
 400                html_parser = html2text.HTML2Text()
 401                message.attach(MIMEText(html_parser.handle(mail), "plain"))
 402                message.attach(MIMEText(mail, "html"))
 403                try:
 404                    send_email([owner], message, self.config)
 405                except (SMTPException, ConnectionRefusedError, socket.timeout):
 406                    self.log.error("Error sending email to %s" % owner)
 407
 408    def clean_up_on_error(self):
 409        try:
 410            # ensure proxied requests are stopped
 411            self.flush_proxied_requests()
 412            # delete annotations that have been generated as part of this processor
 413            self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True)
 414            # Remove the results file that was created
 415            if self.dataset.get_results_path().exists():
 416                self.dataset.get_results_path().unlink()
 417            if self.dataset.get_results_folder_path().exists():
 418                shutil.rmtree(self.dataset.get_results_folder_path())
 419
 420        except Exception as e:
 421            self.log.error("Error during processor cleanup after error: %s" % str(e))
 422
 423    def abort(self):
 424        """
 425        Abort dataset creation and clean up so it may be attempted again later
 426        """
 427        self.clean_up_on_error()
 428
 429        # we release instead of finish, since interrupting is just that - the
 430        # job should resume at a later point. Delay resuming by 10 seconds to
 431        # give 4CAT the time to do whatever it wants (though usually this isn't
 432        # needed since restarting also stops the spawning of new workers)
 433        if self.interrupted == self.INTERRUPT_RETRY:
 434            # retry later - wait at least 10 seconds to give the backend time to shut down
 435            self.job.release(delay=10)
 436        elif self.interrupted == self.INTERRUPT_CANCEL:
 437            # cancel job
 438            self.job.finish()
 439
 440    def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
 441        """
 442        Request an iterable of URLs and return results
 443
 444        This method takes an iterable yielding URLs and yields the result for
 445        a GET request for that URL in return. This is done through the worker
 446        manager's DelegatedRequestHandler, which can run multiple requests in
 447        parallel and divide them over the proxies configured in 4CAT (if any).
 448        Proxy cooloff and queueing is shared with other processors, so that a
 449        processor will never accidentally request from the same site as another
 450        processor, potentially triggering rate limits.
 451
 452        :param urls:  Something that can be iterated over and yields URLs
 453        :param kwargs:  Other keyword arguments are passed on to `add_urls`
 454        and eventually to `requests.get()`.
 455        :param bool preserve_order:  Return items in the original order. Use
 456        `False` to potentially speed up processing, if order is not important.
 457        :return:  A generator yielding request results, i.e. tuples of a
 458        URL and a `requests` response objects
 459        """
 460        queue_name = self._proxy_queue_name()
 461        delegator = self.manager.proxy_delegator
 462
 463        delegator.refresh_settings(self.config)
 464
 465        # 50 is an arbitrary batch size - but we top up every 0.05s, so
 466        # that should be sufficient
 467        batch_size = 50
 468
 469        # we need an iterable, so we can use next() and StopIteration
 470        urls = iter(urls)
 471
 472        have_urls = True
 473        while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls:
 474            if queue_length < batch_size and have_urls:
 475                batch = []
 476                while len(batch) < (batch_size - queue_length):
 477                    try:
 478                        batch.append(next(urls))
 479                    except StopIteration:
 480                        have_urls = False
 481                        break
 482
 483                delegator.add_urls(batch, queue_name, **kwargs)
 484
 485            time.sleep(0.05)  # arbitrary...
 486            for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
 487                # result may also be a FailedProxiedRequest!
 488                # up to the processor to decide how to deal with it
 489                yield url, result
 490
 491    def push_proxied_request(self, url, position=-1, **kwargs):
 492        """
 493        Add a single URL to the proxied requests queue
 494
 495        :param str url:  URL to add
 496        :param position:  Position to add to queue; can be used to add priority
 497        requests, adds to end of queue by default
 498        :param kwargs:
 499        """
 500        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)
 501
 502    def flush_proxied_requests(self):
 503        """
 504        Get rid of remaining proxied requests
 505
 506        Can be used if enough results are available and any remaining ones need
 507        to be stopped ASAP and are otherwise unneeded.
 508
 509        Blocking!
 510        """
 511        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())
 512
 513    def _proxy_queue_name(self):
 514        """
 515        Get proxy queue name
 516
 517        For internal use.
 518
 519        :return str:
 520        """
 521        return f"{self.type}-{self.dataset.key}"
 522
 523    def unpack_archive_contents(self, path, staging_area=None):
 524        """
 525        Unpack all files in an archive to a staging area
 526
 527        With every iteration, the processor's 'interrupted' flag is checked,
 528        and if set a ProcessorInterruptedException is raised, which by default
 529        is caught and subsequently stops execution gracefully.
 530
 531        Files are unzipped to a staging area. The staging area is *not*
 532        cleaned up automatically.
 533
 534        :param Path path:     Path to zip file to read
 535        :param Path staging_area:  Where to store the files while they're
 536          being worked with. If omitted, a temporary folder is created and
 537          deleted after use
 538        :param int max_number_files:  Maximum number of files to unpack. If None, all files unpacked
 539        :return Path:  A path to the staging area
 540        """
 541
 542        if not path.exists():
 543            return
 544
 545        if not staging_area:
 546            staging_area = self.dataset.get_staging_area()
 547
 548        if not staging_area.exists() or not staging_area.is_dir():
 549            raise RuntimeError("Staging area %s is not a valid folder")
 550
 551        paths = []
 552        with zipfile.ZipFile(path, "r") as archive_file:
 553            archive_contents = sorted(archive_file.namelist())
 554
 555            for archived_file in archive_contents:
 556                if self.interrupted:
 557                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
 558
 559                file_name = archived_file.split("/")[-1]
 560                temp_file = staging_area.joinpath(file_name)
 561                archive_file.extract(archived_file, staging_area)
 562                paths.append(temp_file)
 563
 564        return staging_area
 565
 566    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
 567        """
 568        Extract a file from an archive by name
 569
 570        :param str filename:  Name of file to extract
 571        :param Path archive_path:  Path to zip file to read
 572        :param Path staging_area:  Where to store the files while they're
 573                  being worked with. If omitted, a temporary folder is created
 574        :return Path:  A path to the extracted file
 575        """
 576        if not archive_path.exists():
 577            return
 578
 579        if not staging_area:
 580            staging_area = self.dataset.get_staging_area()
 581
 582        if not staging_area.exists() or not staging_area.is_dir():
 583            raise RuntimeError("Staging area %s is not a valid folder")
 584
 585        with zipfile.ZipFile(archive_path, "r") as archive_file:
 586            if filename not in archive_file.namelist():
 587                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
 588            else:
 589                archive_file.extract(filename, staging_area)
 590                return staging_area.joinpath(filename)
 591
 592    def write_csv_items_and_finish(self, data):
 593        """
 594        Write data as csv to results file and finish dataset
 595
 596        Determines result file path using dataset's path determination helper
 597        methods. After writing results, the dataset is marked finished. Will
 598        raise a ProcessorInterruptedException if the interrupted flag for this
 599        processor is set while iterating.
 600
 601        :param data: A list or tuple of dictionaries, all with the same keys
 602        """
 603        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
 604            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
 605
 606        if not data:
 607            raise ValueError("write_csv_items requires a dictionary with at least one item")
 608
 609        self.dataset.update_status("Writing results file")
 610        writer = False
 611        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
 612            for row in data:
 613                if self.interrupted:
 614                    raise ProcessorInterruptedException("Interrupted while writing results file")
 615
 616                row = remove_nuls(row)
 617                if not writer:
 618                    writer = csv.DictWriter(results, fieldnames=row.keys())
 619                    writer.writeheader()
 620
 621                writer.writerow(row)
 622
 623        self.dataset.update_status("Finished")
 624        self.dataset.finish(len(data))
 625
 626    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
 627        """
 628        Archive a bunch of files into a zip archive and finish processing
 629
 630        :param list|Path files: If a list, all files will be added to the
 631          archive and deleted afterwards. If a folder, all files in the folder
 632          will be added and the folder will be deleted afterwards.
 633        :param int num_items: Items in the dataset. If None, the amount of
 634          files added to the archive will be used.
 635        :param int compression:  Type of compression to use. By default, files
 636          are not compressed, to speed up unarchiving.
 637        :param bool finish:  Finish the dataset/job afterwards or not?
 638        """
 639        is_folder = False
 640        if issubclass(type(files), PurePath):
 641            is_folder = files
 642            if not files.exists() or not files.is_dir():
 643                raise RuntimeError("Folder %s is not a folder that can be archived" % files)
 644
 645            files = files.glob("*")
 646
 647        # create zip of archive and delete temporary files and folder
 648        self.dataset.update_status("Compressing results into archive")
 649        done = 0
 650        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
 651            for output_path in files:
 652                zip.write(output_path, output_path.name)
 653                output_path.unlink()
 654                done += 1
 655
 656        # delete temporary folder
 657        if is_folder:
 658            shutil.rmtree(is_folder)
 659
 660        self.dataset.update_status("Finished")
 661        if num_items is None:
 662            num_items = done
 663
 664        if finish:
 665            self.dataset.finish(num_items)
 666
    def create_standalone(self, item_ids=None):
        """
        Copy this dataset and make that copy standalone.

        This has the benefit of allowing for all analyses that can be run on
        full datasets on the new, filtered copy as well.

        This also transfers annotations and annotation fields from the source
        dataset to the copy. Afterwards, this dataset's own results file is
        deleted and its log file is copied to the standalone dataset.

        :param list item_ids:   The item_ids that are copied-over. Used to check what annotations need to be copied.

        :return DataSet:  The new standalone dataset, or `None` if this
        dataset is empty
        """

        # NOTE(review): despite the name, this is the processor's source
        # dataset, which is not necessarily the top-most parent - confirm
        top_parent = self.source_dataset

        finished = self.dataset.check_dataset_finished()
        if finished == 'empty':
            # No data to process, so we can't create a standalone dataset
            return
        elif finished is None:
            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
            pass

        # the copy takes its query and datasource from the source dataset
        standalone = self.dataset.copy(shallow=False)
        standalone.body_match = "(Filtered) " + top_parent.query
        standalone.datasource = top_parent.parameters.get("datasource", "custom")

        if top_parent.annotation_fields and top_parent.num_annotations() > 0:
            # Get column names dynamically
            annotation_cols = self.db.fetchone("SELECT * FROM annotations LIMIT 1")
            annotation_cols = list(annotation_cols.keys())
            annotation_cols.remove("id")  # Set by the DB
            cols_str = ",".join(annotation_cols)

            # The SELECT puts the new dataset key first, followed by all other
            # columns copied from the existing annotation rows.
            # NOTE(review): this lines up with the INSERT column list only if
            # `dataset` is the first column left in annotation_cols - confirm
            # against the table definition
            cols_list = ["a." + col for col in annotation_cols if col != "dataset"]
            query = f"INSERT INTO annotations ({cols_str}) OVERRIDING USER VALUE " \
                    f"SELECT %s, {', '.join(cols_list)} " \
                    f"FROM annotations AS a WHERE a.dataset = %s"

            # Copy over all annotations if no item_ids are given
            if not item_ids or top_parent.num_rows == standalone.num_rows:
                self.db.execute(query, replacements=(standalone.key, top_parent.key))
            else:
                # otherwise only copy the annotations for the given items
                query += " AND a.item_id = ANY(%s)"
                self.db.execute(query, replacements=(standalone.key, top_parent.key, item_ids))

        # Copy over annotation fields and update annotations with new field IDs
        if top_parent.annotation_fields:
            # New field IDs based on the new dataset key
            annotation_fields = {
                hash_to_md5(old_field_id + standalone.key): field_values
                for old_field_id, field_values in top_parent.annotation_fields.items()
            }
            standalone.annotation_fields = {}  # Reset to insert everything without checking for changes
            standalone.save_annotation_fields(annotation_fields)  # Save to db

            # Also update field IDs in annotations, using the same hash as
            # for the new field IDs above (the index `i` is unused)
            for i, old_field_id in enumerate(top_parent.annotation_fields.keys()):
                self.db.update(
                    "annotations",
                    where={"field_id": old_field_id, "dataset": standalone.key},
                    data={"field_id": hash_to_md5(old_field_id + standalone.key)
                })

        # fall back on this processor's type if the source dataset has no board
        try:
            standalone.board = top_parent.board
        except AttributeError:
            standalone.board = self.type

        standalone.type = top_parent.type

        standalone.detach()
        standalone.delete_parameter("key_parent")

        # keep a reference from this dataset to its standalone copy
        self.dataset.copied_to = standalone.key

        # we don't need this file anymore - it has been copied to the new
        # standalone dataset, and this one is not accessible via the interface
        # except as a link to the copied standalone dataset
        os.unlink(self.dataset.get_results_path())

        # Copy the log
        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())

        return standalone
 753
 754    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
 755        """
 756        Saves annotations made by this processor on the basis of another dataset.
 757        Also adds some data regarding this processor: set `author` and `label` to processor name,
 758        and add parameters to `metadata` (unless explicitly indicated).
 759
 760        :param annotations:				List of dictionaries with annotation items. Must have `item_id` and `value`.
 761                                        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
 762        :param source_dataset:			The dataset that these annotations will be saved on. If None, will use the
 763                                        top parent.
 764        :param bool hide_in_explorer:	Whether this annotation is included in the Explorer. 'Hidden' annotations
 765                                        are still shown in `iterate_items()`).
 766
 767        :returns int:					How many annotations were saved.
 768
 769        """
 770
 771        if not annotations:
 772            return 0
 773
 774        # Default to parent dataset
 775        if not source_dataset:
 776            source_dataset = self.source_dataset.top_parent()
 777
 778        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
 779        annotation_fields = source_dataset.annotation_fields
 780
 781        # Keep track of what fields we've already seen, so we don't need to hash every time.
 782        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
 783                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}
 784
 785        annotations_saved = 0
 786        failed = 0
 787
 788        # Loop through all annotations. This may be batched.
 789        for annotation in annotations:
 790
 791            # item_id always needs to be present
 792            if not annotation.get("item_id"):
 793                failed += 1
 794                continue
 795
 796            # Keep track of what dataset generated this annotation
 797            annotation["from_dataset"] = self.dataset.key
 798            # Set the author to this processor's name
 799            if not annotation.get("author"):
 800                annotation["author"] = self.name
 801            if not annotation.get("author_original"):
 802                annotation["author_original"] = self.name
 803            annotation["by_processor"] = True
 804
 805            # Only use a default label if no custom one is given
 806            if not annotation.get("label"):
 807                annotation["label"] = self.name
 808
 809            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
 810            # We need to do this within this loop because this function may be called in batches and with different
 811            # annotation types.
 812            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
 813                # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key.
 814                # This should create unique fields, even if there's multiple annotation types for one processor.
 815                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
 816                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
 817                annotation_fields[field_id] = {
 818                    "label": annotation["label"],
 819                    "type": annotation["type"] if annotation.get("type") else "text",
 820                    "from_dataset": annotation["from_dataset"],
 821                    "hide_in_explorer": hide_in_explorer
 822                }
 823            else:
 824                # Else just get the field ID
 825                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]
 826
 827            # Add field ID to the annotation
 828            annotation["field_id"] = field_id
 829
 830        try:
 831            annotations_saved = source_dataset.save_annotations(annotations)
 832        except AnnotationException as e:
 833            self.source_dataset.update_status(str(e))
 834        if failed:
 835            self.dataset.update_status("Could not save all annotations, make sure that all items have an `id` value.")
 836
 837        source_dataset.save_annotation_fields(annotation_fields)
 838
 839        return annotations_saved
 840
 841    @classmethod
 842    def map_item_method_available(cls, dataset):
 843        """
 844        Check if this processor can use map_item
 845
 846        Checks if map_item method exists and is compatible with dataset. If
 847        dataset has a different extension than the default for this processor,
 848        or if the dataset has no extension, this means we cannot be sure the
 849        data is in the right format to be mapped, so `False` is returned in
 850        that case even if a map_item() method is available.
 851
 852        :param BasicProcessor processor:    The BasicProcessor subclass object
 853        with which to use map_item
 854        :param DataSet dataset:                The DataSet object with which to
 855        use map_item
 856        """
 857        # only run item mapper if extension of processor == extension of
 858        # data file, for the scenario where a csv file was uploaded and
 859        # converted to an ndjson-based data source, for example
 860        # todo: this is kind of ugly, and a better fix may be possible
 861        dataset_extension = dataset.get_extension()
 862        if not dataset_extension:
 863            # DataSet results file does not exist or has no extension, use expected extension
 864            if hasattr(dataset, "extension"):
 865                dataset_extension = dataset.extension
 866            else:
 867                # No known DataSet extension; cannot determine if map_item method compatible
 868                return False
 869
 870        return hasattr(cls, "map_item") and cls.extension == dataset_extension
 871
 872    @classmethod
 873    def get_mapped_item(cls, item):
 874        """
 875        Get the mapped item using a processors map_item method.
 876
 877        Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
 878        """
 879        try:
 880            mapped_item = cls.map_item(item)
 881        except (KeyError, IndexError) as e:
 882            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
 883
 884        if not mapped_item:
 885            raise MapItemException("Unable to map item!")
 886
 887        return mapped_item
 888
 889    @classmethod
 890    def is_filter(cls):
 891        """
 892        Is this processor a filter?
 893
 894        Filters do not produce their own dataset but replace the source_dataset dataset
 895        instead.
 896
 897        :todo: Make this a bit more robust than sniffing the processor category
 898        :return bool:
 899        """
 900        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
 901
 902    @classmethod
 903    def get_options(cls, parent_dataset=None, config=None) -> dict:
 904        """
 905        Get processor options
 906
 907        This method by default returns the class's "options" attribute, or an
 908        empty dictionary. It can be redefined by processors that need more
 909        fine-grained options, e.g. in cases where the availability of options
 910        is partially determined by the parent dataset's parameters.
 911
 912        :param parent_dataset DataSet:  An object representing the dataset that
 913            the processor would be or was run on. Can be used, in conjunction with
 914            config, to show some options only to privileged users.
 915        :param config ConfigManager|None config:  Configuration reader (context-aware)
 916        :return dict:   Options for this processor
 917        """
 918
 919        return cls.options if hasattr(cls, "options") else {}
 920
 921    @classmethod
 922    def get_status(cls):
 923        """
 924        Get processor status
 925
 926        :return list:    Statuses of this processor
 927        """
 928        return cls.status if hasattr(cls, "status") else None
 929
 930    @classmethod
 931    def is_top_dataset(cls):
 932        """
 933        Confirm this is *not* a top dataset, but a processor.
 934
 935        Used for processor compatibility checks.
 936
 937        :return bool:  Always `False`, because this is a processor.
 938        """
 939        return False
 940
 941    @classmethod
 942    def is_from_collector(cls):
 943        """
 944        Check if this processor is one that collects data, i.e. a search or
 945        import worker.
 946
 947        :return bool:
 948        """
 949        return cls.type.endswith("-search") or cls.type.endswith("-import")
 950
 951    @classmethod
 952    def get_extension(self, parent_dataset=None):
 953        """
 954        Return the extension of the processor's dataset
 955
 956        Used for processor compatibility checks.
 957
 958        :param DataSet parent_dataset:  An object representing the dataset that
 959          the processor would be run on
 960        :return str|None:  Dataset extension (without leading `.`) or `None`.
 961        """
 962        if self.is_filter():
 963            if parent_dataset is not None:
 964                # Filters should use the same extension as the parent dataset
 965                return parent_dataset.get_extension()
 966            else:
 967                # No dataset provided, unable to determine extension of parent dataset
 968                # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead
 969                return None
 970        elif self.extension:
 971            # Use explicitly defined extension in class (Processor class defaults to "csv")
 972            return self.extension
 973        else:
 974            # A non filter processor updated the base Processor extension to None/False?
 975            return None
 976
 977    @classmethod
 978    def is_rankable(cls, multiple_items=True):
 979        """
 980        Used for processor compatibility
 981
 982        :param bool multiple_items:  Consider datasets with multiple items per
 983          item (e.g. word_1, word_2, etc)? Included for compatibility
 984        """
 985        return False
 986
 987    @classmethod
 988    def exclude_followup_processors(cls, processor_type=None):
 989        """
 990        Used for processor compatibility
 991
 992        To be defined by the child processor if it should exclude certain follow-up processors.
 993        e.g.:
 994
 995        def exclude_followup_processors(cls, processor_type):
 996            if processor_type in ["undesirable-followup-processor"]:
 997                return True
 998            return False
 999
1000        :param str processor_type:  Processor type to exclude
1001        :return bool:  True if processor should be excluded, False otherwise
1002        """
1003        return False
1004
1005    @abc.abstractmethod
1006    def process(self):
1007        """
1008        Process data
1009
1010        To be defined by the child processor.
1011        """
1012        pass
1013
1014    @staticmethod
1015    def is_4cat_processor():
1016        """
1017        Is this a 4CAT processor?
1018
1019        This is used to determine whether a class is a 4CAT
1020        processor.
1021
1022        :return:  True
1023        """
1024        return True

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an is_compatible_with(FourcatModule module=None, config=None) -> bool class method which takes a dataset as argument and returns a bool that determines if this processor is considered compatible with that dataset. For example:

@classmethod
def is_compatible_with(cls, module=None, config=None):
    return module.type == "linguistic-features"
db = None
job = None
dataset = None
owner = None
source_dataset = None
source_file = None
description = 'No description available'
category = 'Other'
extension = 'csv'
config = None
is_running_in_preset = False
is_hidden = False
filepath = None
for_cleanup = None
def work(self):
 99    def work(self):
100        """
101        Process a dataset
102
103        Loads dataset metadata, sets up the scaffolding for performing some kind
104        of processing on that dataset, and then processes it. Afterwards, clean
105        up.
106        """
107        try:
108            # a dataset can have multiple owners, but the creator is the user
109            # that actually queued the processor, so their config is relevant
110            self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
111            self.owner = self.dataset.creator
112        except DataSetException:
113            # query has been deleted in the meantime. finish without error,
114            # as deleting it will have been a conscious choice by a user
115            self.job.finish()
116            return
117
118        # set up config reader wrapping the worker's config manager, which is
119        # in turn the one passed to it by the WorkerManager, which is the one
120        # originally loaded in bootstrap
121        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))
122
123        if self.dataset.data.get("key_parent", None):
124            # search workers never have parents (for now), so we don't need to
125            # find out what the source_dataset dataset is if it's a search worker
126            try:
127                self.source_dataset = self.dataset.get_parent()
128
129                # for presets, transparently use the *top* dataset as a source_dataset
130                # since that is where any underlying processors should get
131                # their data from. However, this should only be done as long as the
132                # preset is not finished yet, because after that there may be processors
133                # that run on the final preset result
134                while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
135                    self.is_running_in_preset = True
136                    self.source_dataset = self.source_dataset.get_parent()
137                    if self.source_dataset is None:
138                        # this means there is no dataset that is *not* a preset anywhere
139                        # above this dataset. This should never occur, but if it does, we
140                        # cannot continue
141                        self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset",
142                                       (self.type, self.dataset.key))
143                        self.job.finish()
144                        return
145
146            except DataSetException:
147                # we need to know what the source_dataset dataset was to properly handle the
148                # analysis
149                self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
150                    self.type, self.dataset.key))
151                self.job.finish()
152                return
153
154            if not self.source_dataset.is_finished() and not self.is_running_in_preset:
155                # not finished yet - retry after a while
156                # exception for presets, since these *should* be unfinished
157                # until underlying processors are done
158                self.job.release(delay=30)
159                return
160
161            self.source_file = self.source_dataset.get_results_path()
162            if not self.source_file.exists():
163                self.dataset.update_status("Finished, no input data found.")
164
165        self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
166
167        processor_name = self.title if hasattr(self, "title") else self.type
168        self.dataset.clear_log()
169        self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
170
171        # start log file
172        self.dataset.update_status("Processing data")
173        self.dataset.update_version(get_software_commit(self))
174
175        # we may create temporary files with the processor; anything in here
176        # will be deleted after the processor ends (or crashes!). dataset
177        # objects will have their cleanup methods called
178        self.for_cleanup = [self.dataset]
179        if self.source_dataset is not None:
180            # Add source dataset to cleanup list to remove disposable files
181            self.for_cleanup.append(self.source_dataset)
182
183        # get parameters
184        # if possible, fill defaults where parameters are not provided
185        given_parameters = self.dataset.parameters.copy()
186        all_parameters = self.get_options(self.dataset, config=self.config)
187        self.parameters = {
188            param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
189            for param in [*all_parameters.keys(), *given_parameters.keys()]
190        }
191
192        # now the parameters have been loaded into memory, clear any sensitive
193        # ones. This has a side-effect that a processor may not run again
194        # without starting from scratch, but this is the price of progress
195        options = self.get_options(self.dataset.get_parent(), config=self.config)
196        for option, option_settings in options.items():
197            if option_settings.get("sensitive"):
198                self.dataset.delete_parameter(option)
199
200        if self.interrupted:
201            self.dataset.log("Processing interrupted, trying again later")
202            return self.abort()
203
204        if not self.dataset.is_finished():
205            try:
206                self.process()
207                self.after_process()
208                
209                # processors should usually finish their jobs by themselves, but if
210                # the worker finished without errors, the job can be finished in
211                # any case
212                if not self.job.is_finished:
213                    self.job.finish()
214            except WorkerInterruptedException as e:
215                self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
216                self.abort()
217            except Exception as e:
218                self.dataset.log("Processor crashed (%s), trying again later" % str(e))
219                stack = traceback.extract_tb(e.__traceback__)
220                frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
221                location = "->".join(frames)
222
223                # Not all datasets have source_dataset keys
224                if len(self.dataset.get_genealogy()) > 1:
225                    parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
226                else:
227                    parent_key = ""
228                
229                # Clean up partially created datasets/files
230                self.clean_up_on_error()
231
232                raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
233                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack)
234
235            finally:
236                # clean up files that have been created and marked as disposable
237                for item in self.for_cleanup:
238                    if type(item) is DataSet:
239                        item.remove_disposable_files()
240                    elif item.exists():
241                        shutil.rmtree(item, ignore_errors=True)
242        else:
243            # dataset already finished, job shouldn't be open anymore
244            self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
245            self.job.data["jobtype"], self.job.data["remote_id"]))
246            self.job.finish()

Process a dataset

Loads dataset metadata, sets up the scaffolding for performing some kind of processing on that dataset, and then processes it. Afterwards, clean up.

def after_process(self):
248    def after_process(self):
249        """
250        Run after processing the dataset
251
252        This method cleans up temporary files, and if needed, handles logistics
253        concerning the result file, e.g. running a pre-defined processor on the
254        result, copying it to another dataset, and so on.
255        """
256        if self.dataset.data["num_rows"] > 0:
257            self.dataset.update_status("Dataset completed.")
258
259        if not self.dataset.is_finished():
260            self.dataset.finish()
261
262        # see if we have anything else lined up to run next
263        for next in self.parameters.get("next", []):
264            can_run_next = True
265            next_parameters = next.get("parameters", {})
266            next_type = next.get("type", "")
267            try:
268                available_processors = self.dataset.get_available_processors(config=self.config)
269            except ValueError:
270                self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
271                break
272
273
274            # run it only if the post-processor is actually available for this query
275            if self.dataset.data["num_rows"] <= 0:
276                can_run_next = False
277                self.log.info(
278                    "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (
279                        next_type, self.dataset.key))
280
281            elif next_type in available_processors:
282                next_analysis = DataSet(
283                    parameters=next_parameters,
284                    type=next_type,
285                    db=self.db,
286                    parent=self.dataset.key,
287                    extension=available_processors[next_type].extension,
288                    is_private=self.dataset.is_private,
289                    owner=self.dataset.creator,
290                    modules=self.modules
291                )
292                # copy ownership from parent dataset
293                next_analysis.copy_ownership_from(self.dataset)
294                # add to queue
295                self.queue.add_job(self.modules.processors[next_type], dataset=next_analysis)
296            else:
297                can_run_next = False
298                self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (
299                self.dataset.key, self.type, next_type))
300
301            if not can_run_next:
302                # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
303                # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
304                if "attach_to" in self.parameters:
305                    # Probably should not happen, but for some reason a mid processor has been designated as the processor
306                    # the parent should attach to
307                    pass
308                else:
309                    # Check for "attach_to" parameter in descendents
310                    while True:
311                        if "attach_to" in next_parameters:
312                            self.parameters["attach_to"] = next_parameters["attach_to"]
313                            break
314                        else:
315                            if "next" in next_parameters:
316                                next_parameters = next_parameters["next"][0]["parameters"]
317                            else:
318                                # No more descendents
319                                # Should not happen; we cannot find the source dataset
320                                self.log.warning(
321                                    "Cannot find preset's source dataset for dataset %s" % self.dataset.key)
322                                break
323
324        # see if we need to register the result somewhere
325        if "copy_to" in self.parameters:
326            # copy the results to an arbitrary place that was passed
327            if self.dataset.get_results_path().exists():
328                shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
329            else:
330                # if copy_to was passed, that means it's important that this
331                # file exists somewhere, so we create it as an empty file
332                with open(self.parameters["copy_to"], "w") as empty_file:
333                    empty_file.write("")
334
335        # see if this query chain is to be attached to another query
336        # if so, the full genealogy of this query (minus the original dataset)
337        # is attached to the given query - this is mostly useful for presets,
338        # where a chain of processors can be marked as 'underlying' a preset
339        if "attach_to" in self.parameters:
340            try:
341                # copy metadata and results to the surrogate
342                surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules)
343
344                if self.dataset.get_results_path().exists():
345                    # Update the surrogate's results file suffix to match this dataset's suffix
346                    surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(
347                        self.dataset.get_results_path().suffix)
348                    shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
349
350                try:
351                    surrogate.finish(self.dataset.data["num_rows"])
352                except RuntimeError:
353                    # already finished, could happen (though it shouldn't)
354                    pass
355
356                surrogate.update_status(self.dataset.get_status())
357
358            except DataSetException:
359                # dataset with key to attach to doesn't exist...
360                self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have "
361                                 "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"]))
362
363        self.job.finish()
364
365        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
366            owner = self.dataset.get_parameters().get("email-complete", False)
367            # Check that username is email address
368            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
369                from email.mime.multipart import MIMEMultipart
370                from email.mime.text import MIMEText
371                from smtplib import SMTPException
372                import socket
373                import html2text
374
375        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
376            owner = self.dataset.get_parameters().get("email-complete", False)
377            # Check that username is email address
378            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
379                from email.mime.multipart import MIMEMultipart
380                from email.mime.text import MIMEText
381                from smtplib import SMTPException
382                import socket
383                import html2text
384
385                self.log.debug("Sending email to %s" % owner)
386                dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
387                sender = self.config.get('mail.noreply')
388                message = MIMEMultipart("alternative")
389                message["From"] = sender
390                message["To"] = owner
391                message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
392                mail = """
393                    <p>Hello %s,</p>
394                    <p>4CAT has finished collecting your %s dataset labeled: %s</p>
395                    <p>You can view your dataset via the following link:</p>
396                    <p><a href="%s">%s</a></p> 
397                    <p>Sincerely,</p>
398                    <p>Your friendly neighborhood 4CAT admin</p>
399                    """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
400                html_parser = html2text.HTML2Text()
401                message.attach(MIMEText(html_parser.handle(mail), "plain"))
402                message.attach(MIMEText(mail, "html"))
403                try:
404                    send_email([owner], message, self.config)
405                except (SMTPException, ConnectionRefusedError, socket.timeout):
406                    self.log.error("Error sending email to %s" % owner)

Run after processing the dataset

This method cleans up temporary files, and if needed, handles logistics concerning the result file, e.g. running a pre-defined processor on the result, copying it to another dataset, and so on.

def clean_up_on_error(self):
408    def clean_up_on_error(self):
409        try:
410            # ensure proxied requests are stopped
411            self.flush_proxied_requests()
412            # delete annotations that have been generated as part of this processor
413            self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True)
414            # Remove the results file that was created
415            if self.dataset.get_results_path().exists():
416                self.dataset.get_results_path().unlink()
417            if self.dataset.get_results_folder_path().exists():
418                shutil.rmtree(self.dataset.get_results_folder_path())
419
420        except Exception as e:
421            self.log.error("Error during processor cleanup after error: %s" % str(e))
def abort(self):
423    def abort(self):
424        """
425        Abort dataset creation and clean up so it may be attempted again later
426        """
427        self.clean_up_on_error()
428
429        # we release instead of finish, since interrupting is just that - the
430        # job should resume at a later point. Delay resuming by 10 seconds to
431        # give 4CAT the time to do whatever it wants (though usually this isn't
432        # needed since restarting also stops the spawning of new workers)
433        if self.interrupted == self.INTERRUPT_RETRY:
434            # retry later - wait at least 10 seconds to give the backend time to shut down
435            self.job.release(delay=10)
436        elif self.interrupted == self.INTERRUPT_CANCEL:
437            # cancel job
438            self.job.finish()

Abort dataset creation and clean up so it may be attempted again later

def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
440    def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
441        """
442        Request an iterable of URLs and return results
443
444        This method takes an iterable yielding URLs and yields the result for
445        a GET request for that URL in return. This is done through the worker
446        manager's DelegatedRequestHandler, which can run multiple requests in
447        parallel and divide them over the proxies configured in 4CAT (if any).
448        Proxy cooloff and queueing is shared with other processors, so that a
449        processor will never accidentally request from the same site as another
450        processor, potentially triggering rate limits.
451
452        :param urls:  Something that can be iterated over and yields URLs
453        :param kwargs:  Other keyword arguments are passed on to `add_urls`
454        and eventually to `requests.get()`.
455        :param bool preserve_order:  Return items in the original order. Use
456        `False` to potentially speed up processing, if order is not important.
457        :return:  A generator yielding request results, i.e. tuples of a
458        URL and a `requests` response objects
459        """
460        queue_name = self._proxy_queue_name()
461        delegator = self.manager.proxy_delegator
462
463        delegator.refresh_settings(self.config)
464
465        # 50 is an arbitrary batch size - but we top up every 0.05s, so
466        # that should be sufficient
467        batch_size = 50
468
469        # we need an iterable, so we can use next() and StopIteration
470        urls = iter(urls)
471
472        have_urls = True
473        while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls:
474            if queue_length < batch_size and have_urls:
475                batch = []
476                while len(batch) < (batch_size - queue_length):
477                    try:
478                        batch.append(next(urls))
479                    except StopIteration:
480                        have_urls = False
481                        break
482
483                delegator.add_urls(batch, queue_name, **kwargs)
484
485            time.sleep(0.05)  # arbitrary...
486            for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
487                # result may also be a FailedProxiedRequest!
488                # up to the processor to decide how to deal with it
489                yield url, result

Request an iterable of URLs and return results

This method takes an iterable yielding URLs and yields the result for a GET request for that URL in return. This is done through the worker manager's DelegatedRequestHandler, which can run multiple requests in parallel and divide them over the proxies configured in 4CAT (if any). Proxy cooloff and queueing is shared with other processors, so that a processor will never accidentally request from the same site as another processor, potentially triggering rate limits.

Parameters
  • urls: Something that can be iterated over and yields URLs
  • kwargs: Other keyword arguments are passed on to add_urls and eventually to requests.get().
  • bool preserve_order: Return items in the original order. Use False to potentially speed up processing, if order is not important.
Returns

A generator yielding request results, i.e. tuples of a URL and a requests response objects

def push_proxied_request(self, url, position=-1, **kwargs):
491    def push_proxied_request(self, url, position=-1, **kwargs):
492        """
493        Add a single URL to the proxied requests queue
494
495        :param str url:  URL to add
496        :param position:  Position to add to queue; can be used to add priority
497        requests, adds to end of queue by default
498        :param kwargs:
499        """
500        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)

Add a single URL to the proxied requests queue

Parameters
  • str url: URL to add
  • position: Position to add to queue; can be used to add priority requests, adds to end of queue by default
  • kwargs:
def flush_proxied_requests(self):
502    def flush_proxied_requests(self):
503        """
504        Get rid of remaining proxied requests
505
506        Can be used if enough results are available and any remaining ones need
507        to be stopped ASAP and are otherwise unneeded.
508
509        Blocking!
510        """
511        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())

Get rid of remaining proxied requests

Can be used if enough results are available and any remaining ones need to be stopped ASAP and are otherwise unneeded.

Blocking!

def unpack_archive_contents(self, path, staging_area=None):
523    def unpack_archive_contents(self, path, staging_area=None):
524        """
525        Unpack all files in an archive to a staging area
526
527        With every iteration, the processor's 'interrupted' flag is checked,
528        and if set a ProcessorInterruptedException is raised, which by default
529        is caught and subsequently stops execution gracefully.
530
531        Files are unzipped to a staging area. The staging area is *not*
532        cleaned up automatically.
533
534        :param Path path:     Path to zip file to read
535        :param Path staging_area:  Where to store the files while they're
536          being worked with. If omitted, a temporary folder is created and
537          deleted after use
538        :param int max_number_files:  Maximum number of files to unpack. If None, all files unpacked
539        :return Path:  A path to the staging area
540        """
541
542        if not path.exists():
543            return
544
545        if not staging_area:
546            staging_area = self.dataset.get_staging_area()
547
548        if not staging_area.exists() or not staging_area.is_dir():
549            raise RuntimeError("Staging area %s is not a valid folder")
550
551        paths = []
552        with zipfile.ZipFile(path, "r") as archive_file:
553            archive_contents = sorted(archive_file.namelist())
554
555            for archived_file in archive_contents:
556                if self.interrupted:
557                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
558
559                file_name = archived_file.split("/")[-1]
560                temp_file = staging_area.joinpath(file_name)
561                archive_file.extract(archived_file, staging_area)
562                paths.append(temp_file)
563
564        return staging_area

Unpack all files in an archive to a staging area

With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.

Files are unzipped to a staging area. The staging area is not cleaned up automatically.

Parameters
  • Path path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, the dataset's own staging area is used; it is not deleted afterwards
  • int max_number_files: Maximum number of files to unpack. If None, all files unpacked (note: documented here, but not accepted by the method's current signature)
Returns

A path to the staging area

def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
566    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
567        """
568        Extract a file from an archive by name
569
570        :param str filename:  Name of file to extract
571        :param Path archive_path:  Path to zip file to read
572        :param Path staging_area:  Where to store the files while they're
573                  being worked with. If omitted, a temporary folder is created
574        :return Path:  A path to the extracted file
575        """
576        if not archive_path.exists():
577            return
578
579        if not staging_area:
580            staging_area = self.dataset.get_staging_area()
581
582        if not staging_area.exists() or not staging_area.is_dir():
583            raise RuntimeError("Staging area %s is not a valid folder")
584
585        with zipfile.ZipFile(archive_path, "r") as archive_file:
586            if filename not in archive_file.namelist():
587                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
588            else:
589                archive_file.extract(filename, staging_area)
590                return staging_area.joinpath(filename)

Extract a file from an archive by name

Parameters
  • str filename: Name of file to extract
  • Path archive_path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created
Returns

A path to the extracted file

def write_csv_items_and_finish(self, data):
592    def write_csv_items_and_finish(self, data):
593        """
594        Write data as csv to results file and finish dataset
595
596        Determines result file path using dataset's path determination helper
597        methods. After writing results, the dataset is marked finished. Will
598        raise a ProcessorInterruptedException if the interrupted flag for this
599        processor is set while iterating.
600
601        :param data: A list or tuple of dictionaries, all with the same keys
602        """
603        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
604            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
605
606        if not data:
607            raise ValueError("write_csv_items requires a dictionary with at least one item")
608
609        self.dataset.update_status("Writing results file")
610        writer = False
611        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
612            for row in data:
613                if self.interrupted:
614                    raise ProcessorInterruptedException("Interrupted while writing results file")
615
616                row = remove_nuls(row)
617                if not writer:
618                    writer = csv.DictWriter(results, fieldnames=row.keys())
619                    writer.writeheader()
620
621                writer.writerow(row)
622
623        self.dataset.update_status("Finished")
624        self.dataset.finish(len(data))

Write data as csv to results file and finish dataset

Determines result file path using dataset's path determination helper methods. After writing results, the dataset is marked finished. Will raise a ProcessorInterruptedException if the interrupted flag for this processor is set while iterating.

Parameters
  • data: A list or tuple of dictionaries, all with the same keys
def write_archive_and_finish(self, files, num_items=None, compression=0, finish=True):
626    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
627        """
628        Archive a bunch of files into a zip archive and finish processing
629
630        :param list|Path files: If a list, all files will be added to the
631          archive and deleted afterwards. If a folder, all files in the folder
632          will be added and the folder will be deleted afterwards.
633        :param int num_items: Items in the dataset. If None, the amount of
634          files added to the archive will be used.
635        :param int compression:  Type of compression to use. By default, files
636          are not compressed, to speed up unarchiving.
637        :param bool finish:  Finish the dataset/job afterwards or not?
638        """
639        is_folder = False
640        if issubclass(type(files), PurePath):
641            is_folder = files
642            if not files.exists() or not files.is_dir():
643                raise RuntimeError("Folder %s is not a folder that can be archived" % files)
644
645            files = files.glob("*")
646
647        # create zip of archive and delete temporary files and folder
648        self.dataset.update_status("Compressing results into archive")
649        done = 0
650        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
651            for output_path in files:
652                zip.write(output_path, output_path.name)
653                output_path.unlink()
654                done += 1
655
656        # delete temporary folder
657        if is_folder:
658            shutil.rmtree(is_folder)
659
660        self.dataset.update_status("Finished")
661        if num_items is None:
662            num_items = done
663
664        if finish:
665            self.dataset.finish(num_items)

Archive a bunch of files into a zip archive and finish processing

Parameters
  • list|Path files: If a list, all files will be added to the archive and deleted afterwards. If a folder, all files in the folder will be added and the folder will be deleted afterwards.
  • int num_items: Items in the dataset. If None, the amount of files added to the archive will be used.
  • int compression: Type of compression to use. By default, files are not compressed, to speed up unarchiving.
  • bool finish: Finish the dataset/job afterwards or not?
def create_standalone(self, item_ids=None):
667    def create_standalone(self, item_ids=None):
668        """
669        Copy this dataset and make that copy standalone.
670
671        This has the benefit of allowing for all analyses that can be run on
672        full datasets on the new, filtered copy as well.
673        
674        This also transfers annotations and annotation fields.
675
676        :param list item_ids:   The item_ids that are copied-over. Used to check what annotations need to be copied.
677
678        :return DataSet:  The new standalone dataset
679        """
680
681        top_parent = self.source_dataset
682
683        finished = self.dataset.check_dataset_finished()
684        if finished == 'empty':
685            # No data to process, so we can't create a standalone dataset
686            return
687        elif finished is None:
688            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
689            pass
690
691        standalone = self.dataset.copy(shallow=False)
692        standalone.body_match = "(Filtered) " + top_parent.query
693        standalone.datasource = top_parent.parameters.get("datasource", "custom")
694
695        if top_parent.annotation_fields and top_parent.num_annotations() > 0:
696            # Get column names dynamically
697            annotation_cols = self.db.fetchone("SELECT * FROM annotations LIMIT 1")
698            annotation_cols = list(annotation_cols.keys())
699            annotation_cols.remove("id")  # Set by the DB
700            cols_str = ",".join(annotation_cols)
701
702            cols_list = ["a." + col for col in annotation_cols if col != "dataset"]
703            query = f"INSERT INTO annotations ({cols_str}) OVERRIDING USER VALUE " \
704                    f"SELECT %s, {', '.join(cols_list)} " \
705                    f"FROM annotations AS a WHERE a.dataset = %s"
706
707            # Copy over all annotations if no item_ids are given
708            if not item_ids or top_parent.num_rows == standalone.num_rows:
709                self.db.execute(query, replacements=(standalone.key, top_parent.key))
710            else:
711                query += " AND a.item_id = ANY(%s)"
712                self.db.execute(query, replacements=(standalone.key, top_parent.key, item_ids))
713
714        # Copy over annotation fields and update annotations with new field IDs
715        if top_parent.annotation_fields:
716            # New field IDs based on the new dataset key
717            annotation_fields = {
718                hash_to_md5(old_field_id + standalone.key): field_values
719                for old_field_id, field_values in top_parent.annotation_fields.items()
720            }
721            standalone.annotation_fields = {}  # Reset to insert everything without checking for changes
722            standalone.save_annotation_fields(annotation_fields)  # Save to db
723
724            # Also update field IDs in annotations
725            for i, old_field_id in enumerate(top_parent.annotation_fields.keys()):
726                self.db.update(
727                    "annotations",
728                    where={"field_id": old_field_id, "dataset": standalone.key},
729                    data={"field_id": hash_to_md5(old_field_id + standalone.key)
730                })
731
732        try:
733            standalone.board = top_parent.board
734        except AttributeError:
735            standalone.board = self.type
736
737        standalone.type = top_parent.type
738
739        standalone.detach()
740        standalone.delete_parameter("key_parent")
741
742        self.dataset.copied_to = standalone.key
743
744        # we don't need this file anymore - it has been copied to the new
745        # standalone dataset, and this one is not accessible via the interface
746        # except as a link to the copied standalone dataset
747        os.unlink(self.dataset.get_results_path())
748
749        # Copy the log
750        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
751
752        return standalone

Copy this dataset and make that copy standalone.

This has the benefit of allowing for all analyses that can be run on full datasets on the new, filtered copy as well.

This also transfers annotations and annotation fields.

Parameters
  • list item_ids: The item_ids that are copied-over. Used to check what annotations need to be copied.
Returns

The new standalone dataset

def save_annotations( self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
754    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
755        """
756        Saves annotations made by this processor on the basis of another dataset.
757        Also adds some data regarding this processor: set `author` and `label` to processor name,
758        and add parameters to `metadata` (unless explicitly indicated).
759
760        :param annotations:				List of dictionaries with annotation items. Must have `item_id` and `value`.
761                                        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
762        :param source_dataset:			The dataset that these annotations will be saved on. If None, will use the
763                                        top parent.
764        :param bool hide_in_explorer:	Whether this annotation is included in the Explorer. 'Hidden' annotations
765                                        are still shown in `iterate_items()`).
766
767        :returns int:					How many annotations were saved.
768
769        """
770
771        if not annotations:
772            return 0
773
774        # Default to parent dataset
775        if not source_dataset:
776            source_dataset = self.source_dataset.top_parent()
777
778        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
779        annotation_fields = source_dataset.annotation_fields
780
781        # Keep track of what fields we've already seen, so we don't need to hash every time.
782        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
783                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}
784
785        annotations_saved = 0
786        failed = 0
787
788        # Loop through all annotations. This may be batched.
789        for annotation in annotations:
790
791            # item_id always needs to be present
792            if not annotation.get("item_id"):
793                failed += 1
794                continue
795
796            # Keep track of what dataset generated this annotation
797            annotation["from_dataset"] = self.dataset.key
798            # Set the author to this processor's name
799            if not annotation.get("author"):
800                annotation["author"] = self.name
801            if not annotation.get("author_original"):
802                annotation["author_original"] = self.name
803            annotation["by_processor"] = True
804
805            # Only use a default label if no custom one is given
806            if not annotation.get("label"):
807                annotation["label"] = self.name
808
809            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
810            # We need to do this within this loop because this function may be called in batches and with different
811            # annotation types.
812            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
813                # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key.
814                # This should create unique fields, even if there's multiple annotation types for one processor.
815                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
816                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
817                annotation_fields[field_id] = {
818                    "label": annotation["label"],
819                    "type": annotation["type"] if annotation.get("type") else "text",
820                    "from_dataset": annotation["from_dataset"],
821                    "hide_in_explorer": hide_in_explorer
822                }
823            else:
824                # Else just get the field ID
825                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]
826
827            # Add field ID to the annotation
828            annotation["field_id"] = field_id
829
830        try:
831            annotations_saved = source_dataset.save_annotations(annotations)
832        except AnnotationException as e:
833            self.source_dataset.update_status(str(e))
834        if failed:
835            self.dataset.update_status("Could not save all annotations, make sure that all items have an `id` value.")
836
837        source_dataset.save_annotation_fields(annotation_fields)
838
839        return annotations_saved

Saves annotations made by this processor on the basis of another dataset. Also adds some data regarding this processor: set author and label to processor name, and add parameters to metadata (unless explicitly indicated).

Parameters
  • annotations: List of dictionaries with annotation items. Must have item_id and value. E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
  • source_dataset: The dataset that these annotations will be saved on. If None, will use the top parent.
  • bool hide_in_explorer: Whether to hide this annotation in the Explorer. 'Hidden' annotations are still shown in iterate_items().

Returns: int, the number of annotations that were saved.

@classmethod
def map_item_method_available(cls, dataset):
841    @classmethod
842    def map_item_method_available(cls, dataset):
843        """
844        Check if this processor can use map_item
845
846        Checks if map_item method exists and is compatible with dataset. If
847        dataset has a different extension than the default for this processor,
848        or if the dataset has no extension, this means we cannot be sure the
849        data is in the right format to be mapped, so `False` is returned in
850        that case even if a map_item() method is available.
851
852        :param BasicProcessor processor:    The BasicProcessor subclass object
853        with which to use map_item
854        :param DataSet dataset:                The DataSet object with which to
855        use map_item
856        """
857        # only run item mapper if extension of processor == extension of
858        # data file, for the scenario where a csv file was uploaded and
859        # converted to an ndjson-based data source, for example
860        # todo: this is kind of ugly, and a better fix may be possible
861        dataset_extension = dataset.get_extension()
862        if not dataset_extension:
863            # DataSet results file does not exist or has no extension, use expected extension
864            if hasattr(dataset, "extension"):
865                dataset_extension = dataset.extension
866            else:
867                # No known DataSet extension; cannot determine if map_item method compatible
868                return False
869
870        return hasattr(cls, "map_item") and cls.extension == dataset_extension

Check if this processor can use map_item

Checks if map_item method exists and is compatible with dataset. If dataset has a different extension than the default for this processor, or if the dataset has no extension, this means we cannot be sure the data is in the right format to be mapped, so False is returned in that case even if a map_item() method is available.

Parameters
  • BasicProcessor processor: The BasicProcessor subclass object with which to use map_item
  • DataSet dataset: The DataSet object with which to use map_item
@classmethod
def get_mapped_item(cls, item):
872    @classmethod
873    def get_mapped_item(cls, item):
874        """
875        Get the mapped item using a processors map_item method.
876
877        Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
878        """
879        try:
880            mapped_item = cls.map_item(item)
881        except (KeyError, IndexError) as e:
882            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
883
884        if not mapped_item:
885            raise MapItemException("Unable to map item!")
886
887        return mapped_item

Get the mapped item using a processor's map_item method.

Ensure map_item method is compatible with a dataset by checking map_item_method_available first.

@classmethod
def is_filter(cls):
889    @classmethod
890    def is_filter(cls):
891        """
892        Is this processor a filter?
893
894        Filters do not produce their own dataset but replace the source_dataset dataset
895        instead.
896
897        :todo: Make this a bit more robust than sniffing the processor category
898        :return bool:
899        """
900        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()

Is this processor a filter?

Filters do not produce their own dataset but replace the source_dataset dataset instead.

Todo: Make this a bit more robust than sniffing the processor category.

Returns
@classmethod
def get_options(cls, parent_dataset=None, config=None) -> dict:
902    @classmethod
903    def get_options(cls, parent_dataset=None, config=None) -> dict:
904        """
905        Get processor options
906
907        This method by default returns the class's "options" attribute, or an
908        empty dictionary. It can be redefined by processors that need more
909        fine-grained options, e.g. in cases where the availability of options
910        is partially determined by the parent dataset's parameters.
911
912        :param parent_dataset DataSet:  An object representing the dataset that
913            the processor would be or was run on. Can be used, in conjunction with
914            config, to show some options only to privileged users.
915        :param config ConfigManager|None config:  Configuration reader (context-aware)
916        :return dict:   Options for this processor
917        """
918
919        return cls.options if hasattr(cls, "options") else {}

Get processor options

This method by default returns the class's "options" attribute, or an empty dictionary. It can be redefined by processors that need more fine-grained options, e.g. in cases where the availability of options is partially determined by the parent dataset's parameters.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be or was run on. Can be used, in conjunction with config, to show some options only to privileged users.
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Options for this processor

@classmethod
def get_status(cls):
921    @classmethod
922    def get_status(cls):
923        """
924        Get processor status
925
926        :return list:    Statuses of this processor
927        """
928        return cls.status if hasattr(cls, "status") else None

Get processor status

Returns
Statuses of this processor
@classmethod
def is_top_dataset(cls):
930    @classmethod
931    def is_top_dataset(cls):
932        """
933        Confirm this is *not* a top dataset, but a processor.
934
935        Used for processor compatibility checks.
936
937        :return bool:  Always `False`, because this is a processor.
938        """
939        return False

Confirm this is not a top dataset, but a processor.

Used for processor compatibility checks.

Returns

Always False, because this is a processor.

@classmethod
def is_from_collector(cls):
941    @classmethod
942    def is_from_collector(cls):
943        """
944        Check if this processor is one that collects data, i.e. a search or
945        import worker.
946
947        :return bool:
948        """
949        return cls.type.endswith("-search") or cls.type.endswith("-import")

Check if this processor is one that collects data, i.e. a search or import worker.

Returns
@classmethod
def get_extension(self, parent_dataset=None):
951    @classmethod
952    def get_extension(self, parent_dataset=None):
953        """
954        Return the extension of the processor's dataset
955
956        Used for processor compatibility checks.
957
958        :param DataSet parent_dataset:  An object representing the dataset that
959          the processor would be run on
960        :return str|None:  Dataset extension (without leading `.`) or `None`.
961        """
962        if self.is_filter():
963            if parent_dataset is not None:
964                # Filters should use the same extension as the parent dataset
965                return parent_dataset.get_extension()
966            else:
967                # No dataset provided, unable to determine extension of parent dataset
968                # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead
969                return None
970        elif self.extension:
971            # Use explicitly defined extension in class (Processor class defaults to "csv")
972            return self.extension
973        else:
974            # A non filter processor updated the base Processor extension to None/False?
975            return None

Return the extension of the processor's dataset

Used for processor compatibility checks.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
Returns

Dataset extension (without leading .) or None.

@classmethod
def is_rankable(cls, multiple_items=True):
977    @classmethod
978    def is_rankable(cls, multiple_items=True):
979        """
980        Used for processor compatibility
981
982        :param bool multiple_items:  Consider datasets with multiple items per
983          item (e.g. word_1, word_2, etc)? Included for compatibility
984        """
985        return False

Used for processor compatibility

Parameters
  • bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)? Included for compatibility
@classmethod
def exclude_followup_processors(cls, processor_type=None):
 987    @classmethod
 988    def exclude_followup_processors(cls, processor_type=None):
 989        """
 990        Used for processor compatibility
 991
 992        To be defined by the child processor if it should exclude certain follow-up processors.
 993        e.g.:
 994
 995        def exclude_followup_processors(cls, processor_type):
 996            if processor_type in ["undesirable-followup-processor"]:
 997                return True
 998            return False
 999
1000        :param str processor_type:  Processor type to exclude
1001        :return bool:  True if processor should be excluded, False otherwise
1002        """
1003        return False

Used for processor compatibility

To be defined by the child processor if it should exclude certain follow-up processors. e.g.:

def exclude_followup_processors(cls, processor_type):
    if processor_type in ["undesirable-followup-processor"]:
        return True
    return False

Parameters
  • str processor_type: Processor type to exclude
Returns

True if processor should be excluded, False otherwise

@abc.abstractmethod
def process(self):
1005    @abc.abstractmethod
1006    def process(self):
1007        """
1008        Process data
1009
1010        To be defined by the child processor.
1011        """
1012        pass

Process data

To be defined by the child processor.

@staticmethod
def is_4cat_processor():
1014    @staticmethod
1015    def is_4cat_processor():
1016        """
1017        Is this a 4CAT processor?
1018
1019        This is used to determine whether a class is a 4CAT
1020        processor.
1021
1022        :return:  True
1023        """
1024        return True

Is this a 4CAT processor?

This is used to determine whether a class is a 4CAT processor.

Returns

True