backend.lib.processor
Basic post-processor worker - should be inherited by workers to post-process results
1""" 2Basic post-processor worker - should be inherited by workers to post-process results 3""" 4import traceback 5import zipfile 6import typing 7import shutil 8import abc 9import csv 10import os 11import re 12import time 13 14from pathlib import PurePath 15 16from backend.lib.worker import BasicWorker 17from common.lib.dataset import DataSet 18from common.lib.fourcat_module import FourcatModule 19from common.lib.helpers import get_software_commit, remove_nuls, send_email, hash_to_md5 20from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, 21 DataSetException, MapItemException) 22from common.config_manager import ConfigWrapper 23from common.lib.user import User 24 25csv.field_size_limit(1024 * 1024 * 1024) 26 27 28class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta): 29 """ 30 Abstract processor class 31 32 A processor takes a finished dataset as input and processes its result in 33 some way, with another dataset set as output. The input thus is a file, and 34 the output (usually) as well. In other words, the result of a processor can 35 be used as input for another processor (though whether and when this is 36 useful is another question). 37 38 To determine whether a processor can process a given dataset, you can 39 define a `is_compatible_with(FourcatModule module=None, config=None):) -> bool` class 40 method which takes a dataset as argument and returns a bool that determines 41 if this processor is considered compatible with that dataset. For example: 42 43 .. code-block:: python 44 45 @classmethod 46 def is_compatible_with(cls, module=None, config=None): 47 return module.type == "linguistic-features" 48 49 50 """ 51 52 #: Database handler to interface with the 4CAT database 53 db = None 54 55 #: Job object that requests the execution of this processor 56 job = None 57 58 #: The dataset object that the processor is *creating*. 59 dataset = None 60 61 #: Owner (username) of the dataset 62 owner = None 63 64 #: The dataset object that the processor is *processing*. 65 source_dataset = None 66 67 #: The file that is being processed 68 source_file = None 69 70 #: Processor description, which will be displayed in the web interface 71 description = "No description available" 72 73 #: Category identifier, used to group processors in the web interface 74 category = "Other" 75 76 #: Extension of the file created by the processor 77 extension = "csv" 78 79 #: 4CAT settings from the perspective of the dataset's owner 80 config = None 81 82 #: Is this processor running 'within' a preset processor? 83 is_running_in_preset = False 84 85 #: Is this processor hidden in the front-end, and only used internally/in presets? 86 is_hidden = False 87 88 #: This will be defined automatically upon loading the processor. There is 89 #: no need to override manually 90 filepath = None 91 92 def work(self): 93 """ 94 Process a dataset 95 96 Loads dataset metadata, sets up the scaffolding for performing some kind 97 of processing on that dataset, and then processes it. Afterwards, clean 98 up. 99 """ 100 try: 101 # a dataset can have multiple owners, but the creator is the user 102 # that actually queued the processor, so their config is relevant 103 self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules) 104 self.owner = self.dataset.creator 105 except DataSetException: 106 # query has been deleted in the meantime. 
            # as deleting it will have been a conscious choice by a user
            self.job.finish()
            return

        # set up config reader wrapping the worker's config manager, which is
        # in turn the one passed to it by the WorkerManager, which is the one
        # originally loaded in bootstrap
        self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))

        if self.dataset.data.get("key_parent", None):
            # search workers never have parents (for now), so we don't need to
            # find out what the source_dataset dataset is if it's a search worker
            try:
                self.source_dataset = self.dataset.get_parent()

                # for presets, transparently use the *top* dataset as a source_dataset
                # since that is where any underlying processors should get
                # their data from. However, this should only be done as long as the
                # preset is not finished yet, because after that there may be processors
                # that run on the final preset result
                while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
                    self.is_running_in_preset = True
                    self.source_dataset = self.source_dataset.get_parent()
                    if self.source_dataset is None:
                        # this means there is no dataset that is *not* a preset anywhere
                        # above this dataset. This should never occur, but if it does, we
                        # cannot continue
                        self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset" % (
                            self.type, self.dataset.key))
                        self.job.finish()
                        return

            except DataSetException:
                # we need to know what the source_dataset dataset was to properly handle the
                # analysis
                self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
                    self.type, self.dataset.key))
                self.job.finish()
                return

            if not self.source_dataset.is_finished() and not self.is_running_in_preset:
                # not finished yet - retry after a while
                # exception for presets, since these *should* be unfinished
                # until underlying processors are done
                self.job.release(delay=30)
                return

            self.source_file = self.source_dataset.get_results_path()
            if not self.source_file.exists():
                self.dataset.update_status("Finished, no input data found.")

        self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))

        processor_name = self.title if hasattr(self, "title") else self.type
        self.dataset.clear_log()
        self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))

        # start log file
        self.dataset.update_status("Processing data")
        self.dataset.update_version(get_software_commit(self))

        # get parameters
        # if possible, fill defaults where parameters are not provided
        given_parameters = self.dataset.parameters.copy()
        all_parameters = self.get_options(self.dataset, config=self.config)
        self.parameters = {
            param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
            for param in [*all_parameters.keys(), *given_parameters.keys()]
        }

        # now the parameters have been loaded into memory, clear any sensitive
        # ones. This has a side-effect that a processor may not run again
        # without starting from scratch, but this is the price of progress
        options = self.get_options(self.dataset.get_parent(), config=self.config)
        for option, option_settings in options.items():
            if option_settings.get("sensitive"):
                self.dataset.delete_parameter(option)

        if self.interrupted:
            self.dataset.log("Processing interrupted, trying again later")
            return self.abort()

        if not self.dataset.is_finished():
            try:
                self.process()
                self.after_process()
            except WorkerInterruptedException as e:
                self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
                self.abort()
            except Exception as e:
                self.dataset.log("Processor crashed (%s), trying again later" % str(e))
                stack = traceback.extract_tb(e.__traceback__)
                frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
                location = "->".join(frames)

                # Not all datasets have source_dataset keys
                if len(self.dataset.get_genealogy()) > 1:
                    parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
                else:
                    parent_key = ""

                print("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % (
                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)))
                # remove any result files that have been created so far
                self.remove_files()

                raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % (
                    self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack)
        else:
            # dataset already finished, job shouldn't be open anymore
            self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
                self.job.data["jobtype"], self.job.data["remote_id"]))
            self.job.finish()

    def after_process(self):
        """
        Run after processing the dataset

        This method cleans up temporary files, and if needed, handles logistics
        concerning the result file, e.g. running a pre-defined processor on the
        result, copying it to another dataset, and so on.
228 """ 229 if self.dataset.data["num_rows"] > 0: 230 self.dataset.update_status("Dataset completed.") 231 232 if not self.dataset.is_finished(): 233 self.dataset.finish() 234 235 self.dataset.remove_staging_areas() 236 237 # see if we have anything else lined up to run next 238 for next in self.parameters.get("next", []): 239 can_run_next = True 240 next_parameters = next.get("parameters", {}) 241 next_type = next.get("type", "") 242 try: 243 available_processors = self.dataset.get_available_processors(config=self.config) 244 except ValueError: 245 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 246 break 247 248 # see if we have anything else lined up to run next 249 for next in self.parameters.get("next", []): 250 can_run_next = True 251 next_parameters = next.get("parameters", {}) 252 next_type = next.get("type", "") 253 try: 254 available_processors = self.dataset.get_available_processors(config=self.config) 255 except ValueError: 256 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 257 break 258 259 260 # run it only if the post-processor is actually available for this query 261 if self.dataset.data["num_rows"] <= 0: 262 can_run_next = False 263 self.log.info( 264 "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % ( 265 next_type, self.dataset.key)) 266 267 elif next_type in available_processors: 268 next_analysis = DataSet( 269 parameters=next_parameters, 270 type=next_type, 271 db=self.db, 272 parent=self.dataset.key, 273 extension=available_processors[next_type].extension, 274 is_private=self.dataset.is_private, 275 owner=self.dataset.creator, 276 modules=self.modules 277 ) 278 # copy ownership from parent dataset 279 next_analysis.copy_ownership_from(self.dataset) 280 # add to queue 281 self.queue.add_job(next_type, remote_id=next_analysis.key) 282 else: 283 can_run_next = False 284 self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % ( 285 self.dataset.key, self.type, next_type)) 286 287 if not can_run_next: 288 # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent 289 # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed 290 if "attach_to" in self.parameters: 291 # Probably should not happen, but for some reason a mid processor has been designated as the processor 292 # the parent should attach to 293 pass 294 else: 295 # Check for "attach_to" parameter in descendents 296 while True: 297 if "attach_to" in next_parameters: 298 self.parameters["attach_to"] = next_parameters["attach_to"] 299 break 300 else: 301 if "next" in next_parameters: 302 next_parameters = next_parameters["next"][0]["parameters"] 303 else: 304 # No more descendents 305 # Should not happen; we cannot find the source dataset 306 self.log.warning( 307 "Cannot find preset's source dataset for dataset %s" % self.dataset.key) 308 break 309 310 # see if we need to register the result somewhere 311 if "copy_to" in self.parameters: 312 # copy the results to an arbitrary place that was passed 313 if self.dataset.get_results_path().exists(): 314 shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"]) 315 else: 316 # if copy_to was passed, that means it's important that this 317 # file exists somewhere, so we create it as an empty file 318 with open(self.parameters["copy_to"], "w") as empty_file: 319 
empty_file.write("") 320 321 # see if this query chain is to be attached to another query 322 # if so, the full genealogy of this query (minus the original dataset) 323 # is attached to the given query - this is mostly useful for presets, 324 # where a chain of processors can be marked as 'underlying' a preset 325 if "attach_to" in self.parameters: 326 try: 327 # copy metadata and results to the surrogate 328 surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules) 329 330 if self.dataset.get_results_path().exists(): 331 # Update the surrogate's results file suffix to match this dataset's suffix 332 surrogate.data["result_file"] = surrogate.get_results_path().with_suffix( 333 self.dataset.get_results_path().suffix) 334 shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path())) 335 336 try: 337 surrogate.finish(self.dataset.data["num_rows"]) 338 except RuntimeError: 339 # already finished, could happen (though it shouldn't) 340 pass 341 342 surrogate.update_status(self.dataset.get_status()) 343 344 except ValueError: 345 # dataset with key to attach to doesn't exist... 346 self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % ( 347 self.dataset.key, self.parameters["attach_to"])) 348 349 self.job.finish() 350 351 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 352 owner = self.dataset.get_parameters().get("email-complete", False) 353 # Check that username is email address 354 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 355 from email.mime.multipart import MIMEMultipart 356 from email.mime.text import MIMEText 357 from smtplib import SMTPException 358 import socket 359 import html2text 360 361 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 362 owner = self.dataset.get_parameters().get("email-complete", False) 363 # Check that username is email address 364 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 365 from email.mime.multipart import MIMEMultipart 366 from email.mime.text import MIMEText 367 from smtplib import SMTPException 368 import socket 369 import html2text 370 371 self.log.debug("Sending email to %s" % owner) 372 dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key 373 sender = self.config.get('mail.noreply') 374 message = MIMEMultipart("alternative") 375 message["From"] = sender 376 message["To"] = owner 377 message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label()) 378 mail = """ 379 <p>Hello %s,</p> 380 <p>4CAT has finished collecting your %s dataset labeled: %s</p> 381 <p>You can view your dataset via the following link:</p> 382 <p><a href="%s">%s</a></p> 383 <p>Sincerely,</p> 384 <p>Your friendly neighborhood 4CAT admin</p> 385 """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url) 386 html_parser = html2text.HTML2Text() 387 message.attach(MIMEText(html_parser.handle(mail), "plain")) 388 message.attach(MIMEText(mail, "html")) 389 try: 390 send_email([owner], message, self.config) 391 except (SMTPException, ConnectionRefusedError, socket.timeout): 392 self.log.error("Error sending email to %s" % owner) 393 394 def remove_files(self): 395 """ 396 Clean up result files and any staging files for processor to be attempted 397 later if desired. 
398 """ 399 # Remove the results file that was created 400 if self.dataset.get_results_path().exists(): 401 self.dataset.get_results_path().unlink() 402 if self.dataset.get_results_folder_path().exists(): 403 shutil.rmtree(self.dataset.get_results_folder_path()) 404 405 # Remove any staging areas with temporary data 406 self.dataset.remove_staging_areas() 407 408 def abort(self): 409 """ 410 Abort dataset creation and clean up so it may be attempted again later 411 """ 412 413 # delete annotations that have been generated as part of this processor 414 self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True) 415 # remove any result files that have been created so far 416 self.remove_files() 417 418 # we release instead of finish, since interrupting is just that - the 419 # job should resume at a later point. Delay resuming by 10 seconds to 420 # give 4CAT the time to do whatever it wants (though usually this isn't 421 # needed since restarting also stops the spawning of new workers) 422 if self.interrupted == self.INTERRUPT_RETRY: 423 # retry later - wait at least 10 seconds to give the backend time to shut down 424 self.job.release(delay=10) 425 elif self.interrupted == self.INTERRUPT_CANCEL: 426 # cancel job 427 self.job.finish() 428 429 def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs): 430 """ 431 Request an iterable of URLs and return results 432 433 This method takes an iterable yielding URLs and yields the result for 434 a GET request for that URL in return. This is done through the worker 435 manager's DelegatedRequestHandler, which can run multiple requests in 436 parallel and divide them over the proxies configured in 4CAT (if any). 437 Proxy cooloff and queueing is shared with other processors, so that a 438 processor will never accidentally request from the same site as another 439 processor, potentially triggering rate limits. 440 441 :param urls: Something that can be iterated over and yields URLs 442 :param kwargs: Other keyword arguments are passed on to `add_urls` 443 and eventually to `requests.get()`. 444 :param bool preserve_order: Return items in the original order. Use 445 `False` to potentially speed up processing, if order is not important. 446 :return: A generator yielding request results, i.e. tuples of a 447 URL and a `requests` response objects 448 """ 449 queue_name = self._proxy_queue_name() 450 delegator = self.manager.proxy_delegator 451 452 delegator.refresh_settings(self.config) 453 454 # 50 is an arbitrary batch size - but we top up every 0.05s, so 455 # that should be sufficient 456 batch_size = 50 457 458 # we need an iterable, so we can use next() and StopIteration 459 urls = iter(urls) 460 461 have_urls = True 462 while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls: 463 if queue_length < batch_size and have_urls: 464 batch = [] 465 while len(batch) < (batch_size - queue_length): 466 try: 467 batch.append(next(urls)) 468 except StopIteration: 469 have_urls = False 470 break 471 472 delegator.add_urls(batch, queue_name, **kwargs) 473 474 time.sleep(0.05) # arbitrary... 475 for url, result in delegator.get_results(queue_name, preserve_order=preserve_order): 476 # result may also be a FailedProxiedRequest! 
                # up to the processor to decide how to deal with it
                yield url, result

    def push_proxied_request(self, url, position=-1, **kwargs):
        """
        Add a single URL to the proxied requests queue

        :param str url: URL to add
        :param position: Position to add to queue; can be used to add priority
        requests, adds to end of queue by default
        :param kwargs:
        """
        self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)

    def flush_proxied_requests(self):
        """
        Get rid of remaining proxied requests

        Can be used if enough results are available and any remaining ones need
        to be stopped ASAP and are otherwise unneeded.

        Blocking!
        """
        self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())

    def _proxy_queue_name(self):
        """
        Get proxy queue name

        For internal use.

        :return str:
        """
        return f"{self.type}-{self.dataset.key}"

    def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
        """
        A generator that iterates through files in an archive

        With every iteration, the processor's 'interrupted' flag is checked,
        and if set a ProcessorInterruptedException is raised, which by default
        is caught and subsequently stops execution gracefully.

        Files are temporarily unzipped and deleted after use.

        :param Path path: Path to zip file to read
        :param Path staging_area: Where to store the files while they're
        being worked with. If omitted, a temporary folder is created and
        deleted after use
        :param bool immediately_delete: Temporary files are removed after being
        yielded; False keeps files until the staging_area is removed (usually
        during processor cleanup)
        :param list filename_filter: Whitelist of filenames to iterate.
        Other files will be ignored. If empty, do not ignore anything.
        :return: An iterator with a Path item for each file
        """

        if not path.exists():
            return

        if not staging_area:
            staging_area = self.dataset.get_staging_area()

        if not staging_area.exists() or not staging_area.is_dir():
            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)

        with zipfile.ZipFile(path, "r") as archive_file:
            archive_contents = sorted(archive_file.namelist())

            for archived_file in archive_contents:
                if filename_filter and archived_file not in filename_filter:
                    continue

                info = archive_file.getinfo(archived_file)
                if info.is_dir():
                    continue

                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")

                temp_file = staging_area.joinpath(archived_file)
                archive_file.extract(archived_file, staging_area)

                yield temp_file
                if immediately_delete:
                    temp_file.unlink()

    def unpack_archive_contents(self, path, staging_area=None):
        """
        Unpack all files in an archive to a staging area

        With every iteration, the processor's 'interrupted' flag is checked,
        and if set a ProcessorInterruptedException is raised, which by default
        is caught and subsequently stops execution gracefully.

        Files are unzipped to a staging area. The staging area is *not*
        cleaned up automatically.

        :param Path path: Path to zip file to read
        :param Path staging_area: Where to store the files while they're
        being worked with.
        If omitted, a temporary folder is created and deleted after use
        :return Path: A path to the staging area
        """

        if not path.exists():
            return

        if not staging_area:
            staging_area = self.dataset.get_staging_area()

        if not staging_area.exists() or not staging_area.is_dir():
            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)

        paths = []
        with zipfile.ZipFile(path, "r") as archive_file:
            archive_contents = sorted(archive_file.namelist())

            for archived_file in archive_contents:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while iterating zip file contents")

                file_name = archived_file.split("/")[-1]
                temp_file = staging_area.joinpath(file_name)
                archive_file.extract(archived_file, staging_area)
                paths.append(temp_file)

        return staging_area

    def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
        """
        Extract a file from an archive by name

        :param str filename: Name of file to extract
        :param Path archive_path: Path to zip file to read
        :param Path staging_area: Where to store the files while they're
        being worked with. If omitted, a temporary folder is created
        :return Path: A path to the extracted file
        """
        if not archive_path.exists():
            return

        if not staging_area:
            staging_area = self.dataset.get_staging_area()

        if not staging_area.exists() or not staging_area.is_dir():
            raise RuntimeError("Staging area %s is not a valid folder" % staging_area)

        with zipfile.ZipFile(archive_path, "r") as archive_file:
            if filename not in archive_file.namelist():
                raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
            else:
                archive_file.extract(filename, staging_area)

        return staging_area.joinpath(filename)

    def write_csv_items_and_finish(self, data):
        """
        Write data as csv to results file and finish dataset

        Determines result file path using dataset's path determination helper
        methods. After writing results, the dataset is marked finished. Will
        raise a ProcessorInterruptedException if the interrupted flag for this
        processor is set while iterating.

        :param data: A list or tuple of dictionaries, all with the same keys
        """
        if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
            raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))

        if not data:
            raise ValueError("write_csv_items requires a dictionary with at least one item")

        self.dataset.update_status("Writing results file")
        writer = False
        with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
            for row in data:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while writing results file")

                row = remove_nuls(row)
                if not writer:
                    writer = csv.DictWriter(results, fieldnames=row.keys())
                    writer.writeheader()

                writer.writerow(row)

        self.dataset.update_status("Finished")
        self.dataset.finish(len(data))

    def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
        """
        Archive a bunch of files into a zip archive and finish processing

        :param list|Path files: If a list, all files will be added to the
        archive and deleted afterwards. If a folder, all files in the folder
        will be added and the folder will be deleted afterwards.
        :param int num_items: Items in the dataset. If None, the amount of
        files added to the archive will be used.
        :param int compression: Type of compression to use. By default, files
        are not compressed, to speed up unarchiving.
        :param bool finish: Finish the dataset/job afterwards or not?
        """
        is_folder = False
        if issubclass(type(files), PurePath):
            is_folder = files
            if not files.exists() or not files.is_dir():
                raise RuntimeError("Folder %s is not a folder that can be archived" % files)

            files = files.glob("*")

        # create zip of archive and delete temporary files and folder
        self.dataset.update_status("Compressing results into archive")
        done = 0
        with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
            for output_path in files:
                zip.write(output_path, output_path.name)
                output_path.unlink()
                done += 1

        # delete temporary folder
        if is_folder:
            shutil.rmtree(is_folder)

        self.dataset.update_status("Finished")
        if num_items is None:
            num_items = done

        if finish:
            self.dataset.finish(num_items)

    def create_standalone(self):
        """
        Copy this dataset and make that copy standalone

        This has the benefit of allowing for all analyses that can be run on
        full datasets on the new, filtered copy as well.

        :return DataSet: The new standalone dataset
        """
        top_parent = self.source_dataset

        finished = self.dataset.check_dataset_finished()
        if finished == 'empty':
            # No data to process, so we can't create a standalone dataset
            return
        elif finished is None:
            # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
            pass

        standalone = self.dataset.copy(shallow=False)
        standalone.body_match = "(Filtered) " + top_parent.query
        standalone.datasource = top_parent.parameters.get("datasource", "custom")

        try:
            standalone.board = top_parent.board
        except AttributeError:
            standalone.board = self.type

        standalone.type = top_parent.type

        standalone.detach()
        standalone.delete_parameter("key_parent")

        self.dataset.copied_to = standalone.key

        # we don't need this file anymore - it has been copied to the new
        # standalone dataset, and this one is not accessible via the interface
        # except as a link to the copied standalone dataset
        os.unlink(self.dataset.get_results_path())

        # Copy the log
        shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())

        return standalone

    def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
        """
        Save annotations made by this processor on the basis of another dataset.

        Also adds some data regarding this processor: `author` and `label` are set to the processor name,
        and the processor's parameters are added to `metadata` (unless explicitly indicated otherwise).

        :param annotations: List of dictionaries with annotation items. Must have `item_id` and `value`.
        E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
        :param source_dataset: The dataset that these annotations will be saved on. If None, will use the
        top parent.
        :param bool hide_in_explorer: Whether this annotation is included in the Explorer. 'Hidden' annotations
        are still shown in `iterate_items()`.

        :returns int: How many annotations were saved.
        """

        if not annotations:
            return 0

        # Default to parent dataset
        if not source_dataset:
            source_dataset = self.source_dataset.top_parent()

        # Check if this dataset already has annotation fields, and if so, store some values to use per annotation.
        annotation_fields = source_dataset.annotation_fields

        # Keep track of what fields we've already seen, so we don't need to hash every time.
        seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
                       for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}

        # Loop through all annotations. This may be batched.
        for annotation in annotations:

            # Keep track of what dataset generated this annotation
            annotation["from_dataset"] = self.dataset.key
            # Set the author to this processor's name
            if not annotation.get("author"):
                annotation["author"] = self.name
            if not annotation.get("author_original"):
                annotation["author_original"] = self.name
            annotation["by_processor"] = True

            # Only use a default label if no custom one is given
            if not annotation.get("label"):
                annotation["label"] = self.name

            # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet.
            # We need to do this within this loop because this function may be called in batches and with different
            # annotation types.
            if (annotation["from_dataset"], annotation["label"]) not in seen_fields:
                # Generate a unique field ID based on the source dataset's key, the label, and this dataset's key.
                # This should create unique fields, even if there are multiple annotation types for one processor.
                field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
                seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id
                annotation_fields[field_id] = {
                    "label": annotation["label"],
                    "type": annotation["type"] if annotation.get("type") else "text",
                    "from_dataset": annotation["from_dataset"],
                    "hide_in_explorer": hide_in_explorer
                }
            else:
                # Else just get the field ID
                field_id = seen_fields[(annotation["from_dataset"], annotation["label"])]

            # Add field ID to the annotation
            annotation["field_id"] = field_id

        annotations_saved = source_dataset.save_annotations(annotations)
        source_dataset.save_annotation_fields(annotation_fields)

        return annotations_saved

    @classmethod
    def map_item_method_available(cls, dataset):
        """
        Check if this processor can use map_item

        Checks if a map_item method exists and is compatible with the dataset. If
        the dataset has a different extension than the default for this processor,
        or if the dataset has no extension, this means we cannot be sure the
        data is in the right format to be mapped, so `False` is returned in
        that case even if a map_item() method is available.

        :param DataSet dataset: The DataSet object with which to
        use map_item
        """
        # only run item mapper if extension of processor == extension of
        # data file, for the scenario where a csv file was uploaded and
        # converted to an ndjson-based data source, for example
        # todo: this is kind of ugly, and a better fix may be possible
        dataset_extension = dataset.get_extension()
        if not dataset_extension:
            # DataSet results file does not exist or has no extension, use expected extension
            if hasattr(dataset, "extension"):
                dataset_extension = dataset.extension
            else:
                # No known DataSet extension; cannot determine if map_item method is compatible
                return False

        return hasattr(cls, "map_item") and cls.extension == dataset_extension

    @classmethod
    def get_mapped_item(cls, item):
        """
        Get the mapped item using a processor's map_item method.

        Ensure the map_item method is compatible with a dataset by checking map_item_method_available first.
        """
        try:
            mapped_item = cls.map_item(item)
        except (KeyError, IndexError) as e:
            raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")

        if not mapped_item:
            raise MapItemException("Unable to map item!")

        return mapped_item

    @classmethod
    def is_filter(cls):
        """
        Is this processor a filter?

        Filters do not produce their own dataset but replace the source_dataset dataset
        instead.

        :todo: Make this a bit more robust than sniffing the processor category
        :return bool:
        """
        return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get processor options

        This method by default returns the class's "options" attribute, or an
        empty dictionary.
        It can be redefined by processors that need more
        fine-grained options, e.g. in cases where the availability of options
        is partially determined by the parent dataset's parameters.

        :param config:
        :param DataSet parent_dataset: An object representing the dataset that
        the processor would be run on
        """

        return cls.options if hasattr(cls, "options") else {}

    @classmethod
    def get_status(cls):
        """
        Get processor status

        :return list: Statuses of this processor
        """
        return cls.status if hasattr(cls, "status") else None

    @classmethod
    def is_top_dataset(cls):
        """
        Confirm this is *not* a top dataset, but a processor.

        Used for processor compatibility checks.

        :return bool: Always `False`, because this is a processor.
        """
        return False

    @classmethod
    def is_from_collector(cls):
        """
        Check if this processor is one that collects data, i.e. a search or
        import worker.

        :return bool:
        """
        return cls.type.endswith("-search") or cls.type.endswith("-import")

    @classmethod
    def get_extension(cls, parent_dataset=None):
        """
        Return the extension of the processor's dataset

        Used for processor compatibility checks.

        :param DataSet parent_dataset: An object representing the dataset that
        the processor would be run on
        :return str|None: Dataset extension (without leading `.`) or `None`.
        """
        if cls.is_filter():
            if parent_dataset is not None:
                # Filters should use the same extension as the parent dataset
                return parent_dataset.get_extension()
            else:
                # No dataset provided, unable to determine extension of parent dataset.
                # Filters originally returned None here, so maintaining that outcome; BUT we may want to
                # fall back on the processor extension instead
                return None
        elif cls.extension:
            # Use explicitly defined extension in class (Processor class defaults to "csv")
            return cls.extension
        else:
            # A non-filter processor updated the base Processor extension to None/False?
            return None

    @classmethod
    def is_rankable(cls, multiple_items=True):
        """
        Used for processor compatibility

        :param bool multiple_items: Consider datasets with multiple items per
        item (e.g. word_1, word_2, etc)? Included for compatibility
        """
        return False

    @classmethod
    def exclude_followup_processors(cls, processor_type=None):
        """
        Used for processor compatibility

        To be defined by the child processor if it should exclude certain follow-up processors,
        e.g.:

        def exclude_followup_processors(cls, processor_type):
            if processor_type in ["undesirable-followup-processor"]:
                return True
            return False

        :param str processor_type: Processor type to exclude
        :return bool: True if processor should be excluded, False otherwise
        """
        return False

    @abc.abstractmethod
    def process(self):
        """
        Process data

        To be defined by the child processor.
        """
        pass

    @staticmethod
    def is_4cat_processor():
        """
        Is this a 4CAT processor?

        This is used to determine whether a class is a 4CAT
        processor.

        :return: True
        """
        return True
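
For illustration, below is a minimal sketch of what a subclass of BasicProcessor might look like, using only the hooks defined above (the class attributes, is_compatible_with(), the interrupted flag, process() and write_csv_items_and_finish()). The processor type "example-count-rows", its metadata and the "rows" result column are hypothetical and not part of 4CAT; an actual processor would parse its input according to its own data format.

import csv

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import ProcessorInterruptedException


class ExampleCountProcessor(BasicProcessor):
    """
    Hypothetical processor that counts the rows of a csv dataset

    Illustrative sketch only; the type identifier and metadata below are made up.
    """
    type = "example-count-rows"  # hypothetical processor/job type ID
    category = "Metrics"  # category under which it would be grouped in the web interface
    title = "Count rows"  # title displayed in the web interface
    description = "Counts the number of rows in the source dataset."
    extension = "csv"  # extension of the result file this processor creates

    @classmethod
    def is_compatible_with(cls, module=None, config=None):
        # only offer this processor for csv-based datasets (assuming `module`
        # is a dataset here, as in the compatibility checks above)
        return module.get_extension() == "csv"

    def process(self):
        # work() has set self.source_file to the parent dataset's result file
        rows = 0
        with self.source_file.open(encoding="utf-8") as infile:
            reader = csv.DictReader(infile)
            for _ in reader:
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while counting rows")
                rows += 1

        # write a one-row result file and mark the dataset as finished
        self.write_csv_items_and_finish([{"rows": rows}])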
715 716 :return DataSet: The new standalone dataset 717 """ 718 top_parent = self.source_dataset 719 720 finished = self.dataset.check_dataset_finished() 721 if finished == 'empty': 722 # No data to process, so we can't create a standalone dataset 723 return 724 elif finished is None: 725 # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now 726 pass 727 728 standalone = self.dataset.copy(shallow=False) 729 standalone.body_match = "(Filtered) " + top_parent.query 730 standalone.datasource = top_parent.parameters.get("datasource", "custom") 731 732 try: 733 standalone.board = top_parent.board 734 except AttributeError: 735 standalone.board = self.type 736 737 standalone.type = top_parent.type 738 739 standalone.detach() 740 standalone.delete_parameter("key_parent") 741 742 self.dataset.copied_to = standalone.key 743 744 # we don't need this file anymore - it has been copied to the new 745 # standalone dataset, and this one is not accessible via the interface 746 # except as a link to the copied standalone dataset 747 os.unlink(self.dataset.get_results_path()) 748 749 # Copy the log 750 shutil.copy(self.dataset.get_log_path(), standalone.get_log_path()) 751 752 return standalone 753 754 def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int: 755 """ 756 Saves annotations made by this processor on the basis of another dataset. 757 Also adds some data regarding this processor: set `author` and `label` to processor name, 758 and add parameters to `metadata` (unless explicitly indicated). 759 760 :param annotations: List of dictionaries with annotation items. Must have `item_id` and `value`. 761 E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}] 762 :param source_dataset: The dataset that these annotations will be saved on. If None, will use the 763 top parent. 764 :param bool hide_in_explorer: Whether this annotation is included in the Explorer. 'Hidden' annotations 765 are still shown in `iterate_items()`). 766 767 :returns int: How many annotations were saved. 768 769 """ 770 771 if not annotations: 772 return 0 773 774 # Default to parent dataset 775 if not source_dataset: 776 source_dataset = self.source_dataset.top_parent() 777 778 # Check if this dataset already has annotation fields, and if so, store some values to use per annotation. 779 annotation_fields = source_dataset.annotation_fields 780 781 # Keep track of what fields we've already seen, so we don't need to hash every time. 782 seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id 783 for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items} 784 785 # Loop through all annotations. This may be batched. 786 for annotation in annotations: 787 788 # Keep track of what dataset generated this annotation 789 annotation["from_dataset"] = self.dataset.key 790 # Set the author to this processor's name 791 if not annotation.get("author"): 792 annotation["author"] = self.name 793 if not annotation.get("author_original"): 794 annotation["author_original"] = self.name 795 annotation["by_processor"] = True 796 797 # Only use a default label if no custom one is given 798 if not annotation.get("label"): 799 annotation["label"] = self.name 800 801 # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet. 802 # We need to do this within this loop because this function may be called in batches and with different 803 # annotation types. 
804 if (annotation["from_dataset"], annotation["label"]) not in seen_fields: 805 # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key. 806 # This should create unique fields, even if there's multiple annotation types for one processor. 807 field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"]) 808 seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id 809 annotation_fields[field_id] = { 810 "label": annotation["label"], 811 "type": annotation["type"] if annotation.get("type") else "text", 812 "from_dataset": annotation["from_dataset"], 813 "hide_in_explorer": hide_in_explorer 814 } 815 else: 816 # Else just get the field ID 817 field_id = seen_fields[(annotation["from_dataset"], annotation["label"])] 818 819 # Add field ID to the annotation 820 annotation["field_id"] = field_id 821 822 annotations_saved = source_dataset.save_annotations(annotations) 823 source_dataset.save_annotation_fields(annotation_fields) 824 825 return annotations_saved 826 827 @classmethod 828 def map_item_method_available(cls, dataset): 829 """ 830 Check if this processor can use map_item 831 832 Checks if map_item method exists and is compatible with dataset. If 833 dataset has a different extension than the default for this processor, 834 or if the dataset has no extension, this means we cannot be sure the 835 data is in the right format to be mapped, so `False` is returned in 836 that case even if a map_item() method is available. 837 838 :param BasicProcessor processor: The BasicProcessor subclass object 839 with which to use map_item 840 :param DataSet dataset: The DataSet object with which to 841 use map_item 842 """ 843 # only run item mapper if extension of processor == extension of 844 # data file, for the scenario where a csv file was uploaded and 845 # converted to an ndjson-based data source, for example 846 # todo: this is kind of ugly, and a better fix may be possible 847 dataset_extension = dataset.get_extension() 848 if not dataset_extension: 849 # DataSet results file does not exist or has no extension, use expected extension 850 if hasattr(dataset, "extension"): 851 dataset_extension = dataset.extension 852 else: 853 # No known DataSet extension; cannot determine if map_item method compatible 854 return False 855 856 return hasattr(cls, "map_item") and cls.extension == dataset_extension 857 858 @classmethod 859 def get_mapped_item(cls, item): 860 """ 861 Get the mapped item using a processors map_item method. 862 863 Ensure map_item method is compatible with a dataset by checking map_item_method_available first. 864 """ 865 try: 866 mapped_item = cls.map_item(item) 867 except (KeyError, IndexError) as e: 868 raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}") 869 870 if not mapped_item: 871 raise MapItemException("Unable to map item!") 872 873 return mapped_item 874 875 @classmethod 876 def is_filter(cls): 877 """ 878 Is this processor a filter? 879 880 Filters do not produce their own dataset but replace the source_dataset dataset 881 instead. 882 883 :todo: Make this a bit more robust than sniffing the processor category 884 :return bool: 885 """ 886 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower() 887 888 @classmethod 889 def get_options(cls, parent_dataset=None, config=None): 890 """ 891 Get processor options 892 893 This method by default returns the class's "options" attribute, or an 894 empty dictionary. 
It can be redefined by processors that need more 895 fine-grained options, e.g. in cases where the availability of options 896 is partially determined by the parent dataset's parameters. 897 898 :param config: 899 :param DataSet parent_dataset: An object representing the dataset that 900 the processor would be run on 901 """ 902 903 return cls.options if hasattr(cls, "options") else {} 904 905 @classmethod 906 def get_status(cls): 907 """ 908 Get processor status 909 910 :return list: Statuses of this processor 911 """ 912 return cls.status if hasattr(cls, "status") else None 913 914 @classmethod 915 def is_top_dataset(cls): 916 """ 917 Confirm this is *not* a top dataset, but a processor. 918 919 Used for processor compatibility checks. 920 921 :return bool: Always `False`, because this is a processor. 922 """ 923 return False 924 925 @classmethod 926 def is_from_collector(cls): 927 """ 928 Check if this processor is one that collects data, i.e. a search or 929 import worker. 930 931 :return bool: 932 """ 933 return cls.type.endswith("-search") or cls.type.endswith("-import") 934 935 @classmethod 936 def get_extension(self, parent_dataset=None): 937 """ 938 Return the extension of the processor's dataset 939 940 Used for processor compatibility checks. 941 942 :param DataSet parent_dataset: An object representing the dataset that 943 the processor would be run on 944 :return str|None: Dataset extension (without leading `.`) or `None`. 945 """ 946 if self.is_filter(): 947 if parent_dataset is not None: 948 # Filters should use the same extension as the parent dataset 949 return parent_dataset.get_extension() 950 else: 951 # No dataset provided, unable to determine extension of parent dataset 952 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 953 return None 954 elif self.extension: 955 # Use explicitly defined extension in class (Processor class defaults to "csv") 956 return self.extension 957 else: 958 # A non filter processor updated the base Processor extension to None/False? 959 return None 960 961 @classmethod 962 def is_rankable(cls, multiple_items=True): 963 """ 964 Used for processor compatibility 965 966 :param bool multiple_items: Consider datasets with multiple items per 967 item (e.g. word_1, word_2, etc)? Included for compatibility 968 """ 969 return False 970 971 @classmethod 972 def exclude_followup_processors(cls, processor_type=None): 973 """ 974 Used for processor compatibility 975 976 To be defined by the child processor if it should exclude certain follow-up processors. 977 e.g.: 978 979 def exclude_followup_processors(cls, processor_type): 980 if processor_type in ["undesirable-followup-processor"]: 981 return True 982 return False 983 984 :param str processor_type: Processor type to exclude 985 :return bool: True if processor should be excluded, False otherwise 986 """ 987 return False 988 989 @abc.abstractmethod 990 def process(self): 991 """ 992 Process data 993 994 To be defined by the child processor. 995 """ 996 pass 997 998 @staticmethod 999 def is_4cat_processor(): 1000 """ 1001 Is this a 4CAT processor? 1002 1003 This is used to determine whether a class is a 4CAT 1004 processor. 1005 1006 :return: True 1007 """ 1008 return True
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can define an is_compatible_with(module=None, config=None) -> bool class method, which takes a dataset as argument and returns a bool that determines if this processor is considered compatible with that dataset. For example:
@classmethod
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"
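To make the pieces documented on this page concrete, here is a minimal sketch of a processor subclass. It is illustrative only: the type, category and title values are invented placeholders, and iterate_items() on the source dataset is assumed to behave as it does in other 4CAT processors.

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import ProcessorInterruptedException


class ExampleCounter(BasicProcessor):
    """
    Hypothetical processor that counts the items in its parent dataset
    """
    type = "example-counter"  # placeholder processor/job type
    category = "Metrics"      # placeholder category
    title = "Example counter"
    description = "Counts the items in the parent dataset."
    extension = "csv"

    def process(self):
        # self.source_dataset is the dataset being processed; checking
        # self.interrupted allows the worker to abort gracefully
        count = 0
        for item in self.source_dataset.iterate_items(self):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while counting items")
            count += 1

        # write a one-row CSV result and mark the new dataset as finished
        self.write_csv_items_and_finish([{"items": count}])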
93 def work(self): 94 """ 95 Process a dataset 96 97 Loads dataset metadata, sets up the scaffolding for performing some kind 98 of processing on that dataset, and then processes it. Afterwards, clean 99 up. 100 """ 101 try: 102 # a dataset can have multiple owners, but the creator is the user 103 # that actually queued the processor, so their config is relevant 104 self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules) 105 self.owner = self.dataset.creator 106 except DataSetException: 107 # query has been deleted in the meantime. finish without error, 108 # as deleting it will have been a conscious choice by a user 109 self.job.finish() 110 return 111 112 # set up config reader wrapping the worker's config manager, which is 113 # in turn the one passed to it by the WorkerManager, which is the one 114 # originally loaded in bootstrap 115 self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner)) 116 117 if self.dataset.data.get("key_parent", None): 118 # search workers never have parents (for now), so we don't need to 119 # find out what the source_dataset dataset is if it's a search worker 120 try: 121 self.source_dataset = self.dataset.get_parent() 122 123 # for presets, transparently use the *top* dataset as a source_dataset 124 # since that is where any underlying processors should get 125 # their data from. However, this should only be done as long as the 126 # preset is not finished yet, because after that there may be processors 127 # that run on the final preset result 128 while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished(): 129 self.is_running_in_preset = True 130 self.source_dataset = self.source_dataset.get_parent() 131 if self.source_dataset is None: 132 # this means there is no dataset that is *not* a preset anywhere 133 # above this dataset. 
This should never occur, but if it does, we 134 # cannot continue 135 self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset", 136 (self.type, self.dataset.key)) 137 self.job.finish() 138 return 139 140 except DataSetException: 141 # we need to know what the source_dataset dataset was to properly handle the 142 # analysis 143 self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % ( 144 self.type, self.dataset.key)) 145 self.job.finish() 146 return 147 148 if not self.source_dataset.is_finished() and not self.is_running_in_preset: 149 # not finished yet - retry after a while 150 # exception for presets, since these *should* be unfinished 151 # until underlying processors are done 152 self.job.release(delay=30) 153 return 154 155 self.source_file = self.source_dataset.get_results_path() 156 if not self.source_file.exists(): 157 self.dataset.update_status("Finished, no input data found.") 158 159 self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"])) 160 161 processor_name = self.title if hasattr(self, "title") else self.type 162 self.dataset.clear_log() 163 self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key)) 164 165 # start log file 166 self.dataset.update_status("Processing data") 167 self.dataset.update_version(get_software_commit(self)) 168 169 # get parameters 170 # if possible, fill defaults where parameters are not provided 171 given_parameters = self.dataset.parameters.copy() 172 all_parameters = self.get_options(self.dataset, config=self.config) 173 self.parameters = { 174 param: given_parameters.get(param, all_parameters.get(param, {}).get("default")) 175 for param in [*all_parameters.keys(), *given_parameters.keys()] 176 } 177 178 # now the parameters have been loaded into memory, clear any sensitive 179 # ones. 
This has a side-effect that a processor may not run again 180 # without starting from scratch, but this is the price of progress 181 options = self.get_options(self.dataset.get_parent(), config=self.config) 182 for option, option_settings in options.items(): 183 if option_settings.get("sensitive"): 184 self.dataset.delete_parameter(option) 185 186 if self.interrupted: 187 self.dataset.log("Processing interrupted, trying again later") 188 return self.abort() 189 190 if not self.dataset.is_finished(): 191 try: 192 self.process() 193 self.after_process() 194 except WorkerInterruptedException as e: 195 self.dataset.log("Processing interrupted (%s), trying again later" % str(e)) 196 self.abort() 197 except Exception as e: 198 self.dataset.log("Processor crashed (%s), trying again later" % str(e)) 199 stack = traceback.extract_tb(e.__traceback__) 200 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]] 201 location = "->".join(frames) 202 203 # Not all datasets have source_dataset keys 204 if len(self.dataset.get_genealogy()) > 1: 205 parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")" 206 else: 207 parent_key = "" 208 209 print("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % ( 210 self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e))) 211 # remove any result files that have been created so far 212 self.remove_files() 213 214 raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % ( 215 self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack) 216 else: 217 # dataset already finished, job shouldn't be open anymore 218 self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % ( 219 self.job.data["jobtype"], self.job.data["remote_id"])) 220 self.job.finish()
Process a dataset
Loads dataset metadata, sets up the scaffolding for performing some kind of processing on that dataset, and then processes it. Afterwards, clean up.
222 def after_process(self): 223 """ 224 Run after processing the dataset 225 226 This method cleans up temporary files, and if needed, handles logistics 227 concerning the result file, e.g. running a pre-defined processor on the 228 result, copying it to another dataset, and so on. 229 """ 230 if self.dataset.data["num_rows"] > 0: 231 self.dataset.update_status("Dataset completed.") 232 233 if not self.dataset.is_finished(): 234 self.dataset.finish() 235 236 self.dataset.remove_staging_areas() 237 238 # see if we have anything else lined up to run next 239 for next in self.parameters.get("next", []): 240 can_run_next = True 241 next_parameters = next.get("parameters", {}) 242 next_type = next.get("type", "") 243 try: 244 available_processors = self.dataset.get_available_processors(config=self.config) 245 except ValueError: 246 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 247 break 248 259 260 261 # run it only if the post-processor is actually available for this query 262 if self.dataset.data["num_rows"] <= 0: 263 can_run_next = False 264 self.log.info( 265 "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % ( 266 next_type, self.dataset.key)) 267 268 elif next_type in available_processors: 269 next_analysis = DataSet( 270 parameters=next_parameters, 271 type=next_type, 272 db=self.db, 273 parent=self.dataset.key, 274 extension=available_processors[next_type].extension, 275 is_private=self.dataset.is_private, 276 owner=self.dataset.creator, 277 modules=self.modules 278 ) 279 # copy ownership from parent dataset 280 next_analysis.copy_ownership_from(self.dataset) 281 # add to queue 282 self.queue.add_job(next_type, remote_id=next_analysis.key) 283 else: 284 can_run_next = False 285 self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % ( 286 self.dataset.key, self.type, next_type)) 287 288 if not can_run_next: 289 # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent 290 # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed 291 if "attach_to" in self.parameters: 292 # Probably should not happen, but for some reason a mid processor has been designated as the processor 293 # the parent should attach to 294 pass 295 else: 296 # Check for "attach_to" parameter in descendents 297 while True: 298 if "attach_to" in next_parameters: 299 self.parameters["attach_to"] = next_parameters["attach_to"] 300 break 301 else: 302 if "next" in next_parameters: 303 next_parameters = next_parameters["next"][0]["parameters"] 304 else: 305 # No more descendents 306 # Should not happen; we cannot find the source dataset 307 self.log.warning( 308 "Cannot find preset's source dataset for dataset %s" % self.dataset.key) 309 break 310 311 # see if we need to register the result somewhere 312 if "copy_to" in self.parameters: 313 # copy the results to an arbitrary place that was passed 314 if self.dataset.get_results_path().exists(): 315
shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"]) 316 else: 317 # if copy_to was passed, that means it's important that this 318 # file exists somewhere, so we create it as an empty file 319 with open(self.parameters["copy_to"], "w") as empty_file: 320 empty_file.write("") 321 322 # see if this query chain is to be attached to another query 323 # if so, the full genealogy of this query (minus the original dataset) 324 # is attached to the given query - this is mostly useful for presets, 325 # where a chain of processors can be marked as 'underlying' a preset 326 if "attach_to" in self.parameters: 327 try: 328 # copy metadata and results to the surrogate 329 surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules) 330 331 if self.dataset.get_results_path().exists(): 332 # Update the surrogate's results file suffix to match this dataset's suffix 333 surrogate.data["result_file"] = surrogate.get_results_path().with_suffix( 334 self.dataset.get_results_path().suffix) 335 shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path())) 336 337 try: 338 surrogate.finish(self.dataset.data["num_rows"]) 339 except RuntimeError: 340 # already finished, could happen (though it shouldn't) 341 pass 342 343 surrogate.update_status(self.dataset.get_status()) 344 345 except ValueError: 346 # dataset with key to attach to doesn't exist... 347 self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % ( 348 self.dataset.key, self.parameters["attach_to"])) 349 350 self.job.finish() 351 352 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 353 owner = self.dataset.get_parameters().get("email-complete", False) 354 # Check that username is email address 355 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 356 from email.mime.multipart import MIMEMultipart 357 from email.mime.text import MIMEText 358 from smtplib import SMTPException 359 import socket 360 import html2text 361 372 self.log.debug("Sending email to %s" % owner) 373 dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key 374 sender = self.config.get('mail.noreply') 375 message = MIMEMultipart("alternative") 376 message["From"] = sender 377 message["To"] = owner 378 message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label()) 379 mail = """ 380 <p>Hello %s,</p> 381 <p>4CAT has finished collecting your %s dataset labeled: %s</p> 382 <p>You can view your dataset via the following link:</p> 383 <p><a href="%s">%s</a></p> 384 <p>Sincerely,</p> 385 <p>Your friendly neighborhood 4CAT admin</p> 386 """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url) 387 html_parser = html2text.HTML2Text() 388 message.attach(MIMEText(html_parser.handle(mail), "plain")) 389 message.attach(MIMEText(mail, "html")) 390 try: 391 send_email([owner], message, self.config) 392 except (SMTPException, ConnectionRefusedError,
socket.timeout): 393 self.log.error("Error sending email to %s" % owner)
Run after processing the dataset
This method cleans up temporary files, and if needed, handles logistics concerning the result file, e.g. running a pre-defined processor on the result, copying it to another dataset, and so on.
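For illustration, a follow-up analysis is queued by listing it under the "next" key of the dataset parameters; the structure below mirrors the keys read in the code above ("type" and "parameters"), while the concrete processor type and its option are placeholders.

# hypothetical dataset parameters that queue one follow-up processor
parameters = {
    "next": [
        {
            "type": "example-follow-up",          # placeholder processor type
            "parameters": {"some-option": True}   # placeholder options for that processor
        }
    ]
}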
395 def remove_files(self): 396 """ 397 Clean up result files and any staging files for processor to be attempted 398 later if desired. 399 """ 400 # Remove the results file that was created 401 if self.dataset.get_results_path().exists(): 402 self.dataset.get_results_path().unlink() 403 if self.dataset.get_results_folder_path().exists(): 404 shutil.rmtree(self.dataset.get_results_folder_path()) 405 406 # Remove any staging areas with temporary data 407 self.dataset.remove_staging_areas()
Clean up result files and any staging files so the processor can be attempted again later if desired.
409 def abort(self): 410 """ 411 Abort dataset creation and clean up so it may be attempted again later 412 """ 413 414 # delete annotations that have been generated as part of this processor 415 self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True) 416 # remove any result files that have been created so far 417 self.remove_files() 418 419 # we release instead of finish, since interrupting is just that - the 420 # job should resume at a later point. Delay resuming by 10 seconds to 421 # give 4CAT the time to do whatever it wants (though usually this isn't 422 # needed since restarting also stops the spawning of new workers) 423 if self.interrupted == self.INTERRUPT_RETRY: 424 # retry later - wait at least 10 seconds to give the backend time to shut down 425 self.job.release(delay=10) 426 elif self.interrupted == self.INTERRUPT_CANCEL: 427 # cancel job 428 self.job.finish()
Abort dataset creation and clean up so it may be attempted again later
430 def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs): 431 """ 432 Request an iterable of URLs and return results 433 434 This method takes an iterable yielding URLs and yields the result for 435 a GET request for that URL in return. This is done through the worker 436 manager's DelegatedRequestHandler, which can run multiple requests in 437 parallel and divide them over the proxies configured in 4CAT (if any). 438 Proxy cooloff and queueing is shared with other processors, so that a 439 processor will never accidentally request from the same site as another 440 processor, potentially triggering rate limits. 441 442 :param urls: Something that can be iterated over and yields URLs 443 :param kwargs: Other keyword arguments are passed on to `add_urls` 444 and eventually to `requests.get()`. 445 :param bool preserve_order: Return items in the original order. Use 446 `False` to potentially speed up processing, if order is not important. 447 :return: A generator yielding request results, i.e. tuples of a 448 URL and a `requests` response objects 449 """ 450 queue_name = self._proxy_queue_name() 451 delegator = self.manager.proxy_delegator 452 453 delegator.refresh_settings(self.config) 454 455 # 50 is an arbitrary batch size - but we top up every 0.05s, so 456 # that should be sufficient 457 batch_size = 50 458 459 # we need an iterable, so we can use next() and StopIteration 460 urls = iter(urls) 461 462 have_urls = True 463 while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls: 464 if queue_length < batch_size and have_urls: 465 batch = [] 466 while len(batch) < (batch_size - queue_length): 467 try: 468 batch.append(next(urls)) 469 except StopIteration: 470 have_urls = False 471 break 472 473 delegator.add_urls(batch, queue_name, **kwargs) 474 475 time.sleep(0.05) # arbitrary... 476 for url, result in delegator.get_results(queue_name, preserve_order=preserve_order): 477 # result may also be a FailedProxiedRequest! 478 # up to the processor to decide how to deal with it 479 yield url, result
Request an iterable of URLs and return results
This method takes an iterable yielding URLs and yields the result for a GET request for that URL in return. This is done through the worker manager's DelegatedRequestHandler, which can run multiple requests in parallel and divide them over the proxies configured in 4CAT (if any). Proxy cooloff and queueing is shared with other processors, so that a processor will never accidentally request from the same site as another processor, potentially triggering rate limits.
Parameters
- urls: Something that can be iterated over and yields URLs
- kwargs: Other keyword arguments are passed on to add_urls and eventually to requests.get().
- bool preserve_order: Return items in the original order. Use False to potentially speed up processing, if order is not important.
Returns
A generator yielding request results, i.e. tuples of a URL and a requests response object
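As a usage sketch (not the canonical implementation): the URL list below is invented, the timeout keyword is simply an example of a kwarg forwarded to requests.get(), and failed requests are detected by the absence of a status_code attribute, since results may also be FailedProxiedRequest objects as noted in the source above.

# inside a BasicProcessor subclass:
def process(self):
    # hypothetical list of URLs to fetch through the shared proxy queue
    urls = ["https://example.com/page/%i" % page for page in range(1, 11)]

    collected = []
    for url, response in self.iterate_proxied_requests(urls, preserve_order=False, timeout=30):
        # a result may be a FailedProxiedRequest instead of a requests response
        if not hasattr(response, "status_code") or response.status_code != 200:
            self.dataset.log("Request for %s failed, skipping" % url)
            continue
        collected.append({"url": url, "bytes": len(response.content)})

    self.write_csv_items_and_finish(collected)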
481 def push_proxied_request(self, url, position=-1, **kwargs): 482 """ 483 Add a single URL to the proxied requests queue 484 485 :param str url: URL to add 486 :param position: Position to add to queue; can be used to add priority 487 requests, adds to end of queue by default 488 :param kwargs: 489 """ 490 self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs)
Add a single URL to the proxied requests queue
Parameters
- str url: URL to add
- position: Position to add to queue; can be used to add priority requests, adds to end of queue by default
- kwargs: Other keyword arguments, passed on to add_urls
492 def flush_proxied_requests(self): 493 """ 494 Get rid of remaining proxied requests 495 496 Can be used if enough results are available and any remaining ones need 497 to be stopped ASAP and are otherwise unneeded. 498 499 Blocking! 500 """ 501 self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name())
Get rid of remaining proxied requests
Can be used if enough results are available and any remaining ones need to be stopped ASAP and are otherwise unneeded.
Blocking!
513 def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]): 514 """ 515 A generator that iterates through files in an archive 516 517 With every iteration, the processor's 'interrupted' flag is checked, 518 and if set a ProcessorInterruptedException is raised, which by default 519 is caught and subsequently stops execution gracefully. 520 521 Files are temporarily unzipped and deleted after use. 522 523 :param Path path: Path to zip file to read 524 :param Path staging_area: Where to store the files while they're 525 being worked with. If omitted, a temporary folder is created and 526 deleted after use 527 :param bool immediately_delete: Temporary files are removed after yielded; 528 False keeps files until the staging_area is removed (usually during processor 529 cleanup) 530 :param list filename_filter: Whitelist of filenames to iterate. 531 Other files will be ignored. If empty, do not ignore anything. 532 :return: An iterator with a Path item for each file 533 """ 534 535 if not path.exists(): 536 return 537 538 if not staging_area: 539 staging_area = self.dataset.get_staging_area() 540 541 if not staging_area.exists() or not staging_area.is_dir(): 542 raise RuntimeError("Staging area %s is not a valid folder") 543 544 with zipfile.ZipFile(path, "r") as archive_file: 545 archive_contents = sorted(archive_file.namelist()) 546 547 for archived_file in archive_contents: 548 if filename_filter and archived_file not in filename_filter: 549 continue 550 551 info = archive_file.getinfo(archived_file) 552 if info.is_dir(): 553 continue 554 555 if self.interrupted: 556 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 557 558 temp_file = staging_area.joinpath(archived_file) 559 archive_file.extract(archived_file, staging_area) 560 561 yield temp_file 562 if immediately_delete: 563 temp_file.unlink()
A generator that iterates through files in an archive
With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.
Files are temporarily unzipped and deleted after use.
Parameters
- Path path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created and deleted after use
- bool immediately_delete: Temporary files are removed after being yielded; False keeps files until the staging_area is removed (usually during processor cleanup)
- list filename_filter: Whitelist of filenames to iterate. Other files will be ignored. If empty, do not ignore anything.
Returns
An iterator with a Path item for each file
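A brief usage sketch, assuming the parent dataset's result (self.source_file) is a zip archive; the suffix filter and CSV columns are illustrative.

# inside a BasicProcessor subclass:
def process(self):
    # each yielded Path is extracted to a staging area and deleted again once
    # the loop moves on, so read what you need inside the loop body
    rows = []
    for file_path in self.iterate_archive_contents(self.source_file):
        if file_path.suffix.lower() not in (".jpg", ".png"):
            continue
        rows.append({"filename": file_path.name, "size_bytes": file_path.stat().st_size})

    self.write_csv_items_and_finish(rows)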
565 def unpack_archive_contents(self, path, staging_area=None): 566 """ 567 Unpack all files in an archive to a staging area 568 569 With every iteration, the processor's 'interrupted' flag is checked, 570 and if set a ProcessorInterruptedException is raised, which by default 571 is caught and subsequently stops execution gracefully. 572 573 Files are unzipped to a staging area. The staging area is *not* 574 cleaned up automatically. 575 576 :param Path path: Path to zip file to read 577 :param Path staging_area: Where to store the files while they're 578 being worked with. If omitted, a temporary folder is created and 579 deleted after use 580 :param int max_number_files: Maximum number of files to unpack. If None, all files unpacked 581 :return Path: A path to the staging area 582 """ 583 584 if not path.exists(): 585 return 586 587 if not staging_area: 588 staging_area = self.dataset.get_staging_area() 589 590 if not staging_area.exists() or not staging_area.is_dir(): 591 raise RuntimeError("Staging area %s is not a valid folder") 592 593 paths = [] 594 with zipfile.ZipFile(path, "r") as archive_file: 595 archive_contents = sorted(archive_file.namelist()) 596 597 for archived_file in archive_contents: 598 if self.interrupted: 599 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 600 601 file_name = archived_file.split("/")[-1] 602 temp_file = staging_area.joinpath(file_name) 603 archive_file.extract(archived_file, staging_area) 604 paths.append(temp_file) 605 606 return staging_area
Unpack all files in an archive to a staging area
With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.
Files are unzipped to a staging area. The staging area is not cleaned up automatically.
Parameters
- Path path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary staging area is created; as noted above, it is not cleaned up by this method itself.
Returns
A path to the staging area
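A usage sketch: unpack once, then work with the files on disk. It assumes self.source_file is a zip archive; the staging area created here is removed later during cleanup via remove_staging_areas().

# inside a BasicProcessor subclass:
def process(self):
    staging_area = self.unpack_archive_contents(self.source_file)
    if not staging_area:
        # the source archive did not exist, so there is nothing to process
        self.dataset.update_status("No input archive found")
        return self.dataset.finish(0)

    # work with the unpacked files directly on disk
    file_count = len(list(staging_area.glob("*")))
    self.write_csv_items_and_finish([{"files": file_count}])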
608 def extract_archived_file_by_name(self, filename, archive_path, staging_area=None): 609 """ 610 Extract a file from an archive by name 611 612 :param str filename: Name of file to extract 613 :param Path archive_path: Path to zip file to read 614 :param Path staging_area: Where to store the files while they're 615 being worked with. If omitted, a temporary folder is created 616 :return Path: A path to the extracted file 617 """ 618 if not archive_path.exists(): 619 return 620 621 if not staging_area: 622 staging_area = self.dataset.get_staging_area() 623 624 if not staging_area.exists() or not staging_area.is_dir(): 625 raise RuntimeError("Staging area %s is not a valid folder") 626 627 with zipfile.ZipFile(archive_path, "r") as archive_file: 628 if filename not in archive_file.namelist(): 629 raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path)) 630 else: 631 archive_file.extract(filename, staging_area) 632 return staging_area.joinpath(filename)
Extract a file from an archive by name
Parameters
- str filename: Name of file to extract
- Path archive_path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created
Returns
A path to the extracted file
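A sketch of pulling one known file out of the parent archive; the file name ".metadata.json" is purely an assumption for illustration.

import json

# inside a BasicProcessor subclass:
def process(self):
    try:
        # raises FileNotFoundError if the file is not in the archive
        metadata_path = self.extract_archived_file_by_name(".metadata.json", self.source_file)
    except FileNotFoundError:
        metadata_path = None

    metadata = {}
    if metadata_path:
        with metadata_path.open(encoding="utf-8") as infile:
            metadata = json.load(infile)
    # ... remaining processing would use `metadata` here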
634 def write_csv_items_and_finish(self, data): 635 """ 636 Write data as csv to results file and finish dataset 637 638 Determines result file path using dataset's path determination helper 639 methods. After writing results, the dataset is marked finished. Will 640 raise a ProcessorInterruptedException if the interrupted flag for this 641 processor is set while iterating. 642 643 :param data: A list or tuple of dictionaries, all with the same keys 644 """ 645 if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str): 646 raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data)) 647 648 if not data: 649 raise ValueError("write_csv_items requires a dictionary with at least one item") 650 651 self.dataset.update_status("Writing results file") 652 writer = False 653 with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results: 654 for row in data: 655 if self.interrupted: 656 raise ProcessorInterruptedException("Interrupted while writing results file") 657 658 row = remove_nuls(row) 659 if not writer: 660 writer = csv.DictWriter(results, fieldnames=row.keys()) 661 writer.writeheader() 662 663 writer.writerow(row) 664 665 self.dataset.update_status("Finished") 666 self.dataset.finish(len(data))
Write data as csv to results file and finish dataset
Determines result file path using dataset's path determination helper methods. After writing results, the dataset is marked finished. Will raise a ProcessorInterruptedException if the interrupted flag for this processor is set while iterating.
Parameters
- data: A list or tuple of dictionaries, all with the same keys
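A usage sketch, assuming items from the source dataset behave like dictionaries with id and body fields (both assumptions); every row shares the same keys, because the first row determines the CSV header.

# inside a BasicProcessor subclass:
def process(self):
    rows = []
    for item in self.source_dataset.iterate_items(self):
        rows.append({
            "id": item.get("id", ""),
            "word_count": len(str(item.get("body", "")).split())
        })

    # writes the CSV to the dataset's results path and marks it finished
    self.write_csv_items_and_finish(rows)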
668 def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True): 669 """ 670 Archive a bunch of files into a zip archive and finish processing 671 672 :param list|Path files: If a list, all files will be added to the 673 archive and deleted afterwards. If a folder, all files in the folder 674 will be added and the folder will be deleted afterwards. 675 :param int num_items: Items in the dataset. If None, the amount of 676 files added to the archive will be used. 677 :param int compression: Type of compression to use. By default, files 678 are not compressed, to speed up unarchiving. 679 :param bool finish: Finish the dataset/job afterwards or not? 680 """ 681 is_folder = False 682 if issubclass(type(files), PurePath): 683 is_folder = files 684 if not files.exists() or not files.is_dir(): 685 raise RuntimeError("Folder %s is not a folder that can be archived" % files) 686 687 files = files.glob("*") 688 689 # create zip of archive and delete temporary files and folder 690 self.dataset.update_status("Compressing results into archive") 691 done = 0 692 with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip: 693 for output_path in files: 694 zip.write(output_path, output_path.name) 695 output_path.unlink() 696 done += 1 697 698 # delete temporary folder 699 if is_folder: 700 shutil.rmtree(is_folder) 701 702 self.dataset.update_status("Finished") 703 if num_items is None: 704 num_items = done 705 706 if finish: 707 self.dataset.finish(num_items)
Archive a bunch of files into a zip archive and finish processing
Parameters
- list|Path files: If a list, all files will be added to the archive and deleted afterwards. If a folder, all files in the folder will be added and the folder will be deleted afterwards.
- int num_items: Items in the dataset. If None, the amount of files added to the archive will be used.
- int compression: Type of compression to use. By default, files are not compressed, to speed up unarchiving.
- bool finish: Finish the dataset/job afterwards or not?
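A usage sketch that writes one file per item into a staging area and then archives the whole folder; passing the folder means it is deleted after archiving. The file names and the body field are illustrative.

# inside a BasicProcessor subclass:
def process(self):
    staging_area = self.dataset.get_staging_area()

    done = 0
    for item in self.source_dataset.iterate_items(self):
        output = staging_area.joinpath("item-%i.txt" % done)  # illustrative file name
        output.write_text(str(item.get("body", "")), encoding="utf-8")
        done += 1

    self.write_archive_and_finish(staging_area, num_items=done)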
709 def create_standalone(self): 710 """ 711 Copy this dataset and make that copy standalone 712 713 This has the benefit of allowing for all analyses that can be run on 714 full datasets on the new, filtered copy as well. 715 716 :return DataSet: The new standalone dataset 717 """ 718 top_parent = self.source_dataset 719 720 finished = self.dataset.check_dataset_finished() 721 if finished == 'empty': 722 # No data to process, so we can't create a standalone dataset 723 return 724 elif finished is None: 725 # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now 726 pass 727 728 standalone = self.dataset.copy(shallow=False) 729 standalone.body_match = "(Filtered) " + top_parent.query 730 standalone.datasource = top_parent.parameters.get("datasource", "custom") 731 732 try: 733 standalone.board = top_parent.board 734 except AttributeError: 735 standalone.board = self.type 736 737 standalone.type = top_parent.type 738 739 standalone.detach() 740 standalone.delete_parameter("key_parent") 741 742 self.dataset.copied_to = standalone.key 743 744 # we don't need this file anymore - it has been copied to the new 745 # standalone dataset, and this one is not accessible via the interface 746 # except as a link to the copied standalone dataset 747 os.unlink(self.dataset.get_results_path()) 748 749 # Copy the log 750 shutil.copy(self.dataset.get_log_path(), standalone.get_log_path()) 751 752 return standalone
Copy this dataset and make that copy standalone
This has the benefit that all analyses which can be run on full datasets can also be run on the new, filtered copy.
Returns
The new standalone dataset
754 def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int: 755 """ 756 Saves annotations made by this processor on the basis of another dataset. 757 Also adds some data regarding this processor: set `author` and `label` to processor name, 758 and add parameters to `metadata` (unless explicitly indicated). 759 760 :param annotations: List of dictionaries with annotation items. Must have `item_id` and `value`. 761 E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}] 762 :param source_dataset: The dataset that these annotations will be saved on. If None, will use the 763 top parent. 764 :param bool hide_in_explorer: Whether this annotation is included in the Explorer. 'Hidden' annotations 765 are still shown in `iterate_items()`). 766 767 :returns int: How many annotations were saved. 768 769 """ 770 771 if not annotations: 772 return 0 773 774 # Default to parent dataset 775 if not source_dataset: 776 source_dataset = self.source_dataset.top_parent() 777 778 # Check if this dataset already has annotation fields, and if so, store some values to use per annotation. 779 annotation_fields = source_dataset.annotation_fields 780 781 # Keep track of what fields we've already seen, so we don't need to hash every time. 782 seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id 783 for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items} 784 785 # Loop through all annotations. This may be batched. 786 for annotation in annotations: 787 788 # Keep track of what dataset generated this annotation 789 annotation["from_dataset"] = self.dataset.key 790 # Set the author to this processor's name 791 if not annotation.get("author"): 792 annotation["author"] = self.name 793 if not annotation.get("author_original"): 794 annotation["author_original"] = self.name 795 annotation["by_processor"] = True 796 797 # Only use a default label if no custom one is given 798 if not annotation.get("label"): 799 annotation["label"] = self.name 800 801 # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet. 802 # We need to do this within this loop because this function may be called in batches and with different 803 # annotation types. 804 if (annotation["from_dataset"], annotation["label"]) not in seen_fields: 805 # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key. 806 # This should create unique fields, even if there's multiple annotation types for one processor. 807 field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"]) 808 seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id 809 annotation_fields[field_id] = { 810 "label": annotation["label"], 811 "type": annotation["type"] if annotation.get("type") else "text", 812 "from_dataset": annotation["from_dataset"], 813 "hide_in_explorer": hide_in_explorer 814 } 815 else: 816 # Else just get the field ID 817 field_id = seen_fields[(annotation["from_dataset"], annotation["label"])] 818 819 # Add field ID to the annotation 820 annotation["field_id"] = field_id 821 822 annotations_saved = source_dataset.save_annotations(annotations) 823 source_dataset.save_annotation_fields(annotation_fields) 824 825 return annotations_saved
Saves annotations made by this processor on the basis of another dataset.
Also adds some data regarding this processor: sets author and label to the processor name, and adds parameters to metadata (unless explicitly indicated).
Parameters
- annotations: List of dictionaries with annotation items. Must have item_id and value. E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
- source_dataset: The dataset that these annotations will be saved on. If None, will use the top parent.
- bool hide_in_explorer: Whether this annotation is included in the Explorer. 'Hidden' annotations are still shown in iterate_items().
Returns
How many annotations were saved.
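A usage sketch: the label and value below are invented, and item_id is assumed to match the id field of items in the annotated dataset.

# inside a BasicProcessor subclass:
def process(self):
    annotations = []
    for item in self.source_dataset.iterate_items(self):
        annotations.append({
            "item_id": item.get("id"),
            "label": "Sentiment",   # illustrative label
            "value": "positive"     # illustrative value
        })

    saved = self.save_annotations(annotations)
    self.dataset.update_status("Saved %i annotations" % saved)
    self.dataset.finish(saved)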
827 @classmethod 828 def map_item_method_available(cls, dataset): 829 """ 830 Check if this processor can use map_item 831 832 Checks if map_item method exists and is compatible with dataset. If 833 dataset has a different extension than the default for this processor, 834 or if the dataset has no extension, this means we cannot be sure the 835 data is in the right format to be mapped, so `False` is returned in 836 that case even if a map_item() method is available. 837 838 :param BasicProcessor processor: The BasicProcessor subclass object 839 with which to use map_item 840 :param DataSet dataset: The DataSet object with which to 841 use map_item 842 """ 843 # only run item mapper if extension of processor == extension of 844 # data file, for the scenario where a csv file was uploaded and 845 # converted to an ndjson-based data source, for example 846 # todo: this is kind of ugly, and a better fix may be possible 847 dataset_extension = dataset.get_extension() 848 if not dataset_extension: 849 # DataSet results file does not exist or has no extension, use expected extension 850 if hasattr(dataset, "extension"): 851 dataset_extension = dataset.extension 852 else: 853 # No known DataSet extension; cannot determine if map_item method compatible 854 return False 855 856 return hasattr(cls, "map_item") and cls.extension == dataset_extension
Check if this processor can use map_item
Checks if a map_item method exists and is compatible with the dataset. If the dataset has a different extension than the default for this processor, or if the dataset has no extension, this means we cannot be sure the data is in the right format to be mapped, so False is returned in that case even if a map_item() method is available.
Parameters
- BasicProcessor processor: The BasicProcessor subclass object with which to use map_item
- DataSet dataset: The DataSet object with which to use map_item
858 @classmethod 859 def get_mapped_item(cls, item): 860 """ 861 Get the mapped item using a processors map_item method. 862 863 Ensure map_item method is compatible with a dataset by checking map_item_method_available first. 864 """ 865 try: 866 mapped_item = cls.map_item(item) 867 except (KeyError, IndexError) as e: 868 raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}") 869 870 if not mapped_item: 871 raise MapItemException("Unable to map item!") 872 873 return mapped_item
Get the mapped item using a processor's map_item method.
Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
875 @classmethod 876 def is_filter(cls): 877 """ 878 Is this processor a filter? 879 880 Filters do not produce their own dataset but replace the source_dataset dataset 881 instead. 882 883 :todo: Make this a bit more robust than sniffing the processor category 884 :return bool: 885 """ 886 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
Is this processor a filter?
Filters do not produce their own dataset but replace the source_dataset dataset instead.
TODO: Make this a bit more robust than sniffing the processor category
Returns
True if this processor is a filter, False otherwise.
888 @classmethod 889 def get_options(cls, parent_dataset=None, config=None): 890 """ 891 Get processor options 892 893 This method by default returns the class's "options" attribute, or an 894 empty dictionary. It can be redefined by processors that need more 895 fine-grained options, e.g. in cases where the availability of options 896 is partially determined by the parent dataset's parameters. 897 898 :param config: 899 :param DataSet parent_dataset: An object representing the dataset that 900 the processor would be run on 901 """ 902 903 return cls.options if hasattr(cls, "options") else {}
Get processor options
This method by default returns the class's "options" attribute, or an empty dictionary. It can be redefined by processors that need more fine-grained options, e.g. in cases where the availability of options is partially determined by the parent dataset's parameters.
Parameters
- config: 4CAT configuration reader, with settings from the perspective of the dataset owner
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
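A sketch of a redefined get_options that only exposes an extra option when the parent dataset is large enough. The "default" key matches what work() reads when filling in parameter defaults; the option name and the "help" key are illustrative.

# inside a BasicProcessor subclass:
@classmethod
def get_options(cls, parent_dataset=None, config=None):
    # start from the statically defined options, if any
    options = dict(cls.options) if hasattr(cls, "options") else {}

    # only offer an extra (illustrative) option for larger parent datasets
    if parent_dataset and parent_dataset.data.get("num_rows", 0) > 1000:
        options["sample-size"] = {
            "default": 100,
            "help": "Number of items to sample"
        }

    return options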
905 @classmethod 906 def get_status(cls): 907 """ 908 Get processor status 909 910 :return list: Statuses of this processor 911 """ 912 return cls.status if hasattr(cls, "status") else None
Get processor status
Returns
Statuses of this processor
914 @classmethod 915 def is_top_dataset(cls): 916 """ 917 Confirm this is *not* a top dataset, but a processor. 918 919 Used for processor compatibility checks. 920 921 :return bool: Always `False`, because this is a processor. 922 """ 923 return False
Confirm this is not a top dataset, but a processor.
Used for processor compatibility checks.
Returns
Always False, because this is a processor.
925 @classmethod 926 def is_from_collector(cls): 927 """ 928 Check if this processor is one that collects data, i.e. a search or 929 import worker. 930 931 :return bool: 932 """ 933 return cls.type.endswith("-search") or cls.type.endswith("-import")
Check if this processor is one that collects data, i.e. a search or import worker.
Returns
True if this processor collects data (i.e. is a search or import worker), False otherwise.
935 @classmethod 936 def get_extension(self, parent_dataset=None): 937 """ 938 Return the extension of the processor's dataset 939 940 Used for processor compatibility checks. 941 942 :param DataSet parent_dataset: An object representing the dataset that 943 the processor would be run on 944 :return str|None: Dataset extension (without leading `.`) or `None`. 945 """ 946 if self.is_filter(): 947 if parent_dataset is not None: 948 # Filters should use the same extension as the parent dataset 949 return parent_dataset.get_extension() 950 else: 951 # No dataset provided, unable to determine extension of parent dataset 952 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 953 return None 954 elif self.extension: 955 # Use explicitly defined extension in class (Processor class defaults to "csv") 956 return self.extension 957 else: 958 # A non filter processor updated the base Processor extension to None/False? 959 return None
Return the extension of the processor's dataset
Used for processor compatibility checks.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
Returns
Dataset extension (without leading .) or None.
961 @classmethod 962 def is_rankable(cls, multiple_items=True): 963 """ 964 Used for processor compatibility 965 966 :param bool multiple_items: Consider datasets with multiple items per 967 item (e.g. word_1, word_2, etc)? Included for compatibility 968 """ 969 return False
Used for processor compatibility
Parameters
- bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)? Included for compatibility
971 @classmethod 972 def exclude_followup_processors(cls, processor_type=None): 973 """ 974 Used for processor compatibility 975 976 To be defined by the child processor if it should exclude certain follow-up processors. 977 e.g.: 978 979 def exclude_followup_processors(cls, processor_type): 980 if processor_type in ["undesirable-followup-processor"]: 981 return True 982 return False 983 984 :param str processor_type: Processor type to exclude 985 :return bool: True if processor should be excluded, False otherwise 986 """ 987 return False
Used for processor compatibility
To be defined by the child processor if it should exclude certain follow-up processors. e.g.:
def exclude_followup_processors(cls, processor_type):
    if processor_type in ["undesirable-followup-processor"]:
        return True
    return False
Parameters
- str processor_type: Processor type to exclude
Returns
True if processor should be excluded, False otherwise
989 @abc.abstractmethod 990 def process(self): 991 """ 992 Process data 993 994 To be defined by the child processor. 995 """ 996 pass
Process data
To be defined by the child processor.
998 @staticmethod 999 def is_4cat_processor(): 1000 """ 1001 Is this a 4CAT processor? 1002 1003 This is used to determine whether a class is a 4CAT 1004 processor. 1005 1006 :return: True 1007 """ 1008 return True
Is this a 4CAT processor?
This is used to determine whether a class is a 4CAT processor.
Returns
True