backend.lib.processor

Basic post-processor worker - should be inherited by workers to post-process results

   1"""
   2Basic post-processor worker - should be inherited by workers to post-process results
   3"""
   4import traceback
   5import zipfile
   6import typing
   7import shutil
   8import abc
   9import csv
  10import os
  11import re
  12import time
  13
  14from pathlib import PurePath
  15
  16from backend.lib.worker import BasicWorker
  17from common.lib.dataset import DataSet
  18from common.lib.fourcat_module import FourcatModule
  19from common.lib.helpers import get_software_commit, remove_nuls, send_email, hash_to_md5
  20from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException,
  21                                   DataSetException, MapItemException)
  22from common.config_manager import ConfigWrapper
  23from common.lib.user import User
  24
  25csv.field_size_limit(1024 * 1024 * 1024)
  26
  27
  28class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
  29    """
  30    Abstract processor class
  31
  32    A processor takes a finished dataset as input and processes its result in
  33    some way, with another dataset set as output. The input thus is a file, and
  34    the output (usually) as well. In other words, the result of a processor can
  35    be used as input for another processor (though whether and when this is
  36    useful is another question).
  37
  38    To determine whether a processor can process a given dataset, you can
   39    define an `is_compatible_with(module=None, config=None) -> bool` class
  40    method which takes a dataset as argument and returns a bool that determines
  41    if this processor is considered compatible with that dataset. For example:
  42
  43    .. code-block:: python
  44
  45        @classmethod
  46        def is_compatible_with(cls, module=None, config=None):
  47            return module.type == "linguistic-features"
  48
  49
  50    """
  51
  52    #: Database handler to interface with the 4CAT database
  53    db = None
  54
  55    #: Job object that requests the execution of this processor
  56    job = None
  57
  58    #: The dataset object that the processor is *creating*.
  59    dataset = None
  60
  61    #: Owner (username) of the dataset
  62    owner = None
  63
  64    #: The dataset object that the processor is *processing*.
  65    source_dataset = None
  66
  67    #: The file that is being processed
  68    source_file = None
  69
  70    #: Processor description, which will be displayed in the web interface
  71    description = "No description available"
  72
  73    #: Category identifier, used to group processors in the web interface
  74    category = "Other"
  75
  76    #: Extension of the file created by the processor
  77    extension = "csv"
  78
  79    #: 4CAT settings from the perspective of the dataset's owner
  80    config = None
  81
  82    #: Is this processor running 'within' a preset processor?
  83    is_running_in_preset = False
  84
  85    #: Is this processor hidden in the front-end, and only used internally/in presets?
  86    is_hidden = False
  87
  88    #: This will be defined automatically upon loading the processor. There is
  89    #: no need to override manually
  90    filepath = None
  91
  92    def work(self):
  93        """
  94        Process a dataset
  95
  96        Loads dataset metadata, sets up the scaffolding for performing some kind
  97        of processing on that dataset, and then processes it. Afterwards, clean
  98        up.
  99        """
 100        try:
 101            # a dataset can have multiple owners, but the creator is the user
 102            # that actually queued the processor, so their config is relevant
 103            self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
 104            self.owner = self.dataset.creator
 105        except DataSetException:
 106            # query has been deleted in the meantime. finish without error,
 107            # as deleting it will have been a conscious choice by a user
 108            self.job.finish()
 109            return
 110
 111        # set up config reader wrapping the worker's config manager, which is
 112        # in turn the one passed to it by the WorkerManager, which is the one
 113        # originally loaded in bootstrap
 114        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))
 115
 116        if self.dataset.data.get("key_parent", None):
 117            # search workers never have parents (for now), so we don't need to
 118            # find out what the source_dataset dataset is if it's a search worker
 119            try:
 120                self.source_dataset = self.dataset.get_parent()
 121
 122                # for presets, transparently use the *top* dataset as a source_dataset
 123                # since that is where any underlying processors should get
 124                # their data from. However, this should only be done as long as the
 125                # preset is not finished yet, because after that there may be processors
 126                # that run on the final preset result
 127                while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
 128                    self.is_running_in_preset = True
 129                    self.source_dataset = self.source_dataset.get_parent()
 130                    if self.source_dataset is None:
 131                        # this means there is no dataset that is *not* a preset anywhere
 132                        # above this dataset. This should never occur, but if it does, we
 133                        # cannot continue
  134                        self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset" %
 135                                       (self.type, self.dataset.key))
 136                        self.job.finish()
 137                        return
 138
 139            except DataSetException:
 140                # we need to know what the source_dataset dataset was to properly handle the
 141                # analysis
 142                self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
 143                    self.type, self.dataset.key))
 144                self.job.finish()
 145                return
 146
 147            if not self.source_dataset.is_finished() and not self.is_running_in_preset:
 148                # not finished yet - retry after a while
 149                # exception for presets, since these *should* be unfinished
 150                # until underlying processors are done
 151                self.job.release(delay=30)
 152                return
 153
 154            self.source_file = self.source_dataset.get_results_path()
 155            if not self.source_file.exists():
 156                self.dataset.update_status("Finished, no input data found.")
 157
 158        self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
 159
 160        processor_name = self.title if hasattr(self, "title") else self.type
 161        self.dataset.clear_log()
 162        self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
 163
 164        # start log file
 165        self.dataset.update_status("Processing data")
 166        self.dataset.update_version(get_software_commit(self))
 167
 168        # get parameters
 169        # if possible, fill defaults where parameters are not provided
 170        given_parameters = self.dataset.parameters.copy()
 171        all_parameters = self.get_options(self.dataset, config=self.config)
 172        self.parameters = {
 173            param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
 174            for param in [*all_parameters.keys(), *given_parameters.keys()]
 175        }
 176
 177        # now the parameters have been loaded into memory, clear any sensitive
 178        # ones. This has a side-effect that a processor may not run again
 179        # without starting from scratch, but this is the price of progress
 180        options = self.get_options(self.dataset.get_parent(), config=self.config)
 181        for option, option_settings in options.items():
 182            if option_settings.get("sensitive"):
 183                self.dataset.delete_parameter(option)
 184
 185        if self.interrupted:
 186            self.dataset.log("Processing interrupted, trying again later")
 187            return self.abort()
 188
 189        if not self.dataset.is_finished():
 190            try:
 191                self.process()
 192                self.after_process()
 193            except WorkerInterruptedException as e:
 194                self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
 195                self.abort()
 196            except Exception as e:
 197                self.dataset.log("Processor crashed (%s), trying again later" % str(e))
 198                stack = traceback.extract_tb(e.__traceback__)
 199                frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
 200                location = "->".join(frames)
 201
 202                # Not all datasets have source_dataset keys
 203                if len(self.dataset.get_genealogy()) > 1:
 204                    parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
 205                else:
 206                    parent_key = ""
 207
 208                print("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
 209                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)))
 210                # remove any result files that have been created so far
 211                self.remove_files()
 212
 213                raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
 214                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack)
 215        else:
 216            # dataset already finished, job shouldn't be open anymore
 217            self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
 218            self.job.data["jobtype"], self.job.data["remote_id"]))
 219            self.job.finish()
 220
 221    def after_process(self):
 222        """
 223        Run after processing the dataset
 224
 225        This method cleans up temporary files, and if needed, handles logistics
 226        concerning the result file, e.g. running a pre-defined processor on the
 227        result, copying it to another dataset, and so on.
 228        """
 229        if self.dataset.data["num_rows"] > 0:
 230            self.dataset.update_status("Dataset completed.")
 231
 232        if not self.dataset.is_finished():
 233            self.dataset.finish()
 234
 235        self.dataset.remove_staging_areas()
 236
 237        # see if we have anything else lined up to run next
 238        for next in self.parameters.get("next", []):
 239            can_run_next = True
 240            next_parameters = next.get("parameters", {})
 241            next_type = next.get("type", "")
 242            try:
 243                available_processors = self.dataset.get_available_processors(config=self.config)
 244            except ValueError:
 245                self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
 246                break
 247
 260            # run it only if the post-processor is actually available for this query
 261            if self.dataset.data["num_rows"] <= 0:
 262                can_run_next = False
 263                self.log.info(
 264                    "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (
 265                        next_type, self.dataset.key))
 266
 267            elif next_type in available_processors:
 268                next_analysis = DataSet(
 269                    parameters=next_parameters,
 270                    type=next_type,
 271                    db=self.db,
 272                    parent=self.dataset.key,
 273                    extension=available_processors[next_type].extension,
 274                    is_private=self.dataset.is_private,
 275                    owner=self.dataset.creator,
 276                    modules=self.modules
 277                )
 278                # copy ownership from parent dataset
 279                next_analysis.copy_ownership_from(self.dataset)
 280                # add to queue
 281                self.queue.add_job(next_type, remote_id=next_analysis.key)
 282            else:
 283                can_run_next = False
 284                self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (
 285                self.dataset.key, self.type, next_type))
 286
 287            if not can_run_next:
 288                # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
 289                # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
 290                if "attach_to" in self.parameters:
  291                    # This should not normally happen, but an intermediate processor has been
  292                    # designated as the processor the parent should attach to
 293                    pass
 294                else:
  295                    # Check for "attach_to" parameter in descendants
 296                    while True:
 297                        if "attach_to" in next_parameters:
 298                            self.parameters["attach_to"] = next_parameters["attach_to"]
 299                            break
 300                        else:
 301                            if "next" in next_parameters:
 302                                next_parameters = next_parameters["next"][0]["parameters"]
 303                            else:
  304                                # No more descendants
 305                                # Should not happen; we cannot find the source dataset
 306                                self.log.warning(
 307                                    "Cannot find preset's source dataset for dataset %s" % self.dataset.key)
 308                                break
 309
 310        # see if we need to register the result somewhere
 311        if "copy_to" in self.parameters:
 312            # copy the results to an arbitrary place that was passed
 313            if self.dataset.get_results_path().exists():
 314                shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
 315            else:
 316                # if copy_to was passed, that means it's important that this
 317                # file exists somewhere, so we create it as an empty file
 318                with open(self.parameters["copy_to"], "w") as empty_file:
 319                    empty_file.write("")
 320
 321        # see if this query chain is to be attached to another query
 322        # if so, the full genealogy of this query (minus the original dataset)
 323        # is attached to the given query - this is mostly useful for presets,
 324        # where a chain of processors can be marked as 'underlying' a preset
 325        if "attach_to" in self.parameters:
 326            try:
 327                # copy metadata and results to the surrogate
 328                surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules)
 329
 330                if self.dataset.get_results_path().exists():
 331                    # Update the surrogate's results file suffix to match this dataset's suffix
 332                    surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(
 333                        self.dataset.get_results_path().suffix)
 334                    shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
 335
 336                try:
 337                    surrogate.finish(self.dataset.data["num_rows"])
 338                except RuntimeError:
 339                    # already finished, could happen (though it shouldn't)
 340                    pass
 341
 342                surrogate.update_status(self.dataset.get_status())
 343
 344            except ValueError:
 345                # dataset with key to attach to doesn't exist...
 346                self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % (
 347                    self.dataset.key, self.parameters["attach_to"]))
 348
 349        self.job.finish()
 350
 361        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
 362            owner = self.dataset.get_parameters().get("email-complete", False)
 363            # Check that username is email address
 364            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
 365                from email.mime.multipart import MIMEMultipart
 366                from email.mime.text import MIMEText
 367                from smtplib import SMTPException
 368                import socket
 369                import html2text
 370
 371                self.log.debug("Sending email to %s" % owner)
 372                dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
 373                sender = self.config.get('mail.noreply')
 374                message = MIMEMultipart("alternative")
 375                message["From"] = sender
 376                message["To"] = owner
 377                message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
 378                mail = """
 379                    <p>Hello %s,</p>
 380                    <p>4CAT has finished collecting your %s dataset labeled: %s</p>
 381                    <p>You can view your dataset via the following link:</p>
 382                    <p><a href="%s">%s</a></p> 
 383                    <p>Sincerely,</p>
 384                    <p>Your friendly neighborhood 4CAT admin</p>
 385                    """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
 386                html_parser = html2text.HTML2Text()
 387                message.attach(MIMEText(html_parser.handle(mail), "plain"))
 388                message.attach(MIMEText(mail, "html"))
 389                try:
 390                    send_email([owner], message, self.config)
 391                except (SMTPException, ConnectionRefusedError, socket.timeout):
 392                    self.log.error("Error sending email to %s" % owner)
 393
 394    def remove_files(self):
 395        """
 396        Clean up result files and any staging files for processor to be attempted
 397        later if desired.
 398        """
 399        # Remove the results file that was created
 400        if self.dataset.get_results_path().exists():
 401            self.dataset.get_results_path().unlink()
 402        if self.dataset.get_results_folder_path().exists():
 403            shutil.rmtree(self.dataset.get_results_folder_path())
 404
 405        # Remove any staging areas with temporary data
 406        self.dataset.remove_staging_areas()
 407
 408    def abort(self):
 409        """
 410        Abort dataset creation and clean up so it may be attempted again later
 411        """
 412
 413        # delete annotations that have been generated as part of this processor
 414        self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True)
 415        # remove any result files that have been created so far
 416        self.remove_files()
 417
 418        # we release instead of finish, since interrupting is just that - the
 419        # job should resume at a later point. Delay resuming by 10 seconds to
 420        # give 4CAT the time to do whatever it wants (though usually this isn't
 421        # needed since restarting also stops the spawning of new workers)
 422        if self.interrupted == self.INTERRUPT_RETRY:
 423            # retry later - wait at least 10 seconds to give the backend time to shut down
 424            self.job.release(delay=10)
 425        elif self.interrupted == self.INTERRUPT_CANCEL:
 426            # cancel job
 427            self.job.finish()
 428
 429    def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
 430        """
 431        Request an iterable of URLs and return results
 432
 433        This method takes an iterable yielding URLs and yields the result for
 434        a GET request for that URL in return. This is done through the worker
 435        manager's DelegatedRequestHandler, which can run multiple requests in
 436        parallel and divide them over the proxies configured in 4CAT (if any).
 437        Proxy cooloff and queueing is shared with other processors, so that a
 438        processor will never accidentally request from the same site as another
 439        processor, potentially triggering rate limits.
 440
 441        :param urls:  Something that can be iterated over and yields URLs
 442        :param kwargs:  Other keyword arguments are passed on to `add_urls`
 443        and eventually to `requests.get()`.
 444        :param bool preserve_order:  Return items in the original order. Use
 445        `False` to potentially speed up processing, if order is not important.
 446        :return:  A generator yielding request results, i.e. tuples of a
  447        URL and a `requests` response object
 448        """
 449        queue_name = self._proxy_queue_name()
 450        delegator = self.manager.proxy_delegator
 451
 452        delegator.refresh_settings(self.config)
 453
 454        # 50 is an arbitrary batch size - but we top up every 0.05s, so
 455        # that should be sufficient
 456        batch_size = 50
 457
 458        # we need an iterable, so we can use next() and StopIteration
 459        urls = iter(urls)
 460
 461        have_urls = True
 462        while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls:
 463            if queue_length < batch_size and have_urls:
 464                batch = []
 465                while len(batch) < (batch_size - queue_length):
 466                    try:
 467                        batch.append(next(urls))
 468                    except StopIteration:
 469                        have_urls = False
 470                        break
 471
 472                delegator.add_urls(batch, queue_name, **kwargs)
 473
 474            time.sleep(0.05)  # arbitrary...
 475            for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
 476                # result may also be a FailedProxiedRequest!
 477                # up to the processor to decide how to deal with it
 478                yield url, result
 479
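As a usage sketch (the URLs and the failure check are illustrative; a result may be a `FailedProxiedRequest` rather than a `requests` response, assumed here to lack a `status_code` attribute):

.. code-block:: python

    # inside a processor's process() method; the URLs are placeholders
    urls = ["https://example.com/page/%i" % page for page in range(1, 11)]

    fetched = []
    for url, response in self.iterate_proxied_requests(urls, preserve_order=False):
        # failed requests are skipped here; a real processor may want to retry or log them
        if not hasattr(response, "status_code"):
            self.dataset.log("Request for %s failed, skipping" % url)
            continue
        fetched.append({"url": url, "status": response.status_code})
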
 480    def push_proxied_request(self, url, position=-1, **kwargs):
 481        """
 482        Add a single URL to the proxied requests queue
 483
 484        :param str url:  URL to add
 485        :param position:  Position to add to queue; can be used to add priority
 486        requests, adds to end of queue by default
 487        :param kwargs:
 488        """
 489        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)
 490
 491    def flush_proxied_requests(self):
 492        """
 493        Get rid of remaining proxied requests
 494
 495        Can be used if enough results are available and any remaining ones need
 496        to be stopped ASAP and are otherwise unneeded.
 497
 498        Blocking!
 499        """
 500        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())
 501
 502    def _proxy_queue_name(self):
 503        """
 504        Get proxy queue name
 505
 506        For internal use.
 507
 508        :return str:
 509        """
 510        return f"{self.type}-{self.dataset.key}"
 511
 512    def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
 513        """
 514        A generator that iterates through files in an archive
 515
 516        With every iteration, the processor's 'interrupted' flag is checked,
 517        and if set a ProcessorInterruptedException is raised, which by default
 518        is caught and subsequently stops execution gracefully.
 519
 520        Files are temporarily unzipped and deleted after use.
 521
 522        :param Path path:     Path to zip file to read
 523        :param Path staging_area:  Where to store the files while they're
 524          being worked with. If omitted, a temporary folder is created and
 525          deleted after use
  526        :param bool immediately_delete:  Temporary files are removed after being yielded;
 527          False keeps files until the staging_area is removed (usually during processor
 528          cleanup)
 529        :param list filename_filter:  Whitelist of filenames to iterate.
 530        Other files will be ignored. If empty, do not ignore anything.
 531        :return:  An iterator with a Path item for each file
 532        """
 533
 534        if not path.exists():
 535            return
 536
 537        if not staging_area:
 538            staging_area = self.dataset.get_staging_area()
 539
 540        if not staging_area.exists() or not staging_area.is_dir():
  541            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
 542
 543        with zipfile.ZipFile(path, "r") as archive_file:
 544            archive_contents = sorted(archive_file.namelist())
 545
 546            for archived_file in archive_contents:
 547                if filename_filter and archived_file not in filename_filter:
 548                    continue
 549
 550                info = archive_file.getinfo(archived_file)
 551                if info.is_dir():
 552                    continue
 553
 554                if self.interrupted:
 555                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
 556
 557                temp_file = staging_area.joinpath(archived_file)
 558                archive_file.extract(archived_file, staging_area)
 559
 560                yield temp_file
 561                if immediately_delete:
 562                    temp_file.unlink()
 563
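For example, a processor whose parent dataset is a zip archive of images could iterate over it as sketched below (the extension filter is illustrative):

.. code-block:: python

    # self.source_file is the parent dataset's result file, assumed to be a zip archive
    processed = 0
    for file_path in self.iterate_archive_contents(self.source_file):
        # each yielded item is a Path to a temporarily extracted file, deleted after this iteration
        if file_path.suffix.lower() not in (".jpg", ".jpeg", ".png"):
            continue
        processed += 1
        self.dataset.update_status("Processed %i files from archive" % processed)
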
 564    def unpack_archive_contents(self, path, staging_area=None):
 565        """
 566        Unpack all files in an archive to a staging area
 567
 568        With every iteration, the processor's 'interrupted' flag is checked,
 569        and if set a ProcessorInterruptedException is raised, which by default
 570        is caught and subsequently stops execution gracefully.
 571
 572        Files are unzipped to a staging area. The staging area is *not*
 573        cleaned up automatically.
 574
 575        :param Path path:     Path to zip file to read
  576        :param Path staging_area:  Where to store the files while they're
  577          being worked with. If omitted, a temporary folder is created; it
  578          is *not* deleted automatically after use
 580        :return Path:  A path to the staging area
 581        """
 582
 583        if not path.exists():
 584            return
 585
 586        if not staging_area:
 587            staging_area = self.dataset.get_staging_area()
 588
 589        if not staging_area.exists() or not staging_area.is_dir():
  590            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
 591
 592        paths = []
 593        with zipfile.ZipFile(path, "r") as archive_file:
 594            archive_contents = sorted(archive_file.namelist())
 595
 596            for archived_file in archive_contents:
 597                if self.interrupted:
 598                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
 599
 600                file_name = archived_file.split("/")[-1]
 601                temp_file = staging_area.joinpath(file_name)
 602                archive_file.extract(archived_file, staging_area)
 603                paths.append(temp_file)
 604
 605        return staging_area
 606
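When all files need to be on disk at the same time, the archive can instead be unpacked in one go, as in this sketch:

.. code-block:: python

    # unpack the parent dataset's archive into a staging area
    staging_area = self.dataset.get_staging_area()
    self.unpack_archive_contents(self.source_file, staging_area=staging_area)

    # work with the unpacked files; staging areas created via get_staging_area()
    # are removed by remove_staging_areas() in after_process()
    unpacked_files = sorted(staging_area.glob("*"))
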
 607    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
 608        """
 609        Extract a file from an archive by name
 610
 611        :param str filename:  Name of file to extract
 612        :param Path archive_path:  Path to zip file to read
 613        :param Path staging_area:  Where to store the files while they're
 614                  being worked with. If omitted, a temporary folder is created
 615        :return Path:  A path to the extracted file
 616        """
 617        if not archive_path.exists():
 618            return
 619
 620        if not staging_area:
 621            staging_area = self.dataset.get_staging_area()
 622
 623        if not staging_area.exists() or not staging_area.is_dir():
  624            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
 625
 626        with zipfile.ZipFile(archive_path, "r") as archive_file:
 627            if filename not in archive_file.namelist():
 628                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
 629            else:
 630                archive_file.extract(filename, staging_area)
 631                return staging_area.joinpath(filename)
 632
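A sketch of extracting one known file from the parent archive (the filename is hypothetical):

.. code-block:: python

    try:
        metadata_path = self.extract_archived_file_by_name("metadata.json", self.source_file)
    except FileNotFoundError:
        metadata_path = None
        self.dataset.log("No metadata file found in source archive")
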
 633    def write_csv_items_and_finish(self, data):
 634        """
 635        Write data as csv to results file and finish dataset
 636
 637        Determines result file path using dataset's path determination helper
 638        methods. After writing results, the dataset is marked finished. Will
 639        raise a ProcessorInterruptedException if the interrupted flag for this
 640        processor is set while iterating.
 641
 642        :param data: A list or tuple of dictionaries, all with the same keys
 643        """
 644        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
 645            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
 646
 647        if not data:
  648            raise ValueError("write_csv_items requires a list or tuple with at least one item")
 649
 650        self.dataset.update_status("Writing results file")
 651        writer = False
 652        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
 653            for row in data:
 654                if self.interrupted:
 655                    raise ProcessorInterruptedException("Interrupted while writing results file")
 656
 657                row = remove_nuls(row)
 658                if not writer:
 659                    writer = csv.DictWriter(results, fieldnames=row.keys())
 660                    writer.writeheader()
 661
 662                writer.writerow(row)
 663
 664        self.dataset.update_status("Finished")
 665        self.dataset.finish(len(data))
 666
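A typical process() implementation collects rows as dictionaries sharing the same keys and hands them to this method; a minimal sketch with made-up rows:

.. code-block:: python

    # all rows must share the same keys; the first row determines the CSV header
    rows = [
        {"word": "example", "frequency": 10},
        {"word": "sketch", "frequency": 3},
    ]
    self.write_csv_items_and_finish(rows)
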
 667    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
 668        """
 669        Archive a bunch of files into a zip archive and finish processing
 670
 671        :param list|Path files: If a list, all files will be added to the
 672          archive and deleted afterwards. If a folder, all files in the folder
 673          will be added and the folder will be deleted afterwards.
 674        :param int num_items: Items in the dataset. If None, the amount of
 675          files added to the archive will be used.
 676        :param int compression:  Type of compression to use. By default, files
 677          are not compressed, to speed up unarchiving.
 678        :param bool finish:  Finish the dataset/job afterwards or not?
 679        """
 680        is_folder = False
 681        if issubclass(type(files), PurePath):
 682            is_folder = files
 683            if not files.exists() or not files.is_dir():
 684                raise RuntimeError("Folder %s is not a folder that can be archived" % files)
 685
 686            files = files.glob("*")
 687
 688        # create zip of archive and delete temporary files and folder
 689        self.dataset.update_status("Compressing results into archive")
 690        done = 0
 691        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
 692            for output_path in files:
 693                zip.write(output_path, output_path.name)
 694                output_path.unlink()
 695                done += 1
 696
 697        # delete temporary folder
 698        if is_folder:
 699            shutil.rmtree(is_folder)
 700
 701        self.dataset.update_status("Finished")
 702        if num_items is None:
 703            num_items = done
 704
 705        if finish:
 706            self.dataset.finish(num_items)
 707
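Processors that produce many files typically write them to a staging area and then archive that folder, roughly as follows (file names and contents are placeholders):

.. code-block:: python

    # write output files to a staging area, then zip the whole folder
    staging_area = self.dataset.get_staging_area()
    for index in range(3):
        with staging_area.joinpath("file-%i.txt" % index).open("w") as outfile:
            outfile.write("example output %i" % index)

    # all files in the folder are added to the dataset's result archive,
    # and the folder itself is deleted afterwards
    self.write_archive_and_finish(staging_area)
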
 708    def create_standalone(self):
 709        """
 710        Copy this dataset and make that copy standalone
 711
 712        This has the benefit of allowing for all analyses that can be run on
 713        full datasets on the new, filtered copy as well.
 714
 715        :return DataSet:  The new standalone dataset
 716        """
 717        top_parent = self.source_dataset
 718
 719        finished = self.dataset.check_dataset_finished()
 720        if finished == 'empty':
 721            # No data to process, so we can't create a standalone dataset
 722            return
 723        elif finished is None:
 724            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
 725            pass
 726
 727        standalone = self.dataset.copy(shallow=False)
 728        standalone.body_match = "(Filtered) " + top_parent.query
 729        standalone.datasource = top_parent.parameters.get("datasource", "custom")
 730
 731        try:
 732            standalone.board = top_parent.board
 733        except AttributeError:
 734            standalone.board = self.type
 735
 736        standalone.type = top_parent.type
 737
 738        standalone.detach()
 739        standalone.delete_parameter("key_parent")
 740
 741        self.dataset.copied_to = standalone.key
 742
 743        # we don't need this file anymore - it has been copied to the new
 744        # standalone dataset, and this one is not accessible via the interface
 745        # except as a link to the copied standalone dataset
 746        os.unlink(self.dataset.get_results_path())
 747
 748        # Copy the log
 749        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
 750
 751        return standalone
 752
 753    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
 754        """
 755        Saves annotations made by this processor on the basis of another dataset.
 756        Also adds some data regarding this processor: set `author` and `label` to processor name,
 757        and add parameters to `metadata` (unless explicitly indicated).
 758
 759        :param annotations:				List of dictionaries with annotation items. Must have `item_id` and `value`.
 760                                        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
 761        :param source_dataset:			The dataset that these annotations will be saved on. If None, will use the
 762                                        top parent.
 763        :param bool hide_in_explorer:	Whether this annotation is included in the Explorer. 'Hidden' annotations
  764                                        are still shown in `iterate_items()`.
 765
 766        :returns int:					How many annotations were saved.
 767
 768        """
 769
 770        if not annotations:
 771            return 0
 772
 773        # Default to parent dataset
 774        if not source_dataset:
 775            source_dataset = self.source_dataset.top_parent()
 776
 777        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
 778        annotation_fields = source_dataset.annotation_fields
 779
 780        # Keep track of what fields we've already seen, so we don't need to hash every time.
 781        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
 782                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}
 783
 784        # Loop through all annotations. This may be batched.
 785        for annotation in annotations:
 786
 787            # Keep track of what dataset generated this annotation
 788            annotation["from_dataset"] = self.dataset.key
 789            # Set the author to this processor's name
 790            if not annotation.get("author"):
 791                annotation["author"] = self.name
 792            if not annotation.get("author_original"):
 793                annotation["author_original"] = self.name
 794            annotation["by_processor"] = True
 795
 796            # Only use a default label if no custom one is given
 797            if not annotation.get("label"):
 798                annotation["label"] = self.name
 799
 800            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
 801            # We need to do this within this loop because this function may be called in batches and with different
 802            # annotation types.
 803            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
 804                # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key.
 805                # This should create unique fields, even if there's multiple annotation types for one processor.
 806                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
 807                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
 808                annotation_fields[field_id] = {
 809                    "label": annotation["label"],
 810                    "type": annotation["type"] if annotation.get("type") else "text",
 811                    "from_dataset": annotation["from_dataset"],
 812                    "hide_in_explorer": hide_in_explorer
 813                }
 814            else:
 815                # Else just get the field ID
 816                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]
 817
 818            # Add field ID to the annotation
 819            annotation["field_id"] = field_id
 820
 821        annotations_saved = source_dataset.save_annotations(annotations)
 822        source_dataset.save_annotation_fields(annotation_fields)
 823
 824        return annotations_saved
 825
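A sketch of writing annotations back to the (top) parent dataset; item IDs and values are placeholders:

.. code-block:: python

    # item_id should match the id of an item in the dataset being annotated
    annotations = [
        {"item_id": "12345", "label": "sentiment", "value": "positive"},
        {"item_id": "12346", "label": "sentiment", "value": "negative"},
    ]
    saved = self.save_annotations(annotations)
    self.dataset.update_status("Saved %i annotations" % saved)
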
 826    @classmethod
 827    def map_item_method_available(cls, dataset):
 828        """
 829        Check if this processor can use map_item
 830
 831        Checks if map_item method exists and is compatible with dataset. If
 832        dataset has a different extension than the default for this processor,
 833        or if the dataset has no extension, this means we cannot be sure the
 834        data is in the right format to be mapped, so `False` is returned in
 835        that case even if a map_item() method is available.
 836
 839        :param DataSet dataset:                The DataSet object with which to
 840        use map_item
 841        """
 842        # only run item mapper if extension of processor == extension of
 843        # data file, for the scenario where a csv file was uploaded and
 844        # converted to an ndjson-based data source, for example
 845        # todo: this is kind of ugly, and a better fix may be possible
 846        dataset_extension = dataset.get_extension()
 847        if not dataset_extension:
 848            # DataSet results file does not exist or has no extension, use expected extension
 849            if hasattr(dataset, "extension"):
 850                dataset_extension = dataset.extension
 851            else:
 852                # No known DataSet extension; cannot determine if map_item method compatible
 853                return False
 854
 855        return hasattr(cls, "map_item") and cls.extension == dataset_extension
 856
 857    @classmethod
 858    def get_mapped_item(cls, item):
 859        """
 860        Get the mapped item using a processors map_item method.
 861
 862        Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
 863        """
 864        try:
 865            mapped_item = cls.map_item(item)
 866        except (KeyError, IndexError) as e:
 867            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
 868
 869        if not mapped_item:
 870            raise MapItemException("Unable to map item!")
 871
 872        return mapped_item
 873
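Together, these are typically used when reading raw items from another processor's dataset file; a sketch, where `SomeProcessor` and `raw_item` are hypothetical:

.. code-block:: python

    # MapItemException is defined in common.lib.exceptions (imported at the top of this module)
    if SomeProcessor.map_item_method_available(dataset):
        try:
            mapped_item = SomeProcessor.get_mapped_item(raw_item)
        except MapItemException:
            # the item could not be mapped; skip it, or log and continue
            mapped_item = None
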
 874    @classmethod
 875    def is_filter(cls):
 876        """
 877        Is this processor a filter?
 878
 879        Filters do not produce their own dataset but replace the source_dataset dataset
 880        instead.
 881
 882        :todo: Make this a bit more robust than sniffing the processor category
 883        :return bool:
 884        """
 885        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
 886
 887    @classmethod
 888    def get_options(cls, parent_dataset=None, config=None):
 889        """
 890        Get processor options
 891
 892        This method by default returns the class's "options" attribute, or an
 893        empty dictionary. It can be redefined by processors that need more
 894        fine-grained options, e.g. in cases where the availability of options
 895        is partially determined by the parent dataset's parameters.
 896
 897        :param config:
 898        :param DataSet parent_dataset:  An object representing the dataset that
 899          the processor would be run on
 900        """
 901
 902        return cls.options if hasattr(cls, "options") else {}
 903
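A subclass can override this to expose user-configurable parameters; a minimal sketch (only the `default` and `sensitive` keys are relied on in this module; other keys a real processor would define are omitted):

.. code-block:: python

    # inside a BasicProcessor subclass
    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        # work() fills in "default" for any parameter the user did not provide;
        # "sensitive" parameters are deleted from the dataset after loading
        return {
            "top-n": {"default": 10},
            "api-key": {"default": "", "sensitive": True},
        }
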
 904    @classmethod
 905    def get_status(cls):
 906        """
 907        Get processor status
 908
 909        :return list:    Statuses of this processor
 910        """
 911        return cls.status if hasattr(cls, "status") else None
 912
 913    @classmethod
 914    def is_top_dataset(cls):
 915        """
 916        Confirm this is *not* a top dataset, but a processor.
 917
 918        Used for processor compatibility checks.
 919
 920        :return bool:  Always `False`, because this is a processor.
 921        """
 922        return False
 923
 924    @classmethod
 925    def is_from_collector(cls):
 926        """
 927        Check if this processor is one that collects data, i.e. a search or
 928        import worker.
 929
 930        :return bool:
 931        """
 932        return cls.type.endswith("-search") or cls.type.endswith("-import")
 933
  934    @classmethod
  935    def get_extension(cls, parent_dataset=None):
  936        """
  937        Return the extension of the processor's dataset
  938
  939        Used for processor compatibility checks.
  940
  941        :param DataSet parent_dataset:  An object representing the dataset that
  942          the processor would be run on
  943        :return str|None:  Dataset extension (without leading `.`) or `None`.
  944        """
  945        if cls.is_filter():
  946            if parent_dataset is not None:
  947                # Filters should use the same extension as the parent dataset
  948                return parent_dataset.get_extension()
  949            else:
  950                # No parent dataset provided, so its extension cannot be determined;
  951                # return None (falling back on the processor's own extension may be preferable)
  952                return None
  953        elif cls.extension:
  954            # Use the extension explicitly defined on the class (BasicProcessor defaults to "csv")
  955            return cls.extension
  956        else:
  957            # A non-filter processor has overridden the default extension with None/False
  958            return None
 959
 960    @classmethod
 961    def is_rankable(cls, multiple_items=True):
 962        """
 963        Used for processor compatibility
 964
 965        :param bool multiple_items:  Consider datasets with multiple items per
 966          item (e.g. word_1, word_2, etc)? Included for compatibility
 967        """
 968        return False
 969
 970    @classmethod
 971    def exclude_followup_processors(cls, processor_type=None):
 972        """
 973        Used for processor compatibility
 974
 975        To be defined by the child processor if it should exclude certain follow-up processors.
 976        e.g.:
 977
 978        def exclude_followup_processors(cls, processor_type):
 979            if processor_type in ["undesirable-followup-processor"]:
 980                return True
 981            return False
 982
 983        :param str processor_type:  Processor type to exclude
 984        :return bool:  True if processor should be excluded, False otherwise
 985        """
 986        return False
 987
 988    @abc.abstractmethod
 989    def process(self):
 990        """
 991        Process data
 992
 993        To be defined by the child processor.
 994        """
 995        pass
 996
 997    @staticmethod
 998    def is_4cat_processor():
 999        """
1000        Is this a 4CAT processor?
1001
1002        This is used to determine whether a class is a 4CAT
1003        processor.
1004
1005        :return:  True
1006        """
1007        return True
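Putting it together, a minimal concrete processor might look like the sketch below; the class name, type identifier and logic are hypothetical:

.. code-block:: python

    from backend.lib.processor import BasicProcessor


    class ExampleLineCounter(BasicProcessor):
        """
        Hypothetical processor that counts the lines in the parent dataset's result file
        """
        type = "example-line-count"  # job type identifier
        category = "Other"
        title = "Example line counter"
        description = "Counts the number of lines in the parent dataset's result file"
        extension = "csv"

        def process(self):
            # self.source_file points at the parent dataset's result file
            lines = 0
            with self.source_file.open(encoding="utf-8") as infile:
                for _ in infile:
                    lines += 1

            # write a one-row CSV and mark the dataset as finished
            self.write_csv_items_and_finish([{"lines": lines}])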
  29class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
  30    """
  31    Abstract processor class
  32
  33    A processor takes a finished dataset as input and processes its result in
  34    some way, with another dataset set as output. The input thus is a file, and
  35    the output (usually) as well. In other words, the result of a processor can
  36    be used as input for another processor (though whether and when this is
  37    useful is another question).
  38
  39    To determine whether a processor can process a given dataset, you can
  40    define a `is_compatible_with(FourcatModule module=None, config=None):) -> bool` class
  41    method which takes a dataset as argument and returns a bool that determines
  42    if this processor is considered compatible with that dataset. For example:
  43
  44    .. code-block:: python
  45
  46        @classmethod
  47        def is_compatible_with(cls, module=None, config=None):
  48            return module.type == "linguistic-features"
  49
  50
  51    """
  52
  53    #: Database handler to interface with the 4CAT database
  54    db = None
  55
  56    #: Job object that requests the execution of this processor
  57    job = None
  58
  59    #: The dataset object that the processor is *creating*.
  60    dataset = None
  61
  62    #: Owner (username) of the dataset
  63    owner = None
  64
  65    #: The dataset object that the processor is *processing*.
  66    source_dataset = None
  67
  68    #: The file that is being processed
  69    source_file = None
  70
  71    #: Processor description, which will be displayed in the web interface
  72    description = "No description available"
  73
  74    #: Category identifier, used to group processors in the web interface
  75    category = "Other"
  76
  77    #: Extension of the file created by the processor
  78    extension = "csv"
  79
  80    #: 4CAT settings from the perspective of the dataset's owner
  81    config = None
  82
  83    #: Is this processor running 'within' a preset processor?
  84    is_running_in_preset = False
  85
  86    #: Is this processor hidden in the front-end, and only used internally/in presets?
  87    is_hidden = False
  88
  89    #: This will be defined automatically upon loading the processor. There is
  90    #: no need to override manually
  91    filepath = None
  92
  93    def work(self):
  94        """
  95        Process a dataset
  96
  97        Loads dataset metadata, sets up the scaffolding for performing some kind
  98        of processing on that dataset, and then processes it. Afterwards, clean
  99        up.
 100        """
 101        try:
 102            # a dataset can have multiple owners, but the creator is the user
 103            # that actually queued the processor, so their config is relevant
 104            self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
 105            self.owner = self.dataset.creator
 106        except DataSetException:
 107            # query has been deleted in the meantime. finish without error,
 108            # as deleting it will have been a conscious choice by a user
 109            self.job.finish()
 110            return
 111
 112        # set up config reader wrapping the worker's config manager, which is
 113        # in turn the one passed to it by the WorkerManager, which is the one
 114        # originally loaded in bootstrap
 115        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))
 116
 117        if self.dataset.data.get("key_parent", None):
 118            # search workers never have parents (for now), so we don't need to
 119            # find out what the source_dataset dataset is if it's a search worker
 120            try:
 121                self.source_dataset = self.dataset.get_parent()
 122
 123                # for presets, transparently use the *top* dataset as a source_dataset
 124                # since that is where any underlying processors should get
 125                # their data from. However, this should only be done as long as the
 126                # preset is not finished yet, because after that there may be processors
 127                # that run on the final preset result
 128                while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
 129                    self.is_running_in_preset = True
 130                    self.source_dataset = self.source_dataset.get_parent()
 131                    if self.source_dataset is None:
 132                        # this means there is no dataset that is *not* a preset anywhere
 133                        # above this dataset. This should never occur, but if it does, we
 134                        # cannot continue
 135                        self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset",
 136                                       (self.type, self.dataset.key))
 137                        self.job.finish()
 138                        return
 139
 140            except DataSetException:
 141                # we need to know what the source_dataset dataset was to properly handle the
 142                # analysis
 143                self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
 144                    self.type, self.dataset.key))
 145                self.job.finish()
 146                return
 147
 148            if not self.source_dataset.is_finished() and not self.is_running_in_preset:
 149                # not finished yet - retry after a while
 150                # exception for presets, since these *should* be unfinished
 151                # until underlying processors are done
 152                self.job.release(delay=30)
 153                return
 154
 155            self.source_file = self.source_dataset.get_results_path()
 156            if not self.source_file.exists():
 157                self.dataset.update_status("Finished, no input data found.")
 158
 159        self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
 160
 161        processor_name = self.title if hasattr(self, "title") else self.type
 162        self.dataset.clear_log()
 163        self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
 164
 165        # start log file
 166        self.dataset.update_status("Processing data")
 167        self.dataset.update_version(get_software_commit(self))
 168
 169        # get parameters
 170        # if possible, fill defaults where parameters are not provided
 171        given_parameters = self.dataset.parameters.copy()
 172        all_parameters = self.get_options(self.dataset, config=self.config)
 173        self.parameters = {
 174            param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
 175            for param in [*all_parameters.keys(), *given_parameters.keys()]
 176        }
 177
 178        # now the parameters have been loaded into memory, clear any sensitive
 179        # ones. This has a side-effect that a processor may not run again
 180        # without starting from scratch, but this is the price of progress
 181        options = self.get_options(self.dataset.get_parent(), config=self.config)
 182        for option, option_settings in options.items():
 183            if option_settings.get("sensitive"):
 184                self.dataset.delete_parameter(option)
 185
 186        if self.interrupted:
 187            self.dataset.log("Processing interrupted, trying again later")
 188            return self.abort()
 189
 190        if not self.dataset.is_finished():
 191            try:
 192                self.process()
 193                self.after_process()
 194            except WorkerInterruptedException as e:
 195                self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
 196                self.abort()
 197            except Exception as e:
 198                self.dataset.log("Processor crashed (%s), trying again later" % str(e))
 199                stack = traceback.extract_tb(e.__traceback__)
 200                frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
 201                location = "->".join(frames)
 202
 203                # Not all datasets have source_dataset keys
 204                if len(self.dataset.get_genealogy()) > 1:
 205                    parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
 206                else:
 207                    parent_key = ""
 208
 209                print("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
 210                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)))
 211                # remove any result files that have been created so far
 212                self.remove_files()
 213
 214                raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
 215                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack)
 216        else:
 217            # dataset already finished, job shouldn't be open anymore
 218            self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
 219            self.job.data["jobtype"], self.job.data["remote_id"]))
 220            self.job.finish()
 221
 222    def after_process(self):
 223        """
 224        Run after processing the dataset
 225
 226        This method cleans up temporary files, and if needed, handles logistics
 227        concerning the result file, e.g. running a pre-defined processor on the
 228        result, copying it to another dataset, and so on.
 229        """
 230        if self.dataset.data["num_rows"] > 0:
 231            self.dataset.update_status("Dataset completed.")
 232
 233        if not self.dataset.is_finished():
 234            self.dataset.finish()
 235
 236        self.dataset.remove_staging_areas()
 237
 238        # see if we have anything else lined up to run next
 239        for next in self.parameters.get("next", []):
 240            can_run_next = True
 241            next_parameters = next.get("parameters", {})
 242            next_type = next.get("type", "")
 243            try:
 244                available_processors = self.dataset.get_available_processors(config=self.config)
 245            except ValueError:
 246                self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
 247                break
 248
 261            # run it only if the post-processor is actually available for this query
 262            if self.dataset.data["num_rows"] <= 0:
 263                can_run_next = False
 264                self.log.info(
 265                    "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (
 266                        next_type, self.dataset.key))
 267
 268            elif next_type in available_processors:
 269                next_analysis = DataSet(
 270                    parameters=next_parameters,
 271                    type=next_type,
 272                    db=self.db,
 273                    parent=self.dataset.key,
 274                    extension=available_processors[next_type].extension,
 275                    is_private=self.dataset.is_private,
 276                    owner=self.dataset.creator,
 277                    modules=self.modules
 278                )
 279                # copy ownership from parent dataset
 280                next_analysis.copy_ownership_from(self.dataset)
 281                # add to queue
 282                self.queue.add_job(next_type, remote_id=next_analysis.key)
 283            else:
 284                can_run_next = False
 285                self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (
 286                self.dataset.key, self.type, next_type))
 287
 288            if not can_run_next:
 289                # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
 290                # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
 291                if "attach_to" in self.parameters:
 292                    # Probably should not happen, but for some reason a mid processor has been designated as the processor
 293                    # the parent should attach to
 294                    pass
 295                else:
 296                    # Check for "attach_to" parameter in descendants
 297                    while True:
 298                        if "attach_to" in next_parameters:
 299                            self.parameters["attach_to"] = next_parameters["attach_to"]
 300                            break
 301                        else:
 302                            if "next" in next_parameters:
 303                                next_parameters = next_parameters["next"][0]["parameters"]
 304                            else:
 305                                # No more descendants
 306                                # Should not happen; we cannot find the source dataset
 307                                self.log.warning(
 308                                    "Cannot find preset's source dataset for dataset %s" % self.dataset.key)
 309                                break
 310
 311        # see if we need to register the result somewhere
 312        if "copy_to" in self.parameters:
 313            # copy the results to an arbitrary place that was passed
 314            if self.dataset.get_results_path().exists():
 315                shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
 316            else:
 317                # if copy_to was passed, that means it's important that this
 318                # file exists somewhere, so we create it as an empty file
 319                with open(self.parameters["copy_to"], "w") as empty_file:
 320                    empty_file.write("")
 321
 322        # see if this query chain is to be attached to another query
 323        # if so, the full genealogy of this query (minus the original dataset)
 324        # is attached to the given query - this is mostly useful for presets,
 325        # where a chain of processors can be marked as 'underlying' a preset
 326        if "attach_to" in self.parameters:
 327            try:
 328                # copy metadata and results to the surrogate
 329                surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules)
 330
 331                if self.dataset.get_results_path().exists():
 332                    # Update the surrogate's results file suffix to match this dataset's suffix
 333                    surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(
 334                        self.dataset.get_results_path().suffix)
 335                    shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
 336
 337                try:
 338                    surrogate.finish(self.dataset.data["num_rows"])
 339                except RuntimeError:
 340                    # already finished, could happen (though it shouldn't)
 341                    pass
 342
 343                surrogate.update_status(self.dataset.get_status())
 344
 345            except ValueError:
 346                # dataset with key to attach to doesn't exist...
 347                self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % (
 348                    self.dataset.key, self.parameters["attach_to"]))
 349
 350        self.job.finish()
 351
 362        if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
 363            owner = self.dataset.get_parameters().get("email-complete", False)
 364            # Check that username is email address
 365            if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
 366                from email.mime.multipart import MIMEMultipart
 367                from email.mime.text import MIMEText
 368                from smtplib import SMTPException
 369                import socket
 370                import html2text
 371
 372                self.log.debug("Sending email to %s" % owner)
 373                dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
 374                sender = self.config.get('mail.noreply')
 375                message = MIMEMultipart("alternative")
 376                message["From"] = sender
 377                message["To"] = owner
 378                message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
 379                mail = """
 380                    <p>Hello %s,</p>
 381                    <p>4CAT has finished collecting your %s dataset labeled: %s</p>
 382                    <p>You can view your dataset via the following link:</p>
 383                    <p><a href="%s">%s</a></p> 
 384                    <p>Sincerely,</p>
 385                    <p>Your friendly neighborhood 4CAT admin</p>
 386                    """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
 387                html_parser = html2text.HTML2Text()
 388                message.attach(MIMEText(html_parser.handle(mail), "plain"))
 389                message.attach(MIMEText(mail, "html"))
 390                try:
 391                    send_email([owner], message, self.config)
 392                except (SMTPException, ConnectionRefusedError, socket.timeout):
 393                    self.log.error("Error sending email to %s" % owner)
 394
 395    def remove_files(self):
 396        """
 397        Clean up result files and any staging files for processor to be attempted
 398        later if desired.
 399        """
 400        # Remove the results file that was created
 401        if self.dataset.get_results_path().exists():
 402            self.dataset.get_results_path().unlink()
 403        if self.dataset.get_results_folder_path().exists():
 404            shutil.rmtree(self.dataset.get_results_folder_path())
 405
 406        # Remove any staging areas with temporary data
 407        self.dataset.remove_staging_areas()
 408
 409    def abort(self):
 410        """
 411        Abort dataset creation and clean up so it may be attempted again later
 412        """
 413
 414        # delete annotations that have been generated as part of this processor
 415        self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True)
 416        # remove any result files that have been created so far
 417        self.remove_files()
 418
 419        # we release instead of finish, since interrupting is just that - the
 420        # job should resume at a later point. Delay resuming by 10 seconds to
 421        # give 4CAT the time to do whatever it wants (though usually this isn't
 422        # needed since restarting also stops the spawning of new workers)
 423        if self.interrupted == self.INTERRUPT_RETRY:
 424            # retry later - wait at least 10 seconds to give the backend time to shut down
 425            self.job.release(delay=10)
 426        elif self.interrupted == self.INTERRUPT_CANCEL:
 427            # cancel job
 428            self.job.finish()
 429
 430    def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
 431        """
 432        Request an iterable of URLs and return results
 433
 434        This method takes an iterable yielding URLs and yields the result for
 435        a GET request for that URL in return. This is done through the worker
 436        manager's DelegatedRequestHandler, which can run multiple requests in
 437        parallel and divide them over the proxies configured in 4CAT (if any).
 438        Proxy cooloff and queueing is shared with other processors, so that a
 439        processor will never accidentally request from the same site as another
 440        processor, potentially triggering rate limits.
 441
 442        :param urls:  Something that can be iterated over and yields URLs
 443        :param kwargs:  Other keyword arguments are passed on to `add_urls`
 444        and eventually to `requests.get()`.
 445        :param bool preserve_order:  Return items in the original order. Use
 446        `False` to potentially speed up processing, if order is not important.
 447        :return:  A generator yielding request results, i.e. tuples of a
 448        URL and a `requests` response object
 449        """
 450        queue_name = self._proxy_queue_name()
 451        delegator = self.manager.proxy_delegator
 452
 453        delegator.refresh_settings(self.config)
 454
 455        # 50 is an arbitrary batch size - but we top up every 0.05s, so
 456        # that should be sufficient
 457        batch_size = 50
 458
 459        # we need an iterable, so we can use next() and StopIteration
 460        urls = iter(urls)
 461
 462        have_urls = True
 463        while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls:
 464            if queue_length < batch_size and have_urls:
 465                batch = []
 466                while len(batch) < (batch_size - queue_length):
 467                    try:
 468                        batch.append(next(urls))
 469                    except StopIteration:
 470                        have_urls = False
 471                        break
 472
 473                delegator.add_urls(batch, queue_name, **kwargs)
 474
 475            time.sleep(0.05)  # arbitrary...
 476            for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
 477                # result may also be a FailedProxiedRequest!
 478                # up to the processor to decide how to deal with it
 479                yield url, result
 480
 481    def push_proxied_request(self, url, position=-1, **kwargs):
 482        """
 483        Add a single URL to the proxied requests queue
 484
 485        :param str url:  URL to add
 486        :param position:  Position to add to queue; can be used to add priority
 487        requests, adds to end of queue by default
 488        :param kwargs:
 489        """
 490        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)
 491
 492    def flush_proxied_requests(self):
 493        """
 494        Get rid of remaining proxied requests
 495
 496        Can be used if enough results are available and any remaining ones need
 497        to be stopped ASAP and are otherwise unneeded.
 498
 499        Blocking!
 500        """
 501        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())
 502
 503    def _proxy_queue_name(self):
 504        """
 505        Get proxy queue name
 506
 507        For internal use.
 508
 509        :return str:
 510        """
 511        return f"{self.type}-{self.dataset.key}"
 512
 513    def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
 514        """
 515        A generator that iterates through files in an archive
 516
 517        With every iteration, the processor's 'interrupted' flag is checked,
 518        and if set a ProcessorInterruptedException is raised, which by default
 519        is caught and subsequently stops execution gracefully.
 520
 521        Files are temporarily unzipped and deleted after use.
 522
 523        :param Path path:     Path to zip file to read
 524        :param Path staging_area:  Where to store the files while they're
 525          being worked with. If omitted, a temporary folder is created and
 526          deleted after use
 527        :param bool immediately_delete:  Temporary files are removed after being yielded;
 528          False keeps files until the staging_area is removed (usually during processor
 529          cleanup)
 530        :param list filename_filter:  Whitelist of filenames to iterate.
 531        Other files will be ignored. If empty, do not ignore anything.
 532        :return:  An iterator with a Path item for each file
 533        """
 534
 535        if not path.exists():
 536            return
 537
 538        if not staging_area:
 539            staging_area = self.dataset.get_staging_area()
 540
 541        if not staging_area.exists() or not staging_area.is_dir():
 542            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
 543
 544        with zipfile.ZipFile(path, "r") as archive_file:
 545            archive_contents = sorted(archive_file.namelist())
 546
 547            for archived_file in archive_contents:
 548                if filename_filter and archived_file not in filename_filter:
 549                    continue
 550
 551                info = archive_file.getinfo(archived_file)
 552                if info.is_dir():
 553                    continue
 554
 555                if self.interrupted:
 556                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
 557
 558                temp_file = staging_area.joinpath(archived_file)
 559                archive_file.extract(archived_file, staging_area)
 560
 561                yield temp_file
 562                if immediately_delete:
 563                    temp_file.unlink()
 564
 565    def unpack_archive_contents(self, path, staging_area=None):
 566        """
 567        Unpack all files in an archive to a staging area
 568
 569        With every iteration, the processor's 'interrupted' flag is checked,
 570        and if set a ProcessorInterruptedException is raised, which by default
 571        is caught and subsequently stops execution gracefully.
 572
 573        Files are unzipped to a staging area. The staging area is *not*
 574        cleaned up automatically.
 575
 576        :param Path path:     Path to zip file to read
 577        :param Path staging_area:  Where to store the files while they're
 578          being worked with. If omitted, a temporary folder is created and
 579          deleted after use
 581        :return Path:  A path to the staging area
 582        """
 583
 584        if not path.exists():
 585            return
 586
 587        if not staging_area:
 588            staging_area = self.dataset.get_staging_area()
 589
 590        if not staging_area.exists() or not staging_area.is_dir():
 591            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
 592
 593        paths = []
 594        with zipfile.ZipFile(path, "r") as archive_file:
 595            archive_contents = sorted(archive_file.namelist())
 596
 597            for archived_file in archive_contents:
 598                if self.interrupted:
 599                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
 600
 601                file_name = archived_file.split("/")[-1]
 602                temp_file = staging_area.joinpath(file_name)
 603                archive_file.extract(archived_file, staging_area)
 604                paths.append(temp_file)
 605
 606        return staging_area
 607
 608    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
 609        """
 610        Extract a file from an archive by name
 611
 612        :param str filename:  Name of file to extract
 613        :param Path archive_path:  Path to zip file to read
 614        :param Path staging_area:  Where to store the files while they're
 615                  being worked with. If omitted, a temporary folder is created
 616        :return Path:  A path to the extracted file
 617        """
 618        if not archive_path.exists():
 619            return
 620
 621        if not staging_area:
 622            staging_area = self.dataset.get_staging_area()
 623
 624        if not staging_area.exists() or not staging_area.is_dir():
 625            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
 626
 627        with zipfile.ZipFile(archive_path, "r") as archive_file:
 628            if filename not in archive_file.namelist():
 629                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
 630            else:
 631                archive_file.extract(filename, staging_area)
 632                return staging_area.joinpath(filename)
 633
 634    def write_csv_items_and_finish(self, data):
 635        """
 636        Write data as csv to results file and finish dataset
 637
 638        Determines result file path using dataset's path determination helper
 639        methods. After writing results, the dataset is marked finished. Will
 640        raise a ProcessorInterruptedException if the interrupted flag for this
 641        processor is set while iterating.
 642
 643        :param data: A list or tuple of dictionaries, all with the same keys
 644        """
 645        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
 646            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
 647
 648        if not data:
 649            raise ValueError("write_csv_items requires a dictionary with at least one item")
 650
 651        self.dataset.update_status("Writing results file")
 652        writer = False
 653        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
 654            for row in data:
 655                if self.interrupted:
 656                    raise ProcessorInterruptedException("Interrupted while writing results file")
 657
 658                row = remove_nuls(row)
 659                if not writer:
 660                    writer = csv.DictWriter(results, fieldnames=row.keys())
 661                    writer.writeheader()
 662
 663                writer.writerow(row)
 664
 665        self.dataset.update_status("Finished")
 666        self.dataset.finish(len(data))
 667
 668    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
 669        """
 670        Archive a bunch of files into a zip archive and finish processing
 671
 672        :param list|Path files: If a list, all files will be added to the
 673          archive and deleted afterwards. If a folder, all files in the folder
 674          will be added and the folder will be deleted afterwards.
 675        :param int num_items: Items in the dataset. If None, the amount of
 676          files added to the archive will be used.
 677        :param int compression:  Type of compression to use. By default, files
 678          are not compressed, to speed up unarchiving.
 679        :param bool finish:  Finish the dataset/job afterwards or not?
 680        """
 681        is_folder = False
 682        if issubclass(type(files), PurePath):
 683            is_folder = files
 684            if not files.exists() or not files.is_dir():
 685                raise RuntimeError("Folder %s is not a folder that can be archived" % files)
 686
 687            files = files.glob("*")
 688
 689        # create zip of archive and delete temporary files and folder
 690        self.dataset.update_status("Compressing results into archive")
 691        done = 0
 692        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
 693            for output_path in files:
 694                zip.write(output_path, output_path.name)
 695                output_path.unlink()
 696                done += 1
 697
 698        # delete temporary folder
 699        if is_folder:
 700            shutil.rmtree(is_folder)
 701
 702        self.dataset.update_status("Finished")
 703        if num_items is None:
 704            num_items = done
 705
 706        if finish:
 707            self.dataset.finish(num_items)
 708
 709    def create_standalone(self):
 710        """
 711        Copy this dataset and make that copy standalone
 712
 713        This has the benefit of allowing for all analyses that can be run on
 714        full datasets on the new, filtered copy as well.
 715
 716        :return DataSet:  The new standalone dataset
 717        """
 718        top_parent = self.source_dataset
 719
 720        finished = self.dataset.check_dataset_finished()
 721        if finished == 'empty':
 722            # No data to process, so we can't create a standalone dataset
 723            return
 724        elif finished is None:
 725            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
 726            pass
 727
 728        standalone = self.dataset.copy(shallow=False)
 729        standalone.body_match = "(Filtered) " + top_parent.query
 730        standalone.datasource = top_parent.parameters.get("datasource", "custom")
 731
 732        try:
 733            standalone.board = top_parent.board
 734        except AttributeError:
 735            standalone.board = self.type
 736
 737        standalone.type = top_parent.type
 738
 739        standalone.detach()
 740        standalone.delete_parameter("key_parent")
 741
 742        self.dataset.copied_to = standalone.key
 743
 744        # we don't need this file anymore - it has been copied to the new
 745        # standalone dataset, and this one is not accessible via the interface
 746        # except as a link to the copied standalone dataset
 747        os.unlink(self.dataset.get_results_path())
 748
 749        # Copy the log
 750        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
 751
 752        return standalone
 753
 754    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
 755        """
 756        Saves annotations made by this processor on the basis of another dataset.
 757        Also adds some data regarding this processor: set `author` and `label` to processor name,
 758        and add parameters to `metadata` (unless explicitly indicated).
 759
 760        :param annotations:				List of dictionaries with annotation items. Must have `item_id` and `value`.
 761                                        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
 762        :param source_dataset:			The dataset that these annotations will be saved on. If None, will use the
 763                                        top parent.
 764        :param bool hide_in_explorer:	Whether this annotation is included in the Explorer. 'Hidden' annotations
 765                                        are still shown in `iterate_items()`.
 766
 767        :returns int:					How many annotations were saved.
 768
 769        """
 770
 771        if not annotations:
 772            return 0
 773
 774        # Default to parent dataset
 775        if not source_dataset:
 776            source_dataset = self.source_dataset.top_parent()
 777
 778        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
 779        annotation_fields = source_dataset.annotation_fields
 780
 781        # Keep track of what fields we've already seen, so we don't need to hash every time.
 782        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
 783                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}
 784
 785        # Loop through all annotations. This may be batched.
 786        for annotation in annotations:
 787
 788            # Keep track of what dataset generated this annotation
 789            annotation["from_dataset"] = self.dataset.key
 790            # Set the author to this processor's name
 791            if not annotation.get("author"):
 792                annotation["author"] = self.name
 793            if not annotation.get("author_original"):
 794                annotation["author_original"] = self.name
 795            annotation["by_processor"] = True
 796
 797            # Only use a default label if no custom one is given
 798            if not annotation.get("label"):
 799                annotation["label"] = self.name
 800
 801            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
 802            # We need to do this within this loop because this function may be called in batches and with different
 803            # annotation types.
 804            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
 805                # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key.
 806                # This should create unique fields, even if there's multiple annotation types for one processor.
 807                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
 808                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
 809                annotation_fields[field_id] = {
 810                    "label": annotation["label"],
 811                    "type": annotation["type"] if annotation.get("type") else "text",
 812                    "from_dataset": annotation["from_dataset"],
 813                    "hide_in_explorer": hide_in_explorer
 814                }
 815            else:
 816                # Else just get the field ID
 817                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]
 818
 819            # Add field ID to the annotation
 820            annotation["field_id"] = field_id
 821
 822        annotations_saved = source_dataset.save_annotations(annotations)
 823        source_dataset.save_annotation_fields(annotation_fields)
 824
 825        return annotations_saved
 826
 827    @classmethod
 828    def map_item_method_available(cls, dataset):
 829        """
 830        Check if this processor can use map_item
 831
 832        Checks if map_item method exists and is compatible with dataset. If
 833        dataset has a different extension than the default for this processor,
 834        or if the dataset has no extension, this means we cannot be sure the
 835        data is in the right format to be mapped, so `False` is returned in
 836        that case even if a map_item() method is available.
 837
 840        :param DataSet dataset:                The DataSet object with which to
 841        use map_item
 842        """
 843        # only run item mapper if extension of processor == extension of
 844        # data file, for the scenario where a csv file was uploaded and
 845        # converted to an ndjson-based data source, for example
 846        # todo: this is kind of ugly, and a better fix may be possible
 847        dataset_extension = dataset.get_extension()
 848        if not dataset_extension:
 849            # DataSet results file does not exist or has no extension, use expected extension
 850            if hasattr(dataset, "extension"):
 851                dataset_extension = dataset.extension
 852            else:
 853                # No known DataSet extension; cannot determine if map_item method compatible
 854                return False
 855
 856        return hasattr(cls, "map_item") and cls.extension == dataset_extension
 857
 858    @classmethod
 859    def get_mapped_item(cls, item):
 860        """
 861        Get the mapped item using a processors map_item method.
 862
 863        Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
 864        """
 865        try:
 866            mapped_item = cls.map_item(item)
 867        except (KeyError, IndexError) as e:
 868            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
 869
 870        if not mapped_item:
 871            raise MapItemException("Unable to map item!")
 872
 873        return mapped_item
 874
 875    @classmethod
 876    def is_filter(cls):
 877        """
 878        Is this processor a filter?
 879
 880        Filters do not produce their own dataset but replace the source_dataset dataset
 881        instead.
 882
 883        :todo: Make this a bit more robust than sniffing the processor category
 884        :return bool:
 885        """
 886        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
 887
 888    @classmethod
 889    def get_options(cls, parent_dataset=None, config=None):
 890        """
 891        Get processor options
 892
 893        This method by default returns the class's "options" attribute, or an
 894        empty dictionary. It can be redefined by processors that need more
 895        fine-grained options, e.g. in cases where the availability of options
 896        is partially determined by the parent dataset's parameters.
 897
 898        :param config:
 899        :param DataSet parent_dataset:  An object representing the dataset that
 900          the processor would be run on
 901        """
 902
 903        return cls.options if hasattr(cls, "options") else {}
 904
 905    @classmethod
 906    def get_status(cls):
 907        """
 908        Get processor status
 909
 910        :return list:    Statuses of this processor
 911        """
 912        return cls.status if hasattr(cls, "status") else None
 913
 914    @classmethod
 915    def is_top_dataset(cls):
 916        """
 917        Confirm this is *not* a top dataset, but a processor.
 918
 919        Used for processor compatibility checks.
 920
 921        :return bool:  Always `False`, because this is a processor.
 922        """
 923        return False
 924
 925    @classmethod
 926    def is_from_collector(cls):
 927        """
 928        Check if this processor is one that collects data, i.e. a search or
 929        import worker.
 930
 931        :return bool:
 932        """
 933        return cls.type.endswith("-search") or cls.type.endswith("-import")
 934
 935    @classmethod
 936    def get_extension(self, parent_dataset=None):
 937        """
 938        Return the extension of the processor's dataset
 939
 940        Used for processor compatibility checks.
 941
 942        :param DataSet parent_dataset:  An object representing the dataset that
 943          the processor would be run on
 944        :return str|None:  Dataset extension (without leading `.`) or `None`.
 945        """
 946        if self.is_filter():
 947            if parent_dataset is not None:
 948                # Filters should use the same extension as the parent dataset
 949                return parent_dataset.get_extension()
 950            else:
 951                # No dataset provided, unable to determine extension of parent dataset
 952                # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead
 953                return None
 954        elif self.extension:
 955            # Use explicitly defined extension in class (Processor class defaults to "csv")
 956            return self.extension
 957        else:
 958            # A non filter processor updated the base Processor extension to None/False?
 959            return None
 960
 961    @classmethod
 962    def is_rankable(cls, multiple_items=True):
 963        """
 964        Used for processor compatibility
 965
 966        :param bool multiple_items:  Consider datasets with multiple items per
 967          item (e.g. word_1, word_2, etc)? Included for compatibility
 968        """
 969        return False
 970
 971    @classmethod
 972    def exclude_followup_processors(cls, processor_type=None):
 973        """
 974        Used for processor compatibility
 975
 976        To be defined by the child processor if it should exclude certain follow-up processors.
 977        e.g.:
 978
 979        def exclude_followup_processors(cls, processor_type):
 980            if processor_type in ["undesirable-followup-processor"]:
 981                return True
 982            return False
 983
 984        :param str processor_type:  Processor type to exclude
 985        :return bool:  True if processor should be excluded, False otherwise
 986        """
 987        return False
 988
 989    @abc.abstractmethod
 990    def process(self):
 991        """
 992        Process data
 993
 994        To be defined by the child processor.
 995        """
 996        pass
 997
 998    @staticmethod
 999    def is_4cat_processor():
1000        """
1001        Is this a 4CAT processor?
1002
1003        This is used to determine whether a class is a 4CAT
1004        processor.
1005
1006        :return:  True
1007        """
1008        return True

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an is_compatible_with(module=None, config=None) -> bool class method which takes a dataset as argument and returns a bool that determines if this processor is considered compatible with that dataset. For example:

@classmethod
def is_compatible_with(cls, module=None, config=None):
    return module.type == "linguistic-features"
db = None
job = None
dataset = None
owner = None
source_dataset = None
source_file = None
description = 'No description available'
category = 'Other'
extension = 'csv'
config = None
is_running_in_preset = False
is_hidden = False
filepath = None
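
Below is a minimal sketch of a processor subclass, built only from members documented on this page; the processor type, category and the 'id'/'body' field names are hypothetical. It reads the parent dataset's CSV result file via source_file and writes a new CSV with write_csv_items_and_finish(). When the job runs, work() loads the dataset, fills in parameter defaults and then calls process().

import csv

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import ProcessorInterruptedException


class ExampleBodyLengthProcessor(BasicProcessor):
    """
    Hypothetical processor: records the length of each item's 'body' field
    """
    type = "example-body-length"  # job type identifier (made up for this sketch)
    category = "Post-processing"
    title = "Body length"
    description = "Writes the length of each item's 'body' field to a new CSV file"
    extension = "csv"

    def process(self):
        # self.source_file is the parent dataset's result file (a Path)
        results = []
        with self.source_file.open(encoding="utf-8") as infile:
            for item in csv.DictReader(infile):
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while reading items")
                results.append({"id": item.get("id", ""), "body_length": len(item.get("body", ""))})

        # write the rows and mark the dataset as finished
        self.write_csv_items_and_finish(results)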
def work(self):

Process a dataset

Loads dataset metadata, sets up the scaffolding for performing some kind of processing on that dataset, and then processes it. Afterwards, clean up.
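
As the work() source earlier on this page shows, user-supplied parameters are merged with the defaults returned by get_options(), and any option flagged "sensitive" is deleted from the stored dataset parameters once it has been loaded into self.parameters. The sketch below relies on that behaviour; the option names and values are hypothetical, and real 4CAT option definitions typically carry extra keys (UI type, help text) that are not defined in this module.

from backend.lib.processor import BasicProcessor


class ExampleApiProcessor(BasicProcessor):
    """
    Hypothetical processor with one ordinary and one sensitive option
    """
    type = "example-api-processor"
    title = "Example API processor"
    extension = "csv"

    options = {
        # work() falls back to "default" when the user supplied no value
        "amount": {"default": 100},
        # options flagged "sensitive" are removed from the stored dataset
        # parameters once they have been loaded into self.parameters
        "api_key": {"default": "", "sensitive": True}
    }

    def process(self):
        amount = self.parameters.get("amount")    # 100 unless overridden by the user
        api_key = self.parameters.get("api_key")  # usable here, but scrubbed from storage
        self.write_csv_items_and_finish([{"amount": amount, "has_api_key": bool(api_key)}])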

def after_process(self):

Run after processing the dataset

This method cleans up temporary files, and if needed, handles logistics concerning the result file, e.g. running a pre-defined processor on the result, copying it to another dataset, and so on.
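
As the source shows, after_process() recognises a few special keys in self.parameters: "next" (a list of follow-up analyses to queue on this dataset), "copy_to" (a filesystem path the result file is copied to, or created empty at, when processing ends) and "attach_to" (the key of a dataset, typically a preset, to attach the result to). A sketch of such a parameter structure is shown below; the follow-up processor type, dataset key and path are placeholders.

# Hypothetical parameters as consumed by after_process(); values are placeholders.
parameters = {
    "next": [
        {
            "type": "example-follow-up",         # queued on this dataset once it finishes
            "parameters": {
                "attach_to": "0123456789abcdef"  # key of the (preset) dataset to attach results to
            }
        }
    ],
    "copy_to": "/tmp/example-copy.csv"           # result file copied here, or an empty file is created
}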

def remove_files(self):

Clean up result files and any staging files for processor to be attempted later if desired.

def abort(self):

Abort dataset creation and clean up so it may be attempted again later

def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
430    def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
431        """
432        Request an iterable of URLs and return results
433
434        This method takes an iterable yielding URLs and yields the result for
435        a GET request for that URL in return. This is done through the worker
436        manager's DelegatedRequestHandler, which can run multiple requests in
437        parallel and divide them over the proxies configured in 4CAT (if any).
438        Proxy cooloff and queueing are shared with other processors, so that a
439        processor will never accidentally request from the same site as another
440        processor, potentially triggering rate limits.
441
442        :param urls:  Something that can be iterated over and yields URLs
443        :param kwargs:  Other keyword arguments are passed on to `add_urls`
444        and eventually to `requests.get()`.
445        :param bool preserve_order:  Return items in the original order. Use
446        `False` to potentially speed up processing, if order is not important.
447        :return:  A generator yielding request results, i.e. tuples of a
448        URL and a `requests` response object
449        """
450        queue_name = self._proxy_queue_name()
451        delegator = self.manager.proxy_delegator
452
453        delegator.refresh_settings(self.config)
454
455        # 50 is an arbitrary batch size - but we top up every 0.05s, so
456        # that should be sufficient
457        batch_size = 50
458
459        # we need an iterable, so we can use next() and StopIteration
460        urls = iter(urls)
461
462        have_urls = True
463        while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls:
464            if queue_length < batch_size and have_urls:
465                batch = []
466                while len(batch) < (batch_size - queue_length):
467                    try:
468                        batch.append(next(urls))
469                    except StopIteration:
470                        have_urls = False
471                        break
472
473                delegator.add_urls(batch, queue_name, **kwargs)
474
475            time.sleep(0.05)  # arbitrary...
476            for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
477                # result may also be a FailedProxiedRequest!
478                # up to the processor to decide how to deal with it
479                yield url, result

Request an iterable of URLs and return results

This method takes an iterable yielding URLs and yields the result for a GET request for that URL in return. This is done through the worker manager's DelegatedRequestHandler, which can run multiple requests in parallel and divide them over the proxies configured in 4CAT (if any). Proxy cooloff and queueing are shared with other processors, so that a processor will never accidentally request from the same site as another processor, potentially triggering rate limits.

Parameters
  • urls: Something that can be iterated over and yields URLs
  • kwargs: Other keyword arguments are passed on to add_urls and eventually to requests.get().
  • bool preserve_order: Return items in the original order. Use False to potentially speed up processing, if order is not important.
Returns

A generator yielding request results, i.e. tuples of a URL and a requests response object
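
A minimal usage sketch: the example URLs and the status-code filtering are illustrative assumptions; only iterate_proxied_requests(), the dataset object and the FailedProxiedRequest caveat come from this class.

    # Illustrative sketch of a process() method in a BasicProcessor subclass
    def process(self):
        # hypothetical list of URLs to fetch
        urls = ("https://example.com/page/%i" % page for page in range(10))

        results = 0
        for url, response in self.iterate_proxied_requests(urls, preserve_order=False):
            # response may be a FailedProxiedRequest rather than a requests
            # response object - it is up to the processor to handle failures
            if not hasattr(response, "status_code") or response.status_code != 200:
                continue
            results += 1
            # ... parse response.text and store the result here ...

        self.dataset.finish(results)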

def push_proxied_request(self, url, position=-1, **kwargs):
481    def push_proxied_request(self, url, position=-1, **kwargs):
482        """
483        Add a single URL to the proxied requests queue
484
485        :param str url:  URL to add
486        :param position:  Position to add to queue; can be used to add priority
487        requests, adds to end of queue by default
488        :param kwargs:  Other keyword arguments, passed on to `add_urls`
489        """
490        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)

Add a single URL to the proxied requests queue

Parameters
  • str url: URL to add
  • position: Position to add to queue; can be used to add priority requests, adds to end of queue by default
  • kwargs: Other keyword arguments, passed on to add_urls

def flush_proxied_requests(self):
492    def flush_proxied_requests(self):
493        """
494        Get rid of remaining proxied requests
495
496        Can be used if enough results are available and any remaining ones need
497        to be stopped ASAP and are otherwise unneeded.
498
499        Blocking!
500        """
501        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())

Get rid of remaining proxied requests

Can be used if enough results are available and any remaining ones need to be stopped ASAP and are otherwise unneeded.

Blocking!

def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
513    def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
514        """
515        A generator that iterates through files in an archive
516
517        With every iteration, the processor's 'interrupted' flag is checked,
518        and if set a ProcessorInterruptedException is raised, which by default
519        is caught and subsequently stops execution gracefully.
520
521        Files are temporarily unzipped and deleted after use.
522
523        :param Path path:     Path to zip file to read
524        :param Path staging_area:  Where to store the files while they're
525          being worked with. If omitted, a temporary folder is created and
526          deleted after use
527        :param bool immediately_delete:  Temporary files are removed after being yielded;
528          False keeps files until the staging_area is removed (usually during processor
529          cleanup)
530        :param list filename_filter:  Whitelist of filenames to iterate.
531        Other files will be ignored. If empty, do not ignore anything.
532        :return:  An iterator with a Path item for each file
533        """
534
535        if not path.exists():
536            return
537
538        if not staging_area:
539            staging_area = self.dataset.get_staging_area()
540
541        if not staging_area.exists() or not staging_area.is_dir():
542            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
543
544        with zipfile.ZipFile(path, "r") as archive_file:
545            archive_contents = sorted(archive_file.namelist())
546
547            for archived_file in archive_contents:
548                if filename_filter and archived_file not in filename_filter:
549                    continue
550
551                info = archive_file.getinfo(archived_file)
552                if info.is_dir():
553                    continue
554
555                if self.interrupted:
556                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
557
558                temp_file = staging_area.joinpath(archived_file)
559                archive_file.extract(archived_file, staging_area)
560
561                yield temp_file
562                if immediately_delete:
563                    temp_file.unlink()

A generator that iterates through files in an archive

With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.

Files are temporarily unzipped and deleted after use.

Parameters
  • Path path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created and deleted after use
  • bool immediately_delete: Temporary files are removed after being yielded; False keeps files until the staging_area is removed (usually during processor cleanup)
  • list filename_filter: Whitelist of filenames to iterate. Other files will be ignored. If empty, do not ignore anything.
Returns

An iterator with a Path item for each file
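
A short sketch of typical use, assuming the source dataset file is a zip archive; the per-file handling is illustrative, only iterate_archive_contents(), self.source_file and the dataset object come from this class.

    # Illustrative: iterate files inside the source dataset's archive one by one
    for archived_path in self.iterate_archive_contents(self.source_file):
        # archived_path is a Path to a temporarily extracted file; with the
        # default immediately_delete=True it is deleted after this iteration
        file_size = archived_path.stat().st_size
        self.dataset.update_status("Processed %s (%i bytes)" % (archived_path.name, file_size))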

def unpack_archive_contents(self, path, staging_area=None):
565    def unpack_archive_contents(self, path, staging_area=None):
566        """
567        Unpack all files in an archive to a staging area
568
569        With every iteration, the processor's 'interrupted' flag is checked,
570        and if set a ProcessorInterruptedException is raised, which by default
571        is caught and subsequently stops execution gracefully.
572
573        Files are unzipped to a staging area. The staging area is *not*
574        cleaned up automatically.
575
576        :param Path path:     Path to zip file to read
577        :param Path staging_area:  Where to store the files while they're
578          being worked with. If omitted, a temporary folder is created and
579          deleted after use
580
581        :return Path:  A path to the staging area
582        """
583
584        if not path.exists():
585            return
586
587        if not staging_area:
588            staging_area = self.dataset.get_staging_area()
589
590        if not staging_area.exists() or not staging_area.is_dir():
591            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
592
593        paths = []
594        with zipfile.ZipFile(path, "r") as archive_file:
595            archive_contents = sorted(archive_file.namelist())
596
597            for archived_file in archive_contents:
598                if self.interrupted:
599                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
600
601                file_name = archived_file.split("/")[-1]
602                temp_file = staging_area.joinpath(file_name)
603                archive_file.extract(archived_file, staging_area)
604                paths.append(temp_file)
605
606        return staging_area

Unpack all files in an archive to a staging area

With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.

Files are unzipped to a staging area. The staging area is not cleaned up automatically.

Parameters
  • Path path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created and deleted after use
Returns

A path to the staging area
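
A sketch of the bulk alternative to the iterator above; the glob pattern and the per-file handling are illustrative assumptions.

    # Illustrative: unpack everything at once, then work with the folder
    staging_area = self.unpack_archive_contents(self.source_file)
    for temp_file in sorted(staging_area.glob("*")):
        # ... process temp_file here ...
        pass
    # the staging area is not removed here; it is cleaned up later via
    # self.dataset.remove_staging_areas() during processor cleanup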

def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
608    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
609        """
610        Extract a file from an archive by name
611
612        :param str filename:  Name of file to extract
613        :param Path archive_path:  Path to zip file to read
614        :param Path staging_area:  Where to store the files while they're
615                  being worked with. If omitted, a temporary folder is created
616        :return Path:  A path to the extracted file
617        """
618        if not archive_path.exists():
619            return
620
621        if not staging_area:
622            staging_area = self.dataset.get_staging_area()
623
624        if not staging_area.exists() or not staging_area.is_dir():
625            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
626
627        with zipfile.ZipFile(archive_path, "r") as archive_file:
628            if filename not in archive_file.namelist():
629                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
630            else:
631                archive_file.extract(filename, staging_area)
632                return staging_area.joinpath(filename)

Extract a file from an archive by name

Parameters
  • str filename: Name of file to extract
  • Path archive_path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created
Returns

A path to the extracted file
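
For example, a minimal sketch; the filename ".metadata.json" is a hypothetical stand-in.

    # Illustrative: pull one known file out of the source archive
    try:
        metadata_file = self.extract_archived_file_by_name(".metadata.json", self.source_file)
    except FileNotFoundError:
        metadata_file = None  # the archive does not contain this file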

def write_csv_items_and_finish(self, data):
634    def write_csv_items_and_finish(self, data):
635        """
636        Write data as csv to results file and finish dataset
637
638        Determines result file path using dataset's path determination helper
639        methods. After writing results, the dataset is marked finished. Will
640        raise a ProcessorInterruptedException if the interrupted flag for this
641        processor is set while iterating.
642
643        :param data: A list or tuple of dictionaries, all with the same keys
644        """
645        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
646            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
647
648        if not data:
649            raise ValueError("write_csv_items requires a list or tuple with at least one item")
650
651        self.dataset.update_status("Writing results file")
652        writer = False
653        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
654            for row in data:
655                if self.interrupted:
656                    raise ProcessorInterruptedException("Interrupted while writing results file")
657
658                row = remove_nuls(row)
659                if not writer:
660                    writer = csv.DictWriter(results, fieldnames=row.keys())
661                    writer.writeheader()
662
663                writer.writerow(row)
664
665        self.dataset.update_status("Finished")
666        self.dataset.finish(len(data))

Write data as csv to results file and finish dataset

Determines result file path using dataset's path determination helper methods. After writing results, the dataset is marked finished. Will raise a ProcessorInterruptedException if the interrupted flag for this processor is set while iterating.

Parameters
  • data: A list or tuple of dictionaries, all with the same keys
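
A minimal sketch (the rows themselves are placeholders); every dictionary must share the same keys, since the first row determines the CSV header.

    # Illustrative: rows are dictionaries that all share the same keys
    rows = [
        {"id": "1", "value": "first"},
        {"id": "2", "value": "second"},
    ]
    self.write_csv_items_and_finish(rows)
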
def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
668    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
669        """
670        Archive a bunch of files into a zip archive and finish processing
671
672        :param list|Path files: If a list, all files will be added to the
673          archive and deleted afterwards. If a folder, all files in the folder
674          will be added and the folder will be deleted afterwards.
675        :param int num_items: Items in the dataset. If None, the amount of
676          files added to the archive will be used.
677        :param int compression:  Type of compression to use. By default, files
678          are not compressed, to speed up unarchiving.
679        :param bool finish:  Finish the dataset/job afterwards or not?
680        """
681        is_folder = False
682        if issubclass(type(files), PurePath):
683            is_folder = files
684            if not files.exists() or not files.is_dir():
685                raise RuntimeError("Folder %s is not a folder that can be archived" % files)
686
687            files = files.glob("*")
688
689        # create zip of archive and delete temporary files and folder
690        self.dataset.update_status("Compressing results into archive")
691        done = 0
692        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
693            for output_path in files:
694                zip.write(output_path, output_path.name)
695                output_path.unlink()
696                done += 1
697
698        # delete temporary folder
699        if is_folder:
700            shutil.rmtree(is_folder)
701
702        self.dataset.update_status("Finished")
703        if num_items is None:
704            num_items = done
705
706        if finish:
707            self.dataset.finish(num_items)

Archive a bunch of files into a zip archive and finish processing

Parameters
  • list|Path files: If a list, all files will be added to the archive and deleted afterwards. If a folder, all files in the folder will be added and the folder will be deleted afterwards.
  • int num_items: Items in the dataset. If None, the amount of files added to the archive will be used.
  • int compression: Type of compression to use. By default, files are not compressed, to speed up unarchiving.
  • bool finish: Finish the dataset/job afterwards or not?
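
A sketch of the folder variant, assuming result files have been written to a staging area first; the filenames and contents are placeholders.

    # Illustrative: write result files to a staging area, then archive the folder
    staging_area = self.dataset.get_staging_area()
    for index in range(3):
        staging_area.joinpath("result-%i.txt" % index).write_text("placeholder")

    # passing the folder adds every file in it and deletes the folder afterwards
    self.write_archive_and_finish(staging_area)
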
def create_standalone(self):
709    def create_standalone(self):
710        """
711        Copy this dataset and make that copy standalone
712
713        This has the benefit of allowing for all analyses that can be run on
714        full datasets on the new, filtered copy as well.
715
716        :return DataSet:  The new standalone dataset
717        """
718        top_parent = self.source_dataset
719
720        finished = self.dataset.check_dataset_finished()
721        if finished == 'empty':
722            # No data to process, so we can't create a standalone dataset
723            return
724        elif finished is None:
725            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
726            pass
727
728        standalone = self.dataset.copy(shallow=False)
729        standalone.body_match = "(Filtered) " + top_parent.query
730        standalone.datasource = top_parent.parameters.get("datasource", "custom")
731
732        try:
733            standalone.board = top_parent.board
734        except AttributeError:
735            standalone.board = self.type
736
737        standalone.type = top_parent.type
738
739        standalone.detach()
740        standalone.delete_parameter("key_parent")
741
742        self.dataset.copied_to = standalone.key
743
744        # we don't need this file anymore - it has been copied to the new
745        # standalone dataset, and this one is not accessible via the interface
746        # except as a link to the copied standalone dataset
747        os.unlink(self.dataset.get_results_path())
748
749        # Copy the log
750        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
751
752        return standalone

Copy this dataset and make that copy standalone

This has the benefit of allowing for all analyses that can be run on full datasets on the new, filtered copy as well.

Returns

The new standalone dataset

def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
754    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
755        """
756        Saves annotations made by this processor on the basis of another dataset.
757        Also adds some data regarding this processor: set `author` and `label` to processor name,
758        and add parameters to `metadata` (unless explicitly indicated).
759
760        :param annotations:				List of dictionaries with annotation items. Must have `item_id` and `value`.
761                                        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
762        :param source_dataset:			The dataset that these annotations will be saved on. If None, will use the
763                                        top parent.
764        :param bool hide_in_explorer:	Whether to hide this annotation in the Explorer. 'Hidden' annotations
765                                        are still shown in `iterate_items()`.
766
767        :returns int:					How many annotations were saved.
768
769        """
770
771        if not annotations:
772            return 0
773
774        # Default to parent dataset
775        if not source_dataset:
776            source_dataset = self.source_dataset.top_parent()
777
778        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
779        annotation_fields = source_dataset.annotation_fields
780
781        # Keep track of what fields we've already seen, so we don't need to hash every time.
782        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
783                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}
784
785        # Loop through all annotations. This may be batched.
786        for annotation in annotations:
787
788            # Keep track of what dataset generated this annotation
789            annotation["from_dataset"] = self.dataset.key
790            # Set the author to this processor's name
791            if not annotation.get("author"):
792                annotation["author"] = self.name
793            if not annotation.get("author_original"):
794                annotation["author_original"] = self.name
795            annotation["by_processor"] = True
796
797            # Only use a default label if no custom one is given
798            if not annotation.get("label"):
799                annotation["label"] = self.name
800
801            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
802            # We need to do this within this loop because this function may be called in batches and with different
803            # annotation types.
804            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
805                # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key.
806                # This should create unique fields, even if there's multiple annotation types for one processor.
807                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
808                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
809                annotation_fields[field_id] = {
810                    "label": annotation["label"],
811                    "type": annotation["type"] if annotation.get("type") else "text",
812                    "from_dataset": annotation["from_dataset"],
813                    "hide_in_explorer": hide_in_explorer
814                }
815            else:
816                # Else just get the field ID
817                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]
818
819            # Add field ID to the annotation
820            annotation["field_id"] = field_id
821
822        annotations_saved = source_dataset.save_annotations(annotations)
823        source_dataset.save_annotation_fields(annotation_fields)
824
825        return annotations_saved

Saves annotations made by this processor on the basis of another dataset. Also adds some data regarding this processor: set author and label to processor name, and add parameters to metadata (unless explicitly indicated).

Parameters
  • annotations: List of dictionaries with annotation items. Must have item_id and value. E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
  • source_dataset: The dataset that these annotations will be saved on. If None, will use the top parent.
  • bool hide_in_explorer: Whether to hide this annotation in the Explorer. 'Hidden' annotations are still shown in iterate_items().

Returns

How many annotations were saved.
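
A minimal sketch; the item IDs and values are placeholders, the annotation format follows the docstring above.

    # Illustrative: annotate two items of the (top parent) source dataset
    annotations = [
        {"item_id": "12345", "label": "Valid", "value": "Yes"},
        {"item_id": "67890", "label": "Valid", "value": "No"},
    ]
    saved = self.save_annotations(annotations)
    self.dataset.update_status("Saved %i annotations" % saved)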

@classmethod
def map_item_method_available(cls, dataset):
827    @classmethod
828    def map_item_method_available(cls, dataset):
829        """
830        Check if this processor can use map_item
831
832        Checks if map_item method exists and is compatible with dataset. If
833        dataset has a different extension than the default for this processor,
834        or if the dataset has no extension, this means we cannot be sure the
835        data is in the right format to be mapped, so `False` is returned in
836        that case even if a map_item() method is available.
837
838
839
840        :param DataSet dataset:                The DataSet object with which to
841        use map_item
842        """
843        # only run item mapper if extension of processor == extension of
844        # data file, for the scenario where a csv file was uploaded and
845        # converted to an ndjson-based data source, for example
846        # todo: this is kind of ugly, and a better fix may be possible
847        dataset_extension = dataset.get_extension()
848        if not dataset_extension:
849            # DataSet results file does not exist or has no extension, use expected extension
850            if hasattr(dataset, "extension"):
851                dataset_extension = dataset.extension
852            else:
853                # No known DataSet extension; cannot determine if map_item method compatible
854                return False
855
856        return hasattr(cls, "map_item") and cls.extension == dataset_extension

Check if this processor can use map_item

Checks if map_item method exists and is compatible with dataset. If dataset has a different extension than the default for this processor, or if the dataset has no extension, this means we cannot be sure the data is in the right format to be mapped, so False is returned in that case even if a map_item() method is available.

Parameters
  • DataSet dataset: The DataSet object with which to use map_item
@classmethod
def get_mapped_item(cls, item):
858    @classmethod
859    def get_mapped_item(cls, item):
860        """
861        Get the mapped item using a processor's map_item method.
862
863        Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
864        """
865        try:
866            mapped_item = cls.map_item(item)
867        except (KeyError, IndexError) as e:
868            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
869
870        if not mapped_item:
871            raise MapItemException("Unable to map item!")
872
873        return mapped_item

Get the mapped item using a processor's map_item method.

Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
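
A sketch of how the two class methods above are typically combined; SomeProcessor, dataset and raw_item are hypothetical stand-ins.

    # Illustrative: only attempt mapping when map_item is usable for this dataset
    if SomeProcessor.map_item_method_available(dataset):
        try:
            mapped_item = SomeProcessor.get_mapped_item(raw_item)
        except MapItemException:
            mapped_item = None  # item could not be mapped; skip or log it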

@classmethod
def is_filter(cls):
875    @classmethod
876    def is_filter(cls):
877        """
878        Is this processor a filter?
879
880        Filters do not produce their own dataset but replace the source_dataset dataset
881        instead.
882
883        :todo: Make this a bit more robust than sniffing the processor category
884        :return bool:
885        """
886        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()

Is this processor a filter?

Filters do not produce their own dataset but replace the source_dataset dataset instead.

Todo: Make this a bit more robust than sniffing the processor category

Returns

True if this processor is a filter, False otherwise.

@classmethod
def get_options(cls, parent_dataset=None, config=None):
888    @classmethod
889    def get_options(cls, parent_dataset=None, config=None):
890        """
891        Get processor options
892
893        This method by default returns the class's "options" attribute, or an
894        empty dictionary. It can be redefined by processors that need more
895        fine-grained options, e.g. in cases where the availability of options
896        is partially determined by the parent dataset's parameters.
897
898        :param config:
899        :param DataSet parent_dataset:  An object representing the dataset that
900          the processor would be run on
901        """
902
903        return cls.options if hasattr(cls, "options") else {}

Get processor options

This method by default returns the class's "options" attribute, or an empty dictionary. It can be redefined by processors that need more fine-grained options, e.g. in cases where the availability of options is partially determined by the parent dataset's parameters.

Parameters
  • config: 4CAT configuration object, used to determine which options are available
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
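
A hedged sketch of an override in a child processor; the "columns" option and its definition format are schematic assumptions, not the actual 4CAT option schema.

    # Illustrative: a child processor exposing an extra option only when a
    # parent dataset is available (the option definition below is schematic)
    class ExampleOptionsProcessor(BasicProcessor):
        options = {}  # static defaults

        @classmethod
        def get_options(cls, parent_dataset=None, config=None):
            options = dict(cls.options)
            if parent_dataset is not None:
                # hypothetical option; real option schemas may differ
                options["columns"] = {"type": "string", "default": "body"}
            return options
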
@classmethod
def get_status(cls):
905    @classmethod
906    def get_status(cls):
907        """
908        Get processor status
909
910        :return list:    Statuses of this processor
911        """
912        return cls.status if hasattr(cls, "status") else None

Get processor status

Returns

Statuses of this processor

@classmethod
def is_top_dataset(cls):
914    @classmethod
915    def is_top_dataset(cls):
916        """
917        Confirm this is *not* a top dataset, but a processor.
918
919        Used for processor compatibility checks.
920
921        :return bool:  Always `False`, because this is a processor.
922        """
923        return False

Confirm this is not a top dataset, but a processor.

Used for processor compatibility checks.

Returns

Always False, because this is a processor.

@classmethod
def is_from_collector(cls):
925    @classmethod
926    def is_from_collector(cls):
927        """
928        Check if this processor is one that collects data, i.e. a search or
929        import worker.
930
931        :return bool:
932        """
933        return cls.type.endswith("-search") or cls.type.endswith("-import")

Check if this processor is one that collects data, i.e. a search or import worker.

Returns

True if this processor is a search or import worker, False otherwise.

@classmethod
def get_extension(self, parent_dataset=None):
935    @classmethod
936    def get_extension(self, parent_dataset=None):
937        """
938        Return the extension of the processor's dataset
939
940        Used for processor compatibility checks.
941
942        :param DataSet parent_dataset:  An object representing the dataset that
943          the processor would be run on
944        :return str|None:  Dataset extension (without leading `.`) or `None`.
945        """
946        if self.is_filter():
947            if parent_dataset is not None:
948                # Filters should use the same extension as the parent dataset
949                return parent_dataset.get_extension()
950            else:
951                # No dataset provided, unable to determine extension of parent dataset
952                # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead
953                return None
954        elif self.extension:
955            # Use explicitly defined extension in class (Processor class defaults to "csv")
956            return self.extension
957        else:
958            # A non filter processor updated the base Processor extension to None/False?
959            return None

Return the extension of the processor's dataset

Used for processor compatibility checks.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
Returns

Dataset extension (without leading .) or None.

@classmethod
def is_rankable(cls, multiple_items=True):
961    @classmethod
962    def is_rankable(cls, multiple_items=True):
963        """
964        Used for processor compatibility
965
966        :param bool multiple_items:  Consider datasets with multiple items per
967          item (e.g. word_1, word_2, etc)? Included for compatibility
968        """
969        return False

Used for processor compatibility

Parameters
  • bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)? Included for compatibility
@classmethod
def exclude_followup_processors(cls, processor_type=None):
971    @classmethod
972    def exclude_followup_processors(cls, processor_type=None):
973        """
974        Used for processor compatibility
975
976        To be defined by the child processor if it should exclude certain follow-up processors.
977        e.g.:
978
979        def exclude_followup_processors(cls, processor_type):
980            if processor_type in ["undesirable-followup-processor"]:
981                return True
982            return False
983
984        :param str processor_type:  Processor type to exclude
985        :return bool:  True if processor should be excluded, False otherwise
986        """
987        return False

Used for processor compatibility

To be defined by the child processor if it should exclude certain follow-up processors. e.g.:

    def exclude_followup_processors(cls, processor_type):
        if processor_type in ["undesirable-followup-processor"]:
            return True
        return False

Parameters
  • str processor_type: Processor type to exclude
Returns

True if processor should be excluded, False otherwise

@abc.abstractmethod
def process(self):
989    @abc.abstractmethod
990    def process(self):
991        """
992        Process data
993
994        To be defined by the child processor.
995        """
996        pass

Process data

To be defined by the child processor.
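
A minimal skeleton of a child processor implementing process(); the class name, type and the single placeholder row are hypothetical.

    # Illustrative skeleton of a concrete processor
    class ExampleProcessor(BasicProcessor):
        type = "example-processor"  # job type identifier (hypothetical)
        category = "Other"
        description = "Writes a single placeholder row"
        extension = "csv"

        def process(self):
            # a real processor would read from self.source_dataset / self.source_file
            self.write_csv_items_and_finish([{"result": "placeholder"}])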

@staticmethod
def is_4cat_processor():
 998    @staticmethod
 999    def is_4cat_processor():
1000        """
1001        Is this a 4CAT processor?
1002
1003        This is used to determine whether a class is a 4CAT
1004        processor.
1005
1006        :return:  True
1007        """
1008        return True

Is this a 4CAT processor?

This is used to determine whether a class is a 4CAT processor.

Returns

True