backend.lib.processor
Basic post-processor worker: meant to be inherited by workers that post-process results
1""" 2Basic post-processor worker - should be inherited by workers to post-process results 3""" 4import traceback 5import zipfile 6import typing 7import shutil 8import abc 9import csv 10import os 11import re 12import time 13 14from pathlib import PurePath 15 16from backend.lib.worker import BasicWorker 17from common.lib.dataset import DataSet 18from common.lib.fourcat_module import FourcatModule 19from common.lib.helpers import get_software_commit, remove_nuls, send_email, hash_to_md5 20from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, 21 DataSetException, MapItemException, AnnotationException) 22from common.config_manager import ConfigWrapper 23from common.lib.user import User 24 25csv.field_size_limit(1024 * 1024 * 1024) 26 27 28class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta): 29 """ 30 Abstract processor class 31 32 A processor takes a finished dataset as input and processes its result in 33 some way, with another dataset set as output. The input thus is a file, and 34 the output (usually) as well. In other words, the result of a processor can 35 be used as input for another processor (though whether and when this is 36 useful is another question). 37 38 To determine whether a processor can process a given dataset, you can 39 define a `is_compatible_with(FourcatModule module=None, config=None):) -> bool` class 40 method which takes a dataset as argument and returns a bool that determines 41 if this processor is considered compatible with that dataset. For example: 42 43 .. code-block:: python 44 45 @classmethod 46 def is_compatible_with(cls, module=None, config=None): 47 return module.type == "linguistic-features" 48 49 50 """ 51 52 #: Database handler to interface with the 4CAT database 53 db = None 54 55 #: Job object that requests the execution of this processor 56 job = None 57 58 #: The dataset object that the processor is *creating*. 
59 dataset = None 60 61 #: Owner (username) of the dataset 62 owner = None 63 64 #: The dataset object that the processor is *processing*. 65 source_dataset = None 66 67 #: The file that is being processed 68 source_file = None 69 70 #: Processor description, which will be displayed in the web interface 71 description = "No description available" 72 73 #: Category identifier, used to group processors in the web interface 74 category = "Other" 75 76 #: Extension of the file created by the processor 77 extension = "csv" 78 79 #: 4CAT settings from the perspective of the dataset's owner 80 config = None 81 82 #: Is this processor running 'within' a preset processor? 83 is_running_in_preset = False 84 85 #: Is this processor hidden in the front-end, and only used internally/in presets? 86 is_hidden = False 87 88 #: This will be defined automatically upon loading the processor. There is 89 #: no need to override manually 90 filepath = None 91 92 #: This will be a list; files added to it are deleted after the processor 93 #: terminates, even on failure, if they still exist at that point. Add 94 #: path objects or dataset objects; for datasets, the 95 #: `remove_disposable_files()` method will be called. 96 for_cleanup = None 97 98 def work(self): 99 """ 100 Process a dataset 101 102 Loads dataset metadata, sets up the scaffolding for performing some kind 103 of processing on that dataset, and then processes it. Afterwards, clean 104 up. 105 """ 106 try: 107 # a dataset can have multiple owners, but the creator is the user 108 # that actually queued the processor, so their config is relevant 109 self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules) 110 self.owner = self.dataset.creator 111 except DataSetException: 112 # query has been deleted in the meantime. 
finish without error, 113 # as deleting it will have been a conscious choice by a user 114 self.job.finish() 115 return 116 117 # set up config reader wrapping the worker's config manager, which is 118 # in turn the one passed to it by the WorkerManager, which is the one 119 # originally loaded in bootstrap 120 self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner)) 121 122 if self.dataset.data.get("key_parent", None): 123 # search workers never have parents (for now), so we don't need to 124 # find out what the source_dataset dataset is if it's a search worker 125 try: 126 self.source_dataset = self.dataset.get_parent() 127 128 # for presets, transparently use the *top* dataset as a source_dataset 129 # since that is where any underlying processors should get 130 # their data from. However, this should only be done as long as the 131 # preset is not finished yet, because after that there may be processors 132 # that run on the final preset result 133 while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished(): 134 self.is_running_in_preset = True 135 self.source_dataset = self.source_dataset.get_parent() 136 if self.source_dataset is None: 137 # this means there is no dataset that is *not* a preset anywhere 138 # above this dataset. 
This should never occur, but if it does, we 139 # cannot continue 140 self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset", 141 (self.type, self.dataset.key)) 142 self.job.finish() 143 return 144 145 except DataSetException: 146 # we need to know what the source_dataset dataset was to properly handle the 147 # analysis 148 self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % ( 149 self.type, self.dataset.key)) 150 self.job.finish() 151 return 152 153 if not self.source_dataset.is_finished() and not self.is_running_in_preset: 154 # not finished yet - retry after a while 155 # exception for presets, since these *should* be unfinished 156 # until underlying processors are done 157 self.job.release(delay=30) 158 return 159 160 self.source_file = self.source_dataset.get_results_path() 161 if not self.source_file.exists(): 162 self.dataset.update_status("Finished, no input data found.") 163 164 self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"])) 165 166 processor_name = self.title if hasattr(self, "title") else self.type 167 self.dataset.clear_log() 168 self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key)) 169 170 # start log file 171 self.dataset.update_status("Processing data") 172 self.dataset.update_version(get_software_commit(self)) 173 174 # we may create temporary files with the processor; anything in here 175 # will be deleted after the processor ends (or crashes!). 
dataset 176 # objects will have their cleanup methods called 177 self.for_cleanup = [self.dataset] 178 if self.source_dataset is not None: 179 # Add source dataset to cleanup list to remove disposable files 180 self.for_cleanup.append(self.source_dataset) 181 182 # get parameters 183 # if possible, fill defaults where parameters are not provided 184 given_parameters = self.dataset.parameters.copy() 185 all_parameters = self.get_options(self.dataset, config=self.config) 186 self.parameters = { 187 param: given_parameters.get(param, all_parameters.get(param, {}).get("default")) 188 for param in [*all_parameters.keys(), *given_parameters.keys()] 189 } 190 191 # now the parameters have been loaded into memory, clear any sensitive 192 # ones. This has a side-effect that a processor may not run again 193 # without starting from scratch, but this is the price of progress 194 options = self.get_options(self.dataset.get_parent(), config=self.config) 195 for option, option_settings in options.items(): 196 if option_settings.get("sensitive"): 197 self.dataset.delete_parameter(option) 198 199 if self.interrupted: 200 self.dataset.log("Processing interrupted, trying again later") 201 return self.abort() 202 203 if not self.dataset.is_finished(): 204 try: 205 self.process() 206 self.after_process() 207 208 # processors should usually finish their jobs by themselves, but if 209 # the worker finished without errors, the job can be finished in 210 # any case 211 if not self.job.is_finished: 212 self.job.finish() 213 except WorkerInterruptedException as e: 214 self.dataset.log("Processing interrupted (%s), trying again later" % str(e)) 215 self.abort() 216 except Exception as e: 217 self.dataset.log("Processor crashed (%s), trying again later" % str(e)) 218 stack = traceback.extract_tb(e.__traceback__) 219 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]] 220 location = "->".join(frames) 221 222 # Not all datasets have source_dataset keys 
223 if len(self.dataset.get_genealogy()) > 1: 224 parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")" 225 else: 226 parent_key = "" 227 228 # Clean up partially created datasets/files 229 self.clean_up_on_error() 230 231 raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % ( 232 self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack) 233 234 finally: 235 # clean up files that have been created and marked as disposable 236 for item in self.for_cleanup: 237 if type(item) is DataSet: 238 item.remove_disposable_files() 239 elif item.exists(): 240 shutil.rmtree(item, ignore_errors=True) 241 else: 242 # dataset already finished, job shouldn't be open anymore 243 self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % ( 244 self.job.data["jobtype"], self.job.data["remote_id"])) 245 self.job.finish() 246 247 def after_process(self): 248 """ 249 Run after processing the dataset 250 251 This method cleans up temporary files, and if needed, handles logistics 252 concerning the result file, e.g. running a pre-defined processor on the 253 result, copying it to another dataset, and so on. 
254 """ 255 if self.dataset.data["num_rows"] > 0: 256 self.dataset.update_status("Dataset completed.") 257 258 if not self.dataset.is_finished(): 259 self.dataset.finish() 260 261 # see if we have anything else lined up to run next 262 for next in self.parameters.get("next", []): 263 can_run_next = True 264 next_parameters = next.get("parameters", {}) 265 next_type = next.get("type", "") 266 try: 267 available_processors = self.dataset.get_available_processors(config=self.config) 268 except ValueError: 269 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 270 break 271 272 273 # run it only if the post-processor is actually available for this query 274 if self.dataset.data["num_rows"] <= 0: 275 can_run_next = False 276 self.log.info( 277 "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % ( 278 next_type, self.dataset.key)) 279 280 elif next_type in available_processors: 281 next_analysis = DataSet( 282 parameters=next_parameters, 283 type=next_type, 284 db=self.db, 285 parent=self.dataset.key, 286 extension=available_processors[next_type].extension, 287 is_private=self.dataset.is_private, 288 owner=self.dataset.creator, 289 modules=self.modules 290 ) 291 # copy ownership from parent dataset 292 next_analysis.copy_ownership_from(self.dataset) 293 # add to queue 294 self.queue.add_job(self.modules.processors[next_type], dataset=next_analysis) 295 else: 296 can_run_next = False 297 self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % ( 298 self.dataset.key, self.type, next_type)) 299 300 if not can_run_next: 301 # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent 302 # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed 303 if "attach_to" in self.parameters: 304 # Probably should not happen, but for some reason a mid processor has 
been designated as the processor 305 # the parent should attach to 306 pass 307 else: 308 # Check for "attach_to" parameter in descendents 309 while True: 310 if "attach_to" in next_parameters: 311 self.parameters["attach_to"] = next_parameters["attach_to"] 312 break 313 else: 314 if "next" in next_parameters: 315 next_parameters = next_parameters["next"][0]["parameters"] 316 else: 317 # No more descendents 318 # Should not happen; we cannot find the source dataset 319 self.log.warning( 320 "Cannot find preset's source dataset for dataset %s" % self.dataset.key) 321 break 322 323 # see if we need to register the result somewhere 324 if "copy_to" in self.parameters: 325 # copy the results to an arbitrary place that was passed 326 if self.dataset.get_results_path().exists(): 327 shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"]) 328 else: 329 # if copy_to was passed, that means it's important that this 330 # file exists somewhere, so we create it as an empty file 331 with open(self.parameters["copy_to"], "w") as empty_file: 332 empty_file.write("") 333 334 # see if this query chain is to be attached to another query 335 # if so, the full genealogy of this query (minus the original dataset) 336 # is attached to the given query - this is mostly useful for presets, 337 # where a chain of processors can be marked as 'underlying' a preset 338 if "attach_to" in self.parameters: 339 try: 340 # copy metadata and results to the surrogate 341 surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules) 342 343 if self.dataset.get_results_path().exists(): 344 # Update the surrogate's results file suffix to match this dataset's suffix 345 surrogate.data["result_file"] = surrogate.get_results_path().with_suffix( 346 self.dataset.get_results_path().suffix) 347 shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path())) 348 349 try: 350 surrogate.finish(self.dataset.data["num_rows"]) 351 except 
RuntimeError: 352 # already finished, could happen (though it shouldn't) 353 pass 354 355 surrogate.update_status(self.dataset.get_status()) 356 357 except DataSetException: 358 # dataset with key to attach to doesn't exist... 359 self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have " 360 "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"])) 361 362 self.job.finish() 363 364 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 365 owner = self.dataset.get_parameters().get("email-complete", False) 366 # Check that username is email address 367 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 368 from email.mime.multipart import MIMEMultipart 369 from email.mime.text import MIMEText 370 from smtplib import SMTPException 371 import socket 372 import html2text 373 374 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 375 owner = self.dataset.get_parameters().get("email-complete", False) 376 # Check that username is email address 377 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 378 from email.mime.multipart import MIMEMultipart 379 from email.mime.text import MIMEText 380 from smtplib import SMTPException 381 import socket 382 import html2text 383 384 self.log.debug("Sending email to %s" % owner) 385 dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key 386 sender = self.config.get('mail.noreply') 387 message = MIMEMultipart("alternative") 388 message["From"] = sender 389 message["To"] = owner 390 message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label()) 391 mail = """ 392 <p>Hello %s,</p> 393 <p>4CAT has finished collecting your %s dataset labeled: %s</p> 394 <p>You can view your dataset via the following link:</p> 395 <p><a href="%s">%s</a></p> 396 <p>Sincerely,</p> 
397 <p>Your friendly neighborhood 4CAT admin</p> 398 """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url) 399 html_parser = html2text.HTML2Text() 400 message.attach(MIMEText(html_parser.handle(mail), "plain")) 401 message.attach(MIMEText(mail, "html")) 402 try: 403 send_email([owner], message, self.config) 404 except (SMTPException, ConnectionRefusedError, socket.timeout): 405 self.log.error("Error sending email to %s" % owner) 406 407 def clean_up_on_error(self): 408 try: 409 # ensure proxied requests are stopped 410 self.flush_proxied_requests() 411 # delete annotations that have been generated as part of this processor 412 self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True) 413 # Remove the results file that was created 414 if self.dataset.get_results_path().exists(): 415 self.dataset.get_results_path().unlink() 416 if self.dataset.get_results_folder_path().exists(): 417 shutil.rmtree(self.dataset.get_results_folder_path()) 418 419 except Exception as e: 420 self.log.error("Error during processor cleanup after error: %s" % str(e)) 421 422 def abort(self): 423 """ 424 Abort dataset creation and clean up so it may be attempted again later 425 """ 426 self.clean_up_on_error() 427 428 # we release instead of finish, since interrupting is just that - the 429 # job should resume at a later point. 
Delay resuming by 10 seconds to 430 # give 4CAT the time to do whatever it wants (though usually this isn't 431 # needed since restarting also stops the spawning of new workers) 432 if self.interrupted == self.INTERRUPT_RETRY: 433 # retry later - wait at least 10 seconds to give the backend time to shut down 434 self.job.release(delay=10) 435 elif self.interrupted == self.INTERRUPT_CANCEL: 436 # cancel job 437 self.job.finish() 438 439 def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs): 440 """ 441 Request an iterable of URLs and return results 442 443 This method takes an iterable yielding URLs and yields the result for 444 a GET request for that URL in return. This is done through the worker 445 manager's DelegatedRequestHandler, which can run multiple requests in 446 parallel and divide them over the proxies configured in 4CAT (if any). 447 Proxy cooloff and queueing is shared with other processors, so that a 448 processor will never accidentally request from the same site as another 449 processor, potentially triggering rate limits. 450 451 :param urls: Something that can be iterated over and yields URLs 452 :param kwargs: Other keyword arguments are passed on to `add_urls` 453 and eventually to `requests.get()`. 454 :param bool preserve_order: Return items in the original order. Use 455 `False` to potentially speed up processing, if order is not important. 456 :return: A generator yielding request results, i.e. 
tuples of a 457 URL and a `requests` response objects 458 """ 459 queue_name = self._proxy_queue_name() 460 delegator = self.manager.proxy_delegator 461 462 delegator.refresh_settings(self.config) 463 464 # 50 is an arbitrary batch size - but we top up every 0.05s, so 465 # that should be sufficient 466 batch_size = 50 467 468 # we need an iterable, so we can use next() and StopIteration 469 urls = iter(urls) 470 471 have_urls = True 472 while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls: 473 if queue_length < batch_size and have_urls: 474 batch = [] 475 while len(batch) < (batch_size - queue_length): 476 try: 477 batch.append(next(urls)) 478 except StopIteration: 479 have_urls = False 480 break 481 482 delegator.add_urls(batch, queue_name, **kwargs) 483 484 time.sleep(0.05) # arbitrary... 485 for url, result in delegator.get_results(queue_name, preserve_order=preserve_order): 486 # result may also be a FailedProxiedRequest! 487 # up to the processor to decide how to deal with it 488 yield url, result 489 490 def push_proxied_request(self, url, position=-1, **kwargs): 491 """ 492 Add a single URL to the proxied requests queue 493 494 :param str url: URL to add 495 :param position: Position to add to queue; can be used to add priority 496 requests, adds to end of queue by default 497 :param kwargs: 498 """ 499 self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs) 500 501 def flush_proxied_requests(self): 502 """ 503 Get rid of remaining proxied requests 504 505 Can be used if enough results are available and any remaining ones need 506 to be stopped ASAP and are otherwise unneeded. 507 508 Blocking! 509 """ 510 self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name()) 511 512 def _proxy_queue_name(self): 513 """ 514 Get proxy queue name 515 516 For internal use. 
517 518 :return str: 519 """ 520 return f"{self.type}-{self.dataset.key}" 521 522 def unpack_archive_contents(self, path, staging_area=None): 523 """ 524 Unpack all files in an archive to a staging area 525 526 With every iteration, the processor's 'interrupted' flag is checked, 527 and if set a ProcessorInterruptedException is raised, which by default 528 is caught and subsequently stops execution gracefully. 529 530 Files are unzipped to a staging area. The staging area is *not* 531 cleaned up automatically. 532 533 :param Path path: Path to zip file to read 534 :param Path staging_area: Where to store the files while they're 535 being worked with. If omitted, a temporary folder is created and 536 deleted after use 537 :param int max_number_files: Maximum number of files to unpack. If None, all files unpacked 538 :return Path: A path to the staging area 539 """ 540 541 if not path.exists(): 542 return 543 544 if not staging_area: 545 staging_area = self.dataset.get_staging_area() 546 547 if not staging_area.exists() or not staging_area.is_dir(): 548 raise RuntimeError("Staging area %s is not a valid folder") 549 550 paths = [] 551 with zipfile.ZipFile(path, "r") as archive_file: 552 archive_contents = sorted(archive_file.namelist()) 553 554 for archived_file in archive_contents: 555 if self.interrupted: 556 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 557 558 file_name = archived_file.split("/")[-1] 559 temp_file = staging_area.joinpath(file_name) 560 archive_file.extract(archived_file, staging_area) 561 paths.append(temp_file) 562 563 return staging_area 564 565 def extract_archived_file_by_name(self, filename, archive_path, staging_area=None): 566 """ 567 Extract a file from an archive by name 568 569 :param str filename: Name of file to extract 570 :param Path archive_path: Path to zip file to read 571 :param Path staging_area: Where to store the files while they're 572 being worked with. 
If omitted, a temporary folder is created 573 :return Path: A path to the extracted file 574 """ 575 if not archive_path.exists(): 576 return 577 578 if not staging_area: 579 staging_area = self.dataset.get_staging_area() 580 581 if not staging_area.exists() or not staging_area.is_dir(): 582 raise RuntimeError("Staging area %s is not a valid folder") 583 584 with zipfile.ZipFile(archive_path, "r") as archive_file: 585 if filename not in archive_file.namelist(): 586 raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path)) 587 else: 588 archive_file.extract(filename, staging_area) 589 return staging_area.joinpath(filename) 590 591 def write_csv_items_and_finish(self, data): 592 """ 593 Write data as csv to results file and finish dataset 594 595 Determines result file path using dataset's path determination helper 596 methods. After writing results, the dataset is marked finished. Will 597 raise a ProcessorInterruptedException if the interrupted flag for this 598 processor is set while iterating. 
599 600 :param data: A list or tuple of dictionaries, all with the same keys 601 """ 602 if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str): 603 raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data)) 604 605 if not data: 606 raise ValueError("write_csv_items requires a dictionary with at least one item") 607 608 self.dataset.update_status("Writing results file") 609 writer = False 610 with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results: 611 for row in data: 612 if self.interrupted: 613 raise ProcessorInterruptedException("Interrupted while writing results file") 614 615 row = remove_nuls(row) 616 if not writer: 617 writer = csv.DictWriter(results, fieldnames=row.keys()) 618 writer.writeheader() 619 620 writer.writerow(row) 621 622 self.dataset.update_status("Finished") 623 self.dataset.finish(len(data)) 624 625 def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True): 626 """ 627 Archive a bunch of files into a zip archive and finish processing 628 629 :param list|Path files: If a list, all files will be added to the 630 archive and deleted afterwards. If a folder, all files in the folder 631 will be added and the folder will be deleted afterwards. 632 :param int num_items: Items in the dataset. If None, the amount of 633 files added to the archive will be used. 634 :param int compression: Type of compression to use. By default, files 635 are not compressed, to speed up unarchiving. 636 :param bool finish: Finish the dataset/job afterwards or not? 
637 """ 638 is_folder = False 639 if issubclass(type(files), PurePath): 640 is_folder = files 641 if not files.exists() or not files.is_dir(): 642 raise RuntimeError("Folder %s is not a folder that can be archived" % files) 643 644 files = files.glob("*") 645 646 # create zip of archive and delete temporary files and folder 647 self.dataset.update_status("Compressing results into archive") 648 done = 0 649 with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip: 650 for output_path in files: 651 zip.write(output_path, output_path.name) 652 output_path.unlink() 653 done += 1 654 655 # delete temporary folder 656 if is_folder: 657 shutil.rmtree(is_folder) 658 659 self.dataset.update_status("Finished") 660 if num_items is None: 661 num_items = done 662 663 if finish: 664 self.dataset.finish(num_items) 665 666 def create_standalone(self, item_ids=None): 667 """ 668 Copy this dataset and make that copy standalone. 669 670 This has the benefit of allowing for all analyses that can be run on 671 full datasets on the new, filtered copy as well. 672 673 This also transfers annotations and annotation fields. 674 675 :param list item_ids: The item_ids that are copied-over. Used to check what annotations need to be copied. 
676 677 :return DataSet: The new standalone dataset 678 """ 679 680 top_parent = self.source_dataset 681 682 finished = self.dataset.check_dataset_finished() 683 if finished == 'empty': 684 # No data to process, so we can't create a standalone dataset 685 return 686 elif finished is None: 687 # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now 688 pass 689 690 standalone = self.dataset.copy(shallow=False) 691 standalone.body_match = "(Filtered) " + top_parent.query 692 standalone.datasource = top_parent.parameters.get("datasource", "custom") 693 694 if top_parent.annotation_fields and top_parent.num_annotations() > 0: 695 # Get column names dynamically 696 annotation_cols = self.db.fetchone("SELECT * FROM annotations LIMIT 1") 697 annotation_cols = list(annotation_cols.keys()) 698 annotation_cols.remove("id") # Set by the DB 699 cols_str = ",".join(annotation_cols) 700 701 cols_list = ["a." + col for col in annotation_cols if col != "dataset"] 702 query = f"INSERT INTO annotations ({cols_str}) OVERRIDING USER VALUE " \ 703 f"SELECT %s, {', '.join(cols_list)} " \ 704 f"FROM annotations AS a WHERE a.dataset = %s" 705 706 # Copy over all annotations if no item_ids are given 707 if not item_ids or top_parent.num_rows == standalone.num_rows: 708 self.db.execute(query, replacements=(standalone.key, top_parent.key)) 709 else: 710 query += " AND a.item_id = ANY(%s)" 711 self.db.execute(query, replacements=(standalone.key, top_parent.key, item_ids)) 712 713 # Copy over annotation fields and update annotations with new field IDs 714 if top_parent.annotation_fields: 715 # New field IDs based on the new dataset key 716 annotation_fields = { 717 hash_to_md5(old_field_id + standalone.key): field_values 718 for old_field_id, field_values in top_parent.annotation_fields.items() 719 } 720 standalone.annotation_fields = {} # Reset to insert everything without checking for changes 721 
standalone.save_annotation_fields(annotation_fields) # Save to db 722 723 # Also update field IDs in annotations 724 for i, old_field_id in enumerate(top_parent.annotation_fields.keys()): 725 self.db.update( 726 "annotations", 727 where={"field_id": old_field_id, "dataset": standalone.key}, 728 data={"field_id": hash_to_md5(old_field_id + standalone.key) 729 }) 730 731 try: 732 standalone.board = top_parent.board 733 except AttributeError: 734 standalone.board = self.type 735 736 standalone.type = top_parent.type 737 738 standalone.detach() 739 standalone.delete_parameter("key_parent") 740 741 self.dataset.copied_to = standalone.key 742 743 # we don't need this file anymore - it has been copied to the new 744 # standalone dataset, and this one is not accessible via the interface 745 # except as a link to the copied standalone dataset 746 os.unlink(self.dataset.get_results_path()) 747 748 # Copy the log 749 shutil.copy(self.dataset.get_log_path(), standalone.get_log_path()) 750 751 return standalone 752 753 def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int: 754 """ 755 Saves annotations made by this processor on the basis of another dataset. 756 Also adds some data regarding this processor: set `author` and `label` to processor name, 757 and add parameters to `metadata` (unless explicitly indicated). 758 759 :param annotations: List of dictionaries with annotation items. Must have `item_id` and `value`. 760 E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}] 761 :param source_dataset: The dataset that these annotations will be saved on. If None, will use the 762 top parent. 763 :param bool hide_in_explorer: Whether this annotation is included in the Explorer. 'Hidden' annotations 764 are still shown in `iterate_items()`). 765 766 :returns int: How many annotations were saved. 
767 768 """ 769 770 if not annotations: 771 return 0 772 773 # Default to parent dataset 774 if not source_dataset: 775 source_dataset = self.source_dataset.top_parent() 776 777 # Check if this dataset already has annotation fields, and if so, store some values to use per annotation. 778 annotation_fields = source_dataset.annotation_fields 779 780 # Keep track of what fields we've already seen, so we don't need to hash every time. 781 seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id 782 for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items} 783 784 annotations_saved = 0 785 failed = 0 786 787 # Loop through all annotations. This may be batched. 788 for annotation in annotations: 789 790 # item_id always needs to be present 791 if not annotation.get("item_id"): 792 failed += 1 793 continue 794 795 # Keep track of what dataset generated this annotation 796 annotation["from_dataset"] = self.dataset.key 797 # Set the author to this processor's name 798 if not annotation.get("author"): 799 annotation["author"] = self.name 800 if not annotation.get("author_original"): 801 annotation["author_original"] = self.name 802 annotation["by_processor"] = True 803 804 # Only use a default label if no custom one is given 805 if not annotation.get("label"): 806 annotation["label"] = self.name 807 808 # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet. 809 # We need to do this within this loop because this function may be called in batches and with different 810 # annotation types. 811 if (annotation["from_dataset"], annotation["label"]) not in seen_fields: 812 # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key. 813 # This should create unique fields, even if there's multiple annotation types for one processor. 
814 field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"]) 815 seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id 816 annotation_fields[field_id] = { 817 "label": annotation["label"], 818 "type": annotation["type"] if annotation.get("type") else "text", 819 "from_dataset": annotation["from_dataset"], 820 "hide_in_explorer": hide_in_explorer 821 } 822 else: 823 # Else just get the field ID 824 field_id = seen_fields[(annotation["from_dataset"], annotation["label"])] 825 826 # Add field ID to the annotation 827 annotation["field_id"] = field_id 828 829 try: 830 annotations_saved = source_dataset.save_annotations(annotations) 831 except AnnotationException as e: 832 self.source_dataset.update_status(str(e)) 833 if failed: 834 self.dataset.update_status("Could not save all annotations, make sure that all items have an `id` value.") 835 836 source_dataset.save_annotation_fields(annotation_fields) 837 838 return annotations_saved 839 840 @classmethod 841 def map_item_method_available(cls, dataset): 842 """ 843 Check if this processor can use map_item 844 845 Checks if map_item method exists and is compatible with dataset. If 846 dataset has a different extension than the default for this processor, 847 or if the dataset has no extension, this means we cannot be sure the 848 data is in the right format to be mapped, so `False` is returned in 849 that case even if a map_item() method is available. 
850 851 :param BasicProcessor processor: The BasicProcessor subclass object 852 with which to use map_item 853 :param DataSet dataset: The DataSet object with which to 854 use map_item 855 """ 856 # only run item mapper if extension of processor == extension of 857 # data file, for the scenario where a csv file was uploaded and 858 # converted to an ndjson-based data source, for example 859 # todo: this is kind of ugly, and a better fix may be possible 860 dataset_extension = dataset.get_extension() 861 if not dataset_extension: 862 # DataSet results file does not exist or has no extension, use expected extension 863 if hasattr(dataset, "extension"): 864 dataset_extension = dataset.extension 865 else: 866 # No known DataSet extension; cannot determine if map_item method compatible 867 return False 868 869 return hasattr(cls, "map_item") and cls.extension == dataset_extension 870 871 @classmethod 872 def get_mapped_item(cls, item): 873 """ 874 Get the mapped item using a processors map_item method. 875 876 Ensure map_item method is compatible with a dataset by checking map_item_method_available first. 877 """ 878 try: 879 mapped_item = cls.map_item(item) 880 except (KeyError, IndexError) as e: 881 raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}") 882 883 if not mapped_item: 884 raise MapItemException("Unable to map item!") 885 886 return mapped_item 887 888 @classmethod 889 def is_filter(cls): 890 """ 891 Is this processor a filter? 892 893 Filters do not produce their own dataset but replace the source_dataset dataset 894 instead. 895 896 :todo: Make this a bit more robust than sniffing the processor category 897 :return bool: 898 """ 899 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower() 900 901 @classmethod 902 def get_options(cls, parent_dataset=None, config=None) -> dict: 903 """ 904 Get processor options 905 906 This method by default returns the class's "options" attribute, or an 907 empty dictionary. 
It can be redefined by processors that need more 908 fine-grained options, e.g. in cases where the availability of options 909 is partially determined by the parent dataset's parameters. 910 911 :param parent_dataset DataSet: An object representing the dataset that 912 the processor would be or was run on. Can be used, in conjunction with 913 config, to show some options only to privileged users. 914 :param config ConfigManager|None config: Configuration reader (context-aware) 915 :return dict: Options for this processor 916 """ 917 918 return cls.options if hasattr(cls, "options") else {} 919 920 @classmethod 921 def get_status(cls): 922 """ 923 Get processor status 924 925 :return list: Statuses of this processor 926 """ 927 return cls.status if hasattr(cls, "status") else None 928 929 @classmethod 930 def is_top_dataset(cls): 931 """ 932 Confirm this is *not* a top dataset, but a processor. 933 934 Used for processor compatibility checks. 935 936 :return bool: Always `False`, because this is a processor. 937 """ 938 return False 939 940 @classmethod 941 def is_from_collector(cls): 942 """ 943 Check if this processor is one that collects data, i.e. a search or 944 import worker. 945 946 :return bool: 947 """ 948 return cls.type.endswith("-search") or cls.type.endswith("-import") 949 950 @classmethod 951 def get_extension(self, parent_dataset=None): 952 """ 953 Return the extension of the processor's dataset 954 955 Used for processor compatibility checks. 956 957 :param DataSet parent_dataset: An object representing the dataset that 958 the processor would be run on 959 :return str|None: Dataset extension (without leading `.`) or `None`. 
960 """ 961 if self.is_filter(): 962 if parent_dataset is not None: 963 # Filters should use the same extension as the parent dataset 964 return parent_dataset.get_extension() 965 else: 966 # No dataset provided, unable to determine extension of parent dataset 967 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 968 return None 969 elif self.extension: 970 # Use explicitly defined extension in class (Processor class defaults to "csv") 971 return self.extension 972 else: 973 # A non filter processor updated the base Processor extension to None/False? 974 return None 975 976 @classmethod 977 def is_rankable(cls, multiple_items=True): 978 """ 979 Used for processor compatibility 980 981 :param bool multiple_items: Consider datasets with multiple items per 982 item (e.g. word_1, word_2, etc)? Included for compatibility 983 """ 984 return False 985 986 @classmethod 987 def exclude_followup_processors(cls, processor_type=None): 988 """ 989 Used for processor compatibility 990 991 To be defined by the child processor if it should exclude certain follow-up processors. 992 e.g.: 993 994 def exclude_followup_processors(cls, processor_type): 995 if processor_type in ["undesirable-followup-processor"]: 996 return True 997 return False 998 999 :param str processor_type: Processor type to exclude 1000 :return bool: True if processor should be excluded, False otherwise 1001 """ 1002 return False 1003 1004 @abc.abstractmethod 1005 def process(self): 1006 """ 1007 Process data 1008 1009 To be defined by the child processor. 1010 """ 1011 pass 1012 1013 @staticmethod 1014 def is_4cat_processor(): 1015 """ 1016 Is this a 4CAT processor? 1017 1018 This is used to determine whether a class is a 4CAT 1019 processor. 1020 1021 :return: True 1022 """ 1023 return True
29class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta): 30 """ 31 Abstract processor class 32 33 A processor takes a finished dataset as input and processes its result in 34 some way, with another dataset set as output. The input thus is a file, and 35 the output (usually) as well. In other words, the result of a processor can 36 be used as input for another processor (though whether and when this is 37 useful is another question). 38 39 To determine whether a processor can process a given dataset, you can 40 define a `is_compatible_with(FourcatModule module=None, config=None):) -> bool` class 41 method which takes a dataset as argument and returns a bool that determines 42 if this processor is considered compatible with that dataset. For example: 43 44 .. code-block:: python 45 46 @classmethod 47 def is_compatible_with(cls, module=None, config=None): 48 return module.type == "linguistic-features" 49 50 51 """ 52 53 #: Database handler to interface with the 4CAT database 54 db = None 55 56 #: Job object that requests the execution of this processor 57 job = None 58 59 #: The dataset object that the processor is *creating*. 60 dataset = None 61 62 #: Owner (username) of the dataset 63 owner = None 64 65 #: The dataset object that the processor is *processing*. 66 source_dataset = None 67 68 #: The file that is being processed 69 source_file = None 70 71 #: Processor description, which will be displayed in the web interface 72 description = "No description available" 73 74 #: Category identifier, used to group processors in the web interface 75 category = "Other" 76 77 #: Extension of the file created by the processor 78 extension = "csv" 79 80 #: 4CAT settings from the perspective of the dataset's owner 81 config = None 82 83 #: Is this processor running 'within' a preset processor? 84 is_running_in_preset = False 85 86 #: Is this processor hidden in the front-end, and only used internally/in presets? 
87 is_hidden = False 88 89 #: This will be defined automatically upon loading the processor. There is 90 #: no need to override manually 91 filepath = None 92 93 #: This will be a list; files added to it are deleted after the processor 94 #: terminates, even on failure, if they still exist at that point. Add 95 #: path objects or dataset objects; for datasets, the 96 #: `remove_disposable_files()` method will be called. 97 for_cleanup = None 98 99 def work(self): 100 """ 101 Process a dataset 102 103 Loads dataset metadata, sets up the scaffolding for performing some kind 104 of processing on that dataset, and then processes it. Afterwards, clean 105 up. 106 """ 107 try: 108 # a dataset can have multiple owners, but the creator is the user 109 # that actually queued the processor, so their config is relevant 110 self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules) 111 self.owner = self.dataset.creator 112 except DataSetException: 113 # query has been deleted in the meantime. finish without error, 114 # as deleting it will have been a conscious choice by a user 115 self.job.finish() 116 return 117 118 # set up config reader wrapping the worker's config manager, which is 119 # in turn the one passed to it by the WorkerManager, which is the one 120 # originally loaded in bootstrap 121 self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner)) 122 123 if self.dataset.data.get("key_parent", None): 124 # search workers never have parents (for now), so we don't need to 125 # find out what the source_dataset dataset is if it's a search worker 126 try: 127 self.source_dataset = self.dataset.get_parent() 128 129 # for presets, transparently use the *top* dataset as a source_dataset 130 # since that is where any underlying processors should get 131 # their data from. 
However, this should only be done as long as the 132 # preset is not finished yet, because after that there may be processors 133 # that run on the final preset result 134 while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished(): 135 self.is_running_in_preset = True 136 self.source_dataset = self.source_dataset.get_parent() 137 if self.source_dataset is None: 138 # this means there is no dataset that is *not* a preset anywhere 139 # above this dataset. This should never occur, but if it does, we 140 # cannot continue 141 self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset", 142 (self.type, self.dataset.key)) 143 self.job.finish() 144 return 145 146 except DataSetException: 147 # we need to know what the source_dataset dataset was to properly handle the 148 # analysis 149 self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % ( 150 self.type, self.dataset.key)) 151 self.job.finish() 152 return 153 154 if not self.source_dataset.is_finished() and not self.is_running_in_preset: 155 # not finished yet - retry after a while 156 # exception for presets, since these *should* be unfinished 157 # until underlying processors are done 158 self.job.release(delay=30) 159 return 160 161 self.source_file = self.source_dataset.get_results_path() 162 if not self.source_file.exists(): 163 self.dataset.update_status("Finished, no input data found.") 164 165 self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"])) 166 167 processor_name = self.title if hasattr(self, "title") else self.type 168 self.dataset.clear_log() 169 self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key)) 170 171 # start log file 172 self.dataset.update_status("Processing data") 173 self.dataset.update_version(get_software_commit(self)) 174 175 # we may create temporary files with the processor; anything in here 176 # will be 
deleted after the processor ends (or crashes!). dataset 177 # objects will have their cleanup methods called 178 self.for_cleanup = [self.dataset] 179 if self.source_dataset is not None: 180 # Add source dataset to cleanup list to remove disposable files 181 self.for_cleanup.append(self.source_dataset) 182 183 # get parameters 184 # if possible, fill defaults where parameters are not provided 185 given_parameters = self.dataset.parameters.copy() 186 all_parameters = self.get_options(self.dataset, config=self.config) 187 self.parameters = { 188 param: given_parameters.get(param, all_parameters.get(param, {}).get("default")) 189 for param in [*all_parameters.keys(), *given_parameters.keys()] 190 } 191 192 # now the parameters have been loaded into memory, clear any sensitive 193 # ones. This has a side-effect that a processor may not run again 194 # without starting from scratch, but this is the price of progress 195 options = self.get_options(self.dataset.get_parent(), config=self.config) 196 for option, option_settings in options.items(): 197 if option_settings.get("sensitive"): 198 self.dataset.delete_parameter(option) 199 200 if self.interrupted: 201 self.dataset.log("Processing interrupted, trying again later") 202 return self.abort() 203 204 if not self.dataset.is_finished(): 205 try: 206 self.process() 207 self.after_process() 208 209 # processors should usually finish their jobs by themselves, but if 210 # the worker finished without errors, the job can be finished in 211 # any case 212 if not self.job.is_finished: 213 self.job.finish() 214 except WorkerInterruptedException as e: 215 self.dataset.log("Processing interrupted (%s), trying again later" % str(e)) 216 self.abort() 217 except Exception as e: 218 self.dataset.log("Processor crashed (%s), trying again later" % str(e)) 219 stack = traceback.extract_tb(e.__traceback__) 220 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]] 221 location = "->".join(frames) 222 
223 # Not all datasets have source_dataset keys 224 if len(self.dataset.get_genealogy()) > 1: 225 parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")" 226 else: 227 parent_key = "" 228 229 # Clean up partially created datasets/files 230 self.clean_up_on_error() 231 232 raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % ( 233 self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack) 234 235 finally: 236 # clean up files that have been created and marked as disposable 237 for item in self.for_cleanup: 238 if type(item) is DataSet: 239 item.remove_disposable_files() 240 elif item.exists(): 241 shutil.rmtree(item, ignore_errors=True) 242 else: 243 # dataset already finished, job shouldn't be open anymore 244 self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % ( 245 self.job.data["jobtype"], self.job.data["remote_id"])) 246 self.job.finish() 247 248 def after_process(self): 249 """ 250 Run after processing the dataset 251 252 This method cleans up temporary files, and if needed, handles logistics 253 concerning the result file, e.g. running a pre-defined processor on the 254 result, copying it to another dataset, and so on. 
255 """ 256 if self.dataset.data["num_rows"] > 0: 257 self.dataset.update_status("Dataset completed.") 258 259 if not self.dataset.is_finished(): 260 self.dataset.finish() 261 262 # see if we have anything else lined up to run next 263 for next in self.parameters.get("next", []): 264 can_run_next = True 265 next_parameters = next.get("parameters", {}) 266 next_type = next.get("type", "") 267 try: 268 available_processors = self.dataset.get_available_processors(config=self.config) 269 except ValueError: 270 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 271 break 272 273 274 # run it only if the post-processor is actually available for this query 275 if self.dataset.data["num_rows"] <= 0: 276 can_run_next = False 277 self.log.info( 278 "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % ( 279 next_type, self.dataset.key)) 280 281 elif next_type in available_processors: 282 next_analysis = DataSet( 283 parameters=next_parameters, 284 type=next_type, 285 db=self.db, 286 parent=self.dataset.key, 287 extension=available_processors[next_type].extension, 288 is_private=self.dataset.is_private, 289 owner=self.dataset.creator, 290 modules=self.modules 291 ) 292 # copy ownership from parent dataset 293 next_analysis.copy_ownership_from(self.dataset) 294 # add to queue 295 self.queue.add_job(self.modules.processors[next_type], dataset=next_analysis) 296 else: 297 can_run_next = False 298 self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % ( 299 self.dataset.key, self.type, next_type)) 300 301 if not can_run_next: 302 # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent 303 # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed 304 if "attach_to" in self.parameters: 305 # Probably should not happen, but for some reason a mid processor has 
been designated as the processor 306 # the parent should attach to 307 pass 308 else: 309 # Check for "attach_to" parameter in descendents 310 while True: 311 if "attach_to" in next_parameters: 312 self.parameters["attach_to"] = next_parameters["attach_to"] 313 break 314 else: 315 if "next" in next_parameters: 316 next_parameters = next_parameters["next"][0]["parameters"] 317 else: 318 # No more descendents 319 # Should not happen; we cannot find the source dataset 320 self.log.warning( 321 "Cannot find preset's source dataset for dataset %s" % self.dataset.key) 322 break 323 324 # see if we need to register the result somewhere 325 if "copy_to" in self.parameters: 326 # copy the results to an arbitrary place that was passed 327 if self.dataset.get_results_path().exists(): 328 shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"]) 329 else: 330 # if copy_to was passed, that means it's important that this 331 # file exists somewhere, so we create it as an empty file 332 with open(self.parameters["copy_to"], "w") as empty_file: 333 empty_file.write("") 334 335 # see if this query chain is to be attached to another query 336 # if so, the full genealogy of this query (minus the original dataset) 337 # is attached to the given query - this is mostly useful for presets, 338 # where a chain of processors can be marked as 'underlying' a preset 339 if "attach_to" in self.parameters: 340 try: 341 # copy metadata and results to the surrogate 342 surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules) 343 344 if self.dataset.get_results_path().exists(): 345 # Update the surrogate's results file suffix to match this dataset's suffix 346 surrogate.data["result_file"] = surrogate.get_results_path().with_suffix( 347 self.dataset.get_results_path().suffix) 348 shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path())) 349 350 try: 351 surrogate.finish(self.dataset.data["num_rows"]) 352 except 
RuntimeError: 353 # already finished, could happen (though it shouldn't) 354 pass 355 356 surrogate.update_status(self.dataset.get_status()) 357 358 except DataSetException: 359 # dataset with key to attach to doesn't exist... 360 self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have " 361 "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"])) 362 363 self.job.finish() 364 365 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 366 owner = self.dataset.get_parameters().get("email-complete", False) 367 # Check that username is email address 368 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 369 from email.mime.multipart import MIMEMultipart 370 from email.mime.text import MIMEText 371 from smtplib import SMTPException 372 import socket 373 import html2text 374 375 if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 376 owner = self.dataset.get_parameters().get("email-complete", False) 377 # Check that username is email address 378 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 379 from email.mime.multipart import MIMEMultipart 380 from email.mime.text import MIMEText 381 from smtplib import SMTPException 382 import socket 383 import html2text 384 385 self.log.debug("Sending email to %s" % owner) 386 dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key 387 sender = self.config.get('mail.noreply') 388 message = MIMEMultipart("alternative") 389 message["From"] = sender 390 message["To"] = owner 391 message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label()) 392 mail = """ 393 <p>Hello %s,</p> 394 <p>4CAT has finished collecting your %s dataset labeled: %s</p> 395 <p>You can view your dataset via the following link:</p> 396 <p><a href="%s">%s</a></p> 397 <p>Sincerely,</p> 
398 <p>Your friendly neighborhood 4CAT admin</p> 399 """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url) 400 html_parser = html2text.HTML2Text() 401 message.attach(MIMEText(html_parser.handle(mail), "plain")) 402 message.attach(MIMEText(mail, "html")) 403 try: 404 send_email([owner], message, self.config) 405 except (SMTPException, ConnectionRefusedError, socket.timeout): 406 self.log.error("Error sending email to %s" % owner) 407 408 def clean_up_on_error(self): 409 try: 410 # ensure proxied requests are stopped 411 self.flush_proxied_requests() 412 # delete annotations that have been generated as part of this processor 413 self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True) 414 # Remove the results file that was created 415 if self.dataset.get_results_path().exists(): 416 self.dataset.get_results_path().unlink() 417 if self.dataset.get_results_folder_path().exists(): 418 shutil.rmtree(self.dataset.get_results_folder_path()) 419 420 except Exception as e: 421 self.log.error("Error during processor cleanup after error: %s" % str(e)) 422 423 def abort(self): 424 """ 425 Abort dataset creation and clean up so it may be attempted again later 426 """ 427 self.clean_up_on_error() 428 429 # we release instead of finish, since interrupting is just that - the 430 # job should resume at a later point. 
Delay resuming by 10 seconds to 431 # give 4CAT the time to do whatever it wants (though usually this isn't 432 # needed since restarting also stops the spawning of new workers) 433 if self.interrupted == self.INTERRUPT_RETRY: 434 # retry later - wait at least 10 seconds to give the backend time to shut down 435 self.job.release(delay=10) 436 elif self.interrupted == self.INTERRUPT_CANCEL: 437 # cancel job 438 self.job.finish() 439 440 def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs): 441 """ 442 Request an iterable of URLs and return results 443 444 This method takes an iterable yielding URLs and yields the result for 445 a GET request for that URL in return. This is done through the worker 446 manager's DelegatedRequestHandler, which can run multiple requests in 447 parallel and divide them over the proxies configured in 4CAT (if any). 448 Proxy cooloff and queueing is shared with other processors, so that a 449 processor will never accidentally request from the same site as another 450 processor, potentially triggering rate limits. 451 452 :param urls: Something that can be iterated over and yields URLs 453 :param kwargs: Other keyword arguments are passed on to `add_urls` 454 and eventually to `requests.get()`. 455 :param bool preserve_order: Return items in the original order. Use 456 `False` to potentially speed up processing, if order is not important. 457 :return: A generator yielding request results, i.e. 
tuples of a 458 URL and a `requests` response objects 459 """ 460 queue_name = self._proxy_queue_name() 461 delegator = self.manager.proxy_delegator 462 463 delegator.refresh_settings(self.config) 464 465 # 50 is an arbitrary batch size - but we top up every 0.05s, so 466 # that should be sufficient 467 batch_size = 50 468 469 # we need an iterable, so we can use next() and StopIteration 470 urls = iter(urls) 471 472 have_urls = True 473 while (queue_length := delegator.get_queue_length(queue_name)) > 0 or have_urls: 474 if queue_length < batch_size and have_urls: 475 batch = [] 476 while len(batch) < (batch_size - queue_length): 477 try: 478 batch.append(next(urls)) 479 except StopIteration: 480 have_urls = False 481 break 482 483 delegator.add_urls(batch, queue_name, **kwargs) 484 485 time.sleep(0.05) # arbitrary... 486 for url, result in delegator.get_results(queue_name, preserve_order=preserve_order): 487 # result may also be a FailedProxiedRequest! 488 # up to the processor to decide how to deal with it 489 yield url, result 490 491 def push_proxied_request(self, url, position=-1, **kwargs): 492 """ 493 Add a single URL to the proxied requests queue 494 495 :param str url: URL to add 496 :param position: Position to add to queue; can be used to add priority 497 requests, adds to end of queue by default 498 :param kwargs: 499 """ 500 self.manager.proxy_delegator.add_urls([url], self._proxy_queue_name(), position=position, **kwargs) 501 502 def flush_proxied_requests(self): 503 """ 504 Get rid of remaining proxied requests 505 506 Can be used if enough results are available and any remaining ones need 507 to be stopped ASAP and are otherwise unneeded. 508 509 Blocking! 510 """ 511 self.manager.proxy_delegator.halt_and_wait(self._proxy_queue_name()) 512 513 def _proxy_queue_name(self): 514 """ 515 Get proxy queue name 516 517 For internal use. 
518 519 :return str: 520 """ 521 return f"{self.type}-{self.dataset.key}" 522 523 def unpack_archive_contents(self, path, staging_area=None): 524 """ 525 Unpack all files in an archive to a staging area 526 527 With every iteration, the processor's 'interrupted' flag is checked, 528 and if set a ProcessorInterruptedException is raised, which by default 529 is caught and subsequently stops execution gracefully. 530 531 Files are unzipped to a staging area. The staging area is *not* 532 cleaned up automatically. 533 534 :param Path path: Path to zip file to read 535 :param Path staging_area: Where to store the files while they're 536 being worked with. If omitted, a temporary folder is created and 537 deleted after use 538 :param int max_number_files: Maximum number of files to unpack. If None, all files unpacked 539 :return Path: A path to the staging area 540 """ 541 542 if not path.exists(): 543 return 544 545 if not staging_area: 546 staging_area = self.dataset.get_staging_area() 547 548 if not staging_area.exists() or not staging_area.is_dir(): 549 raise RuntimeError("Staging area %s is not a valid folder") 550 551 paths = [] 552 with zipfile.ZipFile(path, "r") as archive_file: 553 archive_contents = sorted(archive_file.namelist()) 554 555 for archived_file in archive_contents: 556 if self.interrupted: 557 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 558 559 file_name = archived_file.split("/")[-1] 560 temp_file = staging_area.joinpath(file_name) 561 archive_file.extract(archived_file, staging_area) 562 paths.append(temp_file) 563 564 return staging_area 565 566 def extract_archived_file_by_name(self, filename, archive_path, staging_area=None): 567 """ 568 Extract a file from an archive by name 569 570 :param str filename: Name of file to extract 571 :param Path archive_path: Path to zip file to read 572 :param Path staging_area: Where to store the files while they're 573 being worked with. 
If omitted, a temporary folder is created 574 :return Path: A path to the extracted file 575 """ 576 if not archive_path.exists(): 577 return 578 579 if not staging_area: 580 staging_area = self.dataset.get_staging_area() 581 582 if not staging_area.exists() or not staging_area.is_dir(): 583 raise RuntimeError("Staging area %s is not a valid folder") 584 585 with zipfile.ZipFile(archive_path, "r") as archive_file: 586 if filename not in archive_file.namelist(): 587 raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path)) 588 else: 589 archive_file.extract(filename, staging_area) 590 return staging_area.joinpath(filename) 591 592 def write_csv_items_and_finish(self, data): 593 """ 594 Write data as csv to results file and finish dataset 595 596 Determines result file path using dataset's path determination helper 597 methods. After writing results, the dataset is marked finished. Will 598 raise a ProcessorInterruptedException if the interrupted flag for this 599 processor is set while iterating. 
600 601 :param data: A list or tuple of dictionaries, all with the same keys 602 """ 603 if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str): 604 raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data)) 605 606 if not data: 607 raise ValueError("write_csv_items requires a dictionary with at least one item") 608 609 self.dataset.update_status("Writing results file") 610 writer = False 611 with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results: 612 for row in data: 613 if self.interrupted: 614 raise ProcessorInterruptedException("Interrupted while writing results file") 615 616 row = remove_nuls(row) 617 if not writer: 618 writer = csv.DictWriter(results, fieldnames=row.keys()) 619 writer.writeheader() 620 621 writer.writerow(row) 622 623 self.dataset.update_status("Finished") 624 self.dataset.finish(len(data)) 625 626 def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True): 627 """ 628 Archive a bunch of files into a zip archive and finish processing 629 630 :param list|Path files: If a list, all files will be added to the 631 archive and deleted afterwards. If a folder, all files in the folder 632 will be added and the folder will be deleted afterwards. 633 :param int num_items: Items in the dataset. If None, the amount of 634 files added to the archive will be used. 635 :param int compression: Type of compression to use. By default, files 636 are not compressed, to speed up unarchiving. 637 :param bool finish: Finish the dataset/job afterwards or not? 
638 """ 639 is_folder = False 640 if issubclass(type(files), PurePath): 641 is_folder = files 642 if not files.exists() or not files.is_dir(): 643 raise RuntimeError("Folder %s is not a folder that can be archived" % files) 644 645 files = files.glob("*") 646 647 # create zip of archive and delete temporary files and folder 648 self.dataset.update_status("Compressing results into archive") 649 done = 0 650 with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip: 651 for output_path in files: 652 zip.write(output_path, output_path.name) 653 output_path.unlink() 654 done += 1 655 656 # delete temporary folder 657 if is_folder: 658 shutil.rmtree(is_folder) 659 660 self.dataset.update_status("Finished") 661 if num_items is None: 662 num_items = done 663 664 if finish: 665 self.dataset.finish(num_items) 666 667 def create_standalone(self, item_ids=None): 668 """ 669 Copy this dataset and make that copy standalone. 670 671 This has the benefit of allowing for all analyses that can be run on 672 full datasets on the new, filtered copy as well. 673 674 This also transfers annotations and annotation fields. 675 676 :param list item_ids: The item_ids that are copied-over. Used to check what annotations need to be copied. 
677 678 :return DataSet: The new standalone dataset 679 """ 680 681 top_parent = self.source_dataset 682 683 finished = self.dataset.check_dataset_finished() 684 if finished == 'empty': 685 # No data to process, so we can't create a standalone dataset 686 return 687 elif finished is None: 688 # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now 689 pass 690 691 standalone = self.dataset.copy(shallow=False) 692 standalone.body_match = "(Filtered) " + top_parent.query 693 standalone.datasource = top_parent.parameters.get("datasource", "custom") 694 695 if top_parent.annotation_fields and top_parent.num_annotations() > 0: 696 # Get column names dynamically 697 annotation_cols = self.db.fetchone("SELECT * FROM annotations LIMIT 1") 698 annotation_cols = list(annotation_cols.keys()) 699 annotation_cols.remove("id") # Set by the DB 700 cols_str = ",".join(annotation_cols) 701 702 cols_list = ["a." + col for col in annotation_cols if col != "dataset"] 703 query = f"INSERT INTO annotations ({cols_str}) OVERRIDING USER VALUE " \ 704 f"SELECT %s, {', '.join(cols_list)} " \ 705 f"FROM annotations AS a WHERE a.dataset = %s" 706 707 # Copy over all annotations if no item_ids are given 708 if not item_ids or top_parent.num_rows == standalone.num_rows: 709 self.db.execute(query, replacements=(standalone.key, top_parent.key)) 710 else: 711 query += " AND a.item_id = ANY(%s)" 712 self.db.execute(query, replacements=(standalone.key, top_parent.key, item_ids)) 713 714 # Copy over annotation fields and update annotations with new field IDs 715 if top_parent.annotation_fields: 716 # New field IDs based on the new dataset key 717 annotation_fields = { 718 hash_to_md5(old_field_id + standalone.key): field_values 719 for old_field_id, field_values in top_parent.annotation_fields.items() 720 } 721 standalone.annotation_fields = {} # Reset to insert everything without checking for changes 722 
standalone.save_annotation_fields(annotation_fields) # Save to db 723 724 # Also update field IDs in annotations 725 for i, old_field_id in enumerate(top_parent.annotation_fields.keys()): 726 self.db.update( 727 "annotations", 728 where={"field_id": old_field_id, "dataset": standalone.key}, 729 data={"field_id": hash_to_md5(old_field_id + standalone.key) 730 }) 731 732 try: 733 standalone.board = top_parent.board 734 except AttributeError: 735 standalone.board = self.type 736 737 standalone.type = top_parent.type 738 739 standalone.detach() 740 standalone.delete_parameter("key_parent") 741 742 self.dataset.copied_to = standalone.key 743 744 # we don't need this file anymore - it has been copied to the new 745 # standalone dataset, and this one is not accessible via the interface 746 # except as a link to the copied standalone dataset 747 os.unlink(self.dataset.get_results_path()) 748 749 # Copy the log 750 shutil.copy(self.dataset.get_log_path(), standalone.get_log_path()) 751 752 return standalone 753 754 def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int: 755 """ 756 Saves annotations made by this processor on the basis of another dataset. 757 Also adds some data regarding this processor: set `author` and `label` to processor name, 758 and add parameters to `metadata` (unless explicitly indicated). 759 760 :param annotations: List of dictionaries with annotation items. Must have `item_id` and `value`. 761 E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}] 762 :param source_dataset: The dataset that these annotations will be saved on. If None, will use the 763 top parent. 764 :param bool hide_in_explorer: Whether this annotation is included in the Explorer. 'Hidden' annotations 765 are still shown in `iterate_items()`). 766 767 :returns int: How many annotations were saved. 
768 769 """ 770 771 if not annotations: 772 return 0 773 774 # Default to parent dataset 775 if not source_dataset: 776 source_dataset = self.source_dataset.top_parent() 777 778 # Check if this dataset already has annotation fields, and if so, store some values to use per annotation. 779 annotation_fields = source_dataset.annotation_fields 780 781 # Keep track of what fields we've already seen, so we don't need to hash every time. 782 seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id 783 for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items} 784 785 annotations_saved = 0 786 failed = 0 787 788 # Loop through all annotations. This may be batched. 789 for annotation in annotations: 790 791 # item_id always needs to be present 792 if not annotation.get("item_id"): 793 failed += 1 794 continue 795 796 # Keep track of what dataset generated this annotation 797 annotation["from_dataset"] = self.dataset.key 798 # Set the author to this processor's name 799 if not annotation.get("author"): 800 annotation["author"] = self.name 801 if not annotation.get("author_original"): 802 annotation["author_original"] = self.name 803 annotation["by_processor"] = True 804 805 # Only use a default label if no custom one is given 806 if not annotation.get("label"): 807 annotation["label"] = self.name 808 809 # Store info on the annotation field if this from_dataset/label combo hasn't been seen yet. 810 # We need to do this within this loop because this function may be called in batches and with different 811 # annotation types. 812 if (annotation["from_dataset"], annotation["label"]) not in seen_fields: 813 # Generating a unique field ID based on the source dataset's key, the label, and this dataset's key. 814 # This should create unique fields, even if there's multiple annotation types for one processor. 
815 field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"]) 816 seen_fields[(annotation["from_dataset"], annotation["label"])] = field_id 817 annotation_fields[field_id] = { 818 "label": annotation["label"], 819 "type": annotation["type"] if annotation.get("type") else "text", 820 "from_dataset": annotation["from_dataset"], 821 "hide_in_explorer": hide_in_explorer 822 } 823 else: 824 # Else just get the field ID 825 field_id = seen_fields[(annotation["from_dataset"], annotation["label"])] 826 827 # Add field ID to the annotation 828 annotation["field_id"] = field_id 829 830 try: 831 annotations_saved = source_dataset.save_annotations(annotations) 832 except AnnotationException as e: 833 self.source_dataset.update_status(str(e)) 834 if failed: 835 self.dataset.update_status("Could not save all annotations, make sure that all items have an `id` value.") 836 837 source_dataset.save_annotation_fields(annotation_fields) 838 839 return annotations_saved 840 841 @classmethod 842 def map_item_method_available(cls, dataset): 843 """ 844 Check if this processor can use map_item 845 846 Checks if map_item method exists and is compatible with dataset. If 847 dataset has a different extension than the default for this processor, 848 or if the dataset has no extension, this means we cannot be sure the 849 data is in the right format to be mapped, so `False` is returned in 850 that case even if a map_item() method is available. 
851 852 :param BasicProcessor processor: The BasicProcessor subclass object 853 with which to use map_item 854 :param DataSet dataset: The DataSet object with which to 855 use map_item 856 """ 857 # only run item mapper if extension of processor == extension of 858 # data file, for the scenario where a csv file was uploaded and 859 # converted to an ndjson-based data source, for example 860 # todo: this is kind of ugly, and a better fix may be possible 861 dataset_extension = dataset.get_extension() 862 if not dataset_extension: 863 # DataSet results file does not exist or has no extension, use expected extension 864 if hasattr(dataset, "extension"): 865 dataset_extension = dataset.extension 866 else: 867 # No known DataSet extension; cannot determine if map_item method compatible 868 return False 869 870 return hasattr(cls, "map_item") and cls.extension == dataset_extension 871 872 @classmethod 873 def get_mapped_item(cls, item): 874 """ 875 Get the mapped item using a processors map_item method. 876 877 Ensure map_item method is compatible with a dataset by checking map_item_method_available first. 878 """ 879 try: 880 mapped_item = cls.map_item(item) 881 except (KeyError, IndexError) as e: 882 raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}") 883 884 if not mapped_item: 885 raise MapItemException("Unable to map item!") 886 887 return mapped_item 888 889 @classmethod 890 def is_filter(cls): 891 """ 892 Is this processor a filter? 893 894 Filters do not produce their own dataset but replace the source_dataset dataset 895 instead. 896 897 :todo: Make this a bit more robust than sniffing the processor category 898 :return bool: 899 """ 900 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower() 901 902 @classmethod 903 def get_options(cls, parent_dataset=None, config=None) -> dict: 904 """ 905 Get processor options 906 907 This method by default returns the class's "options" attribute, or an 908 empty dictionary. 
It can be redefined by processors that need more 909 fine-grained options, e.g. in cases where the availability of options 910 is partially determined by the parent dataset's parameters. 911 912 :param parent_dataset DataSet: An object representing the dataset that 913 the processor would be or was run on. Can be used, in conjunction with 914 config, to show some options only to privileged users. 915 :param config ConfigManager|None config: Configuration reader (context-aware) 916 :return dict: Options for this processor 917 """ 918 919 return cls.options if hasattr(cls, "options") else {} 920 921 @classmethod 922 def get_status(cls): 923 """ 924 Get processor status 925 926 :return list: Statuses of this processor 927 """ 928 return cls.status if hasattr(cls, "status") else None 929 930 @classmethod 931 def is_top_dataset(cls): 932 """ 933 Confirm this is *not* a top dataset, but a processor. 934 935 Used for processor compatibility checks. 936 937 :return bool: Always `False`, because this is a processor. 938 """ 939 return False 940 941 @classmethod 942 def is_from_collector(cls): 943 """ 944 Check if this processor is one that collects data, i.e. a search or 945 import worker. 946 947 :return bool: 948 """ 949 return cls.type.endswith("-search") or cls.type.endswith("-import") 950 951 @classmethod 952 def get_extension(self, parent_dataset=None): 953 """ 954 Return the extension of the processor's dataset 955 956 Used for processor compatibility checks. 957 958 :param DataSet parent_dataset: An object representing the dataset that 959 the processor would be run on 960 :return str|None: Dataset extension (without leading `.`) or `None`. 
961 """ 962 if self.is_filter(): 963 if parent_dataset is not None: 964 # Filters should use the same extension as the parent dataset 965 return parent_dataset.get_extension() 966 else: 967 # No dataset provided, unable to determine extension of parent dataset 968 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 969 return None 970 elif self.extension: 971 # Use explicitly defined extension in class (Processor class defaults to "csv") 972 return self.extension 973 else: 974 # A non filter processor updated the base Processor extension to None/False? 975 return None 976 977 @classmethod 978 def is_rankable(cls, multiple_items=True): 979 """ 980 Used for processor compatibility 981 982 :param bool multiple_items: Consider datasets with multiple items per 983 item (e.g. word_1, word_2, etc)? Included for compatibility 984 """ 985 return False 986 987 @classmethod 988 def exclude_followup_processors(cls, processor_type=None): 989 """ 990 Used for processor compatibility 991 992 To be defined by the child processor if it should exclude certain follow-up processors. 993 e.g.: 994 995 def exclude_followup_processors(cls, processor_type): 996 if processor_type in ["undesirable-followup-processor"]: 997 return True 998 return False 999 1000 :param str processor_type: Processor type to exclude 1001 :return bool: True if processor should be excluded, False otherwise 1002 """ 1003 return False 1004 1005 @abc.abstractmethod 1006 def process(self): 1007 """ 1008 Process data 1009 1010 To be defined by the child processor. 1011 """ 1012 pass 1013 1014 @staticmethod 1015 def is_4cat_processor(): 1016 """ 1017 Is this a 4CAT processor? 1018 1019 This is used to determine whether a class is a 4CAT 1020 processor. 1021 1022 :return: True 1023 """ 1024 return True
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can
define an `is_compatible_with(module=None, config=None) -> bool` class
method which takes a module (e.g. a dataset) as argument and returns a bool that determines
if this processor is considered compatible with that dataset. For example:
@classmethod
def is_compatible_with(cls, module=None, config=None):
return module.type == "linguistic-features"
def work(self):
    """
    Process a dataset

    Loads dataset metadata, sets up the scaffolding for performing some kind
    of processing on that dataset, and then processes it. Afterwards, clean
    up.

    The job is finished without processing if the dataset (or the parent
    dataset it depends on) no longer exists, and released for a retry after
    30 seconds if the parent dataset is not finished yet. An interruption
    is handed to `abort()`. Any other exception raised by `process()` or
    `after_process()` is re-raised as a `ProcessorException` after partial
    results have been cleaned up.
    """
    try:
        # a dataset can have multiple owners, but the creator is the user
        # that actually queued the processor, so their config is relevant
        self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
        self.owner = self.dataset.creator
    except DataSetException:
        # query has been deleted in the meantime. finish without error,
        # as deleting it will have been a conscious choice by a user
        self.job.finish()
        return

    # set up config reader wrapping the worker's config manager, which is
    # in turn the one passed to it by the WorkerManager, which is the one
    # originally loaded in bootstrap
    self.config = ConfigWrapper(config=self.config, user=User.get_by_name(self.db, self.owner))

    if self.dataset.data.get("key_parent", None):
        # search workers never have parents (for now), so we don't need to
        # find out what the source_dataset dataset is if it's a search worker
        try:
            self.source_dataset = self.dataset.get_parent()

            # for presets, transparently use the *top* dataset as a source_dataset
            # since that is where any underlying processors should get
            # their data from. However, this should only be done as long as the
            # preset is not finished yet, because after that there may be processors
            # that run on the final preset result
            while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
                self.is_running_in_preset = True
                self.source_dataset = self.source_dataset.get_parent()
                if self.source_dataset is None:
                    # this means there is no dataset that is *not* a preset anywhere
                    # above this dataset. This should never occur, but if it does, we
                    # cannot continue.
                    # The message is %-formatted here, like the other log calls in
                    # this class: it has two placeholders that must both be filled
                    self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset" % (
                        self.type, self.dataset.key))
                    self.job.finish()
                    return

        except DataSetException:
            # we need to know what the source_dataset dataset was to properly handle the
            # analysis
            self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
                self.type, self.dataset.key))
            self.job.finish()
            return

        if not self.source_dataset.is_finished() and not self.is_running_in_preset:
            # not finished yet - retry after a while
            # exception for presets, since these *should* be unfinished
            # until underlying processors are done
            self.job.release(delay=30)
            return

        self.source_file = self.source_dataset.get_results_path()
        if not self.source_file.exists():
            self.dataset.update_status("Finished, no input data found.")

    self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))

    processor_name = self.title if hasattr(self, "title") else self.type
    self.dataset.clear_log()
    self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))

    # start log file
    self.dataset.update_status("Processing data")
    self.dataset.update_version(get_software_commit(self))

    # we may create temporary files with the processor; anything in here
    # will be deleted after the processor ends (or crashes!). dataset
    # objects will have their cleanup methods called
    self.for_cleanup = [self.dataset]
    if self.source_dataset is not None:
        # Add source dataset to cleanup list to remove disposable files
        self.for_cleanup.append(self.source_dataset)

    # get parameters
    # if possible, fill defaults where parameters are not provided
    given_parameters = self.dataset.parameters.copy()
    all_parameters = self.get_options(self.dataset, config=self.config)
    self.parameters = {
        param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
        for param in [*all_parameters.keys(), *given_parameters.keys()]
    }

    # now the parameters have been loaded into memory, clear any sensitive
    # ones. This has a side-effect that a processor may not run again
    # without starting from scratch, but this is the price of progress
    options = self.get_options(self.dataset.get_parent(), config=self.config)
    for option, option_settings in options.items():
        if option_settings.get("sensitive"):
            self.dataset.delete_parameter(option)

    if self.interrupted:
        self.dataset.log("Processing interrupted, trying again later")
        return self.abort()

    if not self.dataset.is_finished():
        try:
            self.process()
            self.after_process()

            # processors should usually finish their jobs by themselves, but if
            # the worker finished without errors, the job can be finished in
            # any case
            if not self.job.is_finished:
                self.job.finish()
        except WorkerInterruptedException as e:
            self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
            self.abort()
        except Exception as e:
            self.dataset.log("Processor crashed (%s), trying again later" % str(e))
            stack = traceback.extract_tb(e.__traceback__)
            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack[1:]]
            location = "->".join(frames)

            # Not all datasets have source_dataset keys
            genealogy = self.dataset.get_genealogy()
            if len(genealogy) > 1:
                parent_key = " (via " + genealogy[0].key + ")"
            else:
                parent_key = ""

            # Clean up partially created datasets/files
            self.clean_up_on_error()

            raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % (
                self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=stack) from e

        finally:
            # clean up files that have been created and marked as disposable
            for item in self.for_cleanup:
                if type(item) is DataSet:
                    item.remove_disposable_files()
                elif item.exists():
                    shutil.rmtree(item, ignore_errors=True)
    else:
        # dataset already finished, job shouldn't be open anymore
        self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (
            self.job.data["jobtype"], self.job.data["remote_id"]))
        self.job.finish()
Process a dataset
Loads dataset metadata, sets up the scaffolding for performing some kind of processing on that dataset, and then processes it. Afterwards, clean up.
def after_process(self):
    """
    Run after processing the dataset

    Handles logistics concerning the result file once the processor has
    run. In order, it:

    - marks the dataset as finished if the processor did not do so itself;
    - queues any follow-up processors listed in the `next` parameter;
    - copies the results file to the path in the `copy_to` parameter, if any;
    - copies results and status to the dataset in `attach_to`, if any;
    - finishes the job;
    - e-mails the address in the `email-complete` parameter, if a mail
      server is configured.
    """
    if self.dataset.data["num_rows"] > 0:
        self.dataset.update_status("Dataset completed.")

    if not self.dataset.is_finished():
        self.dataset.finish()

    # see if we have anything else lined up to run next
    for next_processor in self.parameters.get("next", []):
        can_run_next = True
        next_parameters = next_processor.get("parameters", {})
        next_type = next_processor.get("type", "")
        try:
            available_processors = self.dataset.get_available_processors(config=self.config)
        except ValueError:
            self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
            break

        # run it only if the post-processor is actually available for this query
        if self.dataset.data["num_rows"] <= 0:
            can_run_next = False
            self.log.info(
                "Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (
                    next_type, self.dataset.key))

        elif next_type in available_processors:
            next_analysis = DataSet(
                parameters=next_parameters,
                type=next_type,
                db=self.db,
                parent=self.dataset.key,
                extension=available_processors[next_type].extension,
                is_private=self.dataset.is_private,
                owner=self.dataset.creator,
                modules=self.modules
            )
            # copy ownership from parent dataset
            next_analysis.copy_ownership_from(self.dataset)
            # add to queue
            self.queue.add_job(self.modules.processors[next_type], dataset=next_analysis)
        else:
            can_run_next = False
            self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (
                self.dataset.key, self.type, next_type))

        # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
        # preset; this allows the parent (for example a preset) to be finished and any successful processors
        # displayed. If this processor itself already has "attach_to" (which probably should not happen: a mid
        # processor was designated as the one the parent should attach to), there is nothing to look up.
        if not can_run_next and "attach_to" not in self.parameters:
            # Check for "attach_to" parameter in descendents
            while True:
                if "attach_to" in next_parameters:
                    self.parameters["attach_to"] = next_parameters["attach_to"]
                    break
                elif next_parameters.get("next"):
                    # follow the first chained processor; .get() avoids a crash
                    # on a chain entry without parameters
                    next_parameters = next_parameters["next"][0].get("parameters", {})
                else:
                    # No more descendents
                    # Should not happen; we cannot find the source dataset
                    self.log.warning(
                        "Cannot find preset's source dataset for dataset %s" % self.dataset.key)
                    break

    # see if we need to register the result somewhere
    if "copy_to" in self.parameters:
        # copy the results to an arbitrary place that was passed
        if self.dataset.get_results_path().exists():
            shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
        else:
            # if copy_to was passed, that means it's important that this
            # file exists somewhere, so we create it as an empty file
            with open(self.parameters["copy_to"], "w") as empty_file:
                empty_file.write("")

    # see if this query chain is to be attached to another query
    # if so, the full genealogy of this query (minus the original dataset)
    # is attached to the given query - this is mostly useful for presets,
    # where a chain of processors can be marked as 'underlying' a preset
    if "attach_to" in self.parameters:
        try:
            # copy metadata and results to the surrogate
            surrogate = DataSet(key=self.parameters["attach_to"], db=self.db, modules=self.modules)

            if self.dataset.get_results_path().exists():
                # Update the surrogate's results file suffix to match this dataset's suffix
                surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(
                    self.dataset.get_results_path().suffix)
                shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))

            try:
                surrogate.finish(self.dataset.data["num_rows"])
            except RuntimeError:
                # already finished, could happen (though it shouldn't)
                pass

            surrogate.update_status(self.dataset.get_status())

        except DataSetException:
            # dataset with key to attach to doesn't exist...
            self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have "
                             "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"]))

    self.job.finish()

    if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
        owner = self.dataset.get_parameters().get("email-complete", False)
        # Check that username is email address
        if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
            # imported locally so these modules are only loaded when an e-mail is sent
            import html
            import socket
            from email.mime.multipart import MIMEMultipart
            from email.mime.text import MIMEText
            from smtplib import SMTPException

            import html2text

            self.log.debug("Sending email to %s" % owner)
            dataset_url = ('https://' if self.config.get('flask.https') else 'http://') + self.config.get('flask.server_name') + '/results/' + self.dataset.key
            sender = self.config.get('mail.noreply')
            message = MIMEMultipart("alternative")
            message["From"] = sender
            message["To"] = owner
            message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())

            # the dataset label (and the address) come from user input, so
            # everything interpolated into the HTML body is escaped
            mail = """
            <p>Hello %s,</p>
            <p>4CAT has finished collecting your %s dataset labeled: %s</p>
            <p>You can view your dataset via the following link:</p>
            <p><a href="%s">%s</a></p>
            <p>Sincerely,</p>
            <p>Your friendly neighborhood 4CAT admin</p>
            """ % tuple(html.escape(str(value)) for value in (
                owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url))
            html_parser = html2text.HTML2Text()
            message.attach(MIMEText(html_parser.handle(mail), "plain"))
            message.attach(MIMEText(mail, "html"))
            try:
                send_email([owner], message, self.config)
            except (SMTPException, ConnectionRefusedError, socket.timeout):
                self.log.error("Error sending email to %s" % owner)
Run after processing the dataset
This method cleans up temporary files, and if needed, handles logistics concerning the result file, e.g. running a pre-defined processor on the result, copying it to another dataset, and so on.
def clean_up_on_error(self):
    """
    Undo the side effects of a failed or interrupted run, best-effort

    Halts this processor's outstanding proxied requests, deletes all
    annotations whose `from_dataset` is this dataset, and removes the
    results file and the results folder if they exist. An exception raised
    by any of these steps is logged rather than propagated.
    """
    try:
        # make sure no proxied requests keep running for this processor
        self.flush_proxied_requests()

        # annotations generated as part of this run are no longer valid
        self.db.delete("annotations", where={"from_dataset": self.dataset.key}, commit=True)

        # remove (partial) output on disk
        results_file = self.dataset.get_results_path()
        if results_file.exists():
            results_file.unlink()

        results_folder = self.dataset.get_results_folder_path()
        if results_folder.exists():
            shutil.rmtree(results_folder)
    except Exception as cleanup_error:
        self.log.error("Error during processor cleanup after error: %s" % str(cleanup_error))
def abort(self):
    """
    Abort dataset creation and clean up so it may be attempted again later

    Partial results are cleaned up first. What happens to the job then
    depends on the interruption type:

    - `INTERRUPT_RETRY`: the job is released, to be resumed after at least
      10 seconds;
    - `INTERRUPT_CANCEL`: the job is finished and will not run again;
    - any other value: the job is left as it is.
    """
    self.clean_up_on_error()

    if self.interrupted == self.INTERRUPT_RETRY:
        # release instead of finish: an interruption means the job should
        # resume later. The 10-second delay gives the backend time to shut
        # down (usually unnecessary, since restarting also stops new
        # workers from spawning)
        self.job.release(delay=10)
        return

    if self.interrupted == self.INTERRUPT_CANCEL:
        # cancelled for good
        self.job.finish()
Abort dataset creation and clean up so it may be attempted again later
def iterate_proxied_requests(self, urls, preserve_order=True, **kwargs):
    """
    Request an iterable of URLs and return results

    Each URL is handed to the worker manager's proxy delegator
    (DelegatedRequestHandler), which can run multiple GET requests in
    parallel and spread them over the proxies configured in 4CAT, if any.
    Proxy cool-off and queueing are shared with other processors, so a
    processor never accidentally requests from the same site as another
    processor and potentially triggers rate limits.

    The delegator's queue for this processor is topped up to 50 URLs at a
    time. The queue is polled about every 0.05 seconds until all URLs have
    been consumed and the queue is empty.

    :param urls: Something that can be iterated over and yields URLs
    :param bool preserve_order: Return items in the original order. Use
      `False` to potentially speed up processing, if order is not important.
    :param kwargs: Other keyword arguments are passed on to `add_urls`
      and eventually to `requests.get()`.
    :return: A generator yielding `(url, result)` tuples. `result` is
      usually a `requests` response object, but may also be a
      FailedProxiedRequest; it is up to the processor to handle that.
    """
    queue_name = self._proxy_queue_name()
    delegator = self.manager.proxy_delegator
    delegator.refresh_settings(self.config)

    # arbitrary batch size - the queue is topped up every 0.05s, so this
    # should be sufficient
    batch_size = 50

    # an explicit iterator is needed to use next() and StopIteration
    pending = iter(urls)
    exhausted = False

    while True:
        in_queue = delegator.get_queue_length(queue_name)
        if in_queue <= 0 and exhausted:
            break

        if not exhausted and in_queue < batch_size:
            batch = []
            while len(batch) < batch_size - in_queue:
                try:
                    batch.append(next(pending))
                except StopIteration:
                    exhausted = True
                    break

            delegator.add_urls(batch, queue_name, **kwargs)

        time.sleep(0.05)  # arbitrary polling interval

        for url, result in delegator.get_results(queue_name, preserve_order=preserve_order):
            yield url, result
Request an iterable of URLs and return results
This method takes an iterable yielding URLs and yields the result for a GET request for that URL in return. This is done through the worker manager's DelegatedRequestHandler, which can run multiple requests in parallel and divide them over the proxies configured in 4CAT (if any). Proxy cool-off and queueing are shared with other processors, so that a processor will never accidentally request from the same site as another processor and potentially trigger rate limits.
Parameters
- urls: Something that can be iterated over and yields URLs
- kwargs: Other keyword arguments are passed on to
`add_urls` and eventually to `requests.get()`.
- bool preserve_order: Return items in the original order. Use `False` to potentially speed up processing, if order is not important.
Returns
A generator yielding request results, i.e. tuples of a URL and a
`requests` response object
def push_proxied_request(self, url, position=-1, **kwargs):
    """
    Queue a single URL for a proxied request

    :param str url: URL to add
    :param position: Position in the queue to insert the URL at. Can be
      used to add priority requests; by default (-1) the URL is added to
      the end of the queue.
    :param kwargs: Other keyword arguments, passed on to the delegator's
      `add_urls`
    """
    delegator = self.manager.proxy_delegator
    queue_name = self._proxy_queue_name()
    delegator.add_urls([url], queue_name, position=position, **kwargs)
Add a single URL to the proxied requests queue
Parameters
- str url: URL to add
- position: Position to add to queue; can be used to add priority requests, adds to end of queue by default
- kwargs: Other keyword arguments, passed on to the proxy delegator's add_urls
def flush_proxied_requests(self):
    """
    Discard this processor's remaining proxied requests

    Useful when enough results have been collected and whatever is still
    queued is no longer needed and should be stopped as soon as possible.

    Blocking: asks the shared proxy delegator to halt this processor's
    queue and waits for it (`halt_and_wait`).
    """
    delegator = self.manager.proxy_delegator
    delegator.halt_and_wait(self._proxy_queue_name())
Get rid of remaining proxied requests
Can be used if enough results are available and any remaining ones need to be stopped ASAP and are otherwise unneeded.
Blocking!
def unpack_archive_contents(self, path, staging_area=None, max_number_files=None):
    """
    Unpack all files in an archive to a staging area

    With every iteration, the processor's 'interrupted' flag is checked,
    and if set a ProcessorInterruptedException is raised, which by default
    is caught and subsequently stops execution gracefully.

    Files are unzipped to a staging area, keeping the folder structure
    they have inside the archive. The staging area is *not* cleaned up
    automatically.

    :param Path path: Path to zip file to read
    :param Path staging_area: Where to store the files while they're
    being worked with. If omitted, the dataset's staging area is used; it
    is not deleted automatically either.
    :param int max_number_files: Maximum number of archive members to
    unpack, taken in sorted order. If None, all files are unpacked.
    :return Path: A path to the staging area, or `None` if `path` does
    not exist
    :raises RuntimeError: If the staging area is not an existing folder
    """
    if not path.exists():
        return None

    if not staging_area:
        staging_area = self.dataset.get_staging_area()

    if not staging_area.exists() or not staging_area.is_dir():
        raise RuntimeError("Staging area %s is not a valid folder" % staging_area)

    with zipfile.ZipFile(path, "r") as archive_file:
        # sorted, for a deterministic extraction order
        archive_contents = sorted(archive_file.namelist())
        if max_number_files is not None:
            archive_contents = archive_contents[:max(0, max_number_files)]

        for archived_file in archive_contents:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while iterating zip file contents")

            archive_file.extract(archived_file, staging_area)

    return staging_area
Unpack all files in an archive to a staging area
With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.
Files are unzipped to a staging area. The staging area is not cleaned up automatically.
Parameters
- Path path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, the dataset's staging area is used; it is not deleted automatically
- int max_number_files: Maximum number of files to unpack. If None, all files unpacked
Returns
A path to the staging area
def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
    """
    Extract a single file from an archive by name

    :param str filename: Name of the file to extract, as stored in the
    archive (i.e. including any folder prefix)
    :param Path archive_path: Path to zip file to read
    :param Path staging_area: Where to store the file while it is being
    worked with. If omitted, the dataset's staging area is used.
    :return Path: A path to the extracted file, or `None` if
    `archive_path` does not exist
    :raises RuntimeError: If the staging area is not an existing folder
    :raises FileNotFoundError: If `filename` is not in the archive
    """
    if not archive_path.exists():
        return None

    if not staging_area:
        staging_area = self.dataset.get_staging_area()

    if not staging_area.exists() or not staging_area.is_dir():
        raise RuntimeError("Staging area %s is not a valid folder" % staging_area)

    with zipfile.ZipFile(archive_path, "r") as archive_file:
        if filename not in archive_file.namelist():
            raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
        archive_file.extract(filename, staging_area)

    return staging_area.joinpath(filename)
Extract a file from an archive by name
Parameters
- str filename: Name of file to extract
- Path archive_path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created
Returns
A path to the extracted file
def write_csv_items_and_finish(self, data):
    """
    Write data as csv to results file and finish dataset

    Determines result file path using dataset's path determination helper
    methods. After writing results, the dataset is marked finished with
    `len(data)` as its number of items. Will raise a
    ProcessorInterruptedException if the interrupted flag for this
    processor is set while iterating.

    Every row is passed through `remove_nuls` before it is written
    (presumably to strip NUL characters).

    :param data: A list or tuple of dictionaries, all with the same keys.
    The keys of the first dictionary determine the CSV columns.
    :raises TypeError: If `data` is not a list or tuple
    :raises ValueError: If `data` is empty
    """
    # NOTE(review): callables are let through as well, although `data` is
    # then iterated and passed to len() - kept for backwards compatibility
    if isinstance(data, str) or not (isinstance(data, (list, tuple)) or callable(data)):
        raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))

    if not data:
        raise ValueError("write_csv_items requires a list or tuple with at least one item")

    self.dataset.update_status("Writing results file")
    writer = None
    with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as results:
        for row in data:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while writing results file")

            row = remove_nuls(row)
            if writer is None:
                # the first row determines the CSV header
                writer = csv.DictWriter(results, fieldnames=row.keys())
                writer.writeheader()

            writer.writerow(row)

    self.dataset.update_status("Finished")
    self.dataset.finish(len(data))
Write data as csv to results file and finish dataset
Determines result file path using dataset's path determination helper methods. After writing results, the dataset is marked finished. Will raise a ProcessorInterruptedException if the interrupted flag for this processor is set while iterating.
Parameters
- data: A list or tuple of dictionaries, all with the same keys
def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
    """
    Zip a set of files into the results file and (optionally) finish

    Every archived file is deleted right after it has been added, and is
    stored in the archive under its bare file name.

    :param list|Path files: Either an iterable of `Path` objects, or a
    folder. For a folder, every entry directly inside it is archived and
    the folder itself is removed afterwards. NOTE(review): entries are
    deleted with unlink(), so a folder containing subfolders will raise -
    only flat folders appear to be supported.
    :param int num_items: Items in the dataset. If None, the number of
    files added to the archive is used.
    :param int compression: `zipfile` compression constant. By default
    files are not compressed, to speed up unarchiving.
    :param bool finish: Whether to finish the dataset/job afterwards
    """
    folder_to_remove = None
    if isinstance(files, PurePath):
        if not (files.exists() and files.is_dir()):
            raise RuntimeError("Folder %s is not a folder that can be archived" % files)
        folder_to_remove = files
        files = files.glob("*")

    # create zip of archive and delete temporary files and folder
    self.dataset.update_status("Compressing results into archive")
    archived = 0
    with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as archive:
        for file_path in files:
            archive.write(file_path, file_path.name)
            file_path.unlink()
            archived += 1

    if folder_to_remove is not None:
        shutil.rmtree(folder_to_remove)

    self.dataset.update_status("Finished")
    if finish:
        self.dataset.finish(archived if num_items is None else num_items)
Archive a bunch of files into a zip archive and finish processing
Parameters
- list|Path files: If a list, all files will be added to the archive and deleted afterwards. If a folder, all files in the folder will be added and the folder will be deleted afterwards.
- int num_items: Items in the dataset. If None, the amount of files added to the archive will be used.
- int compression: Type of compression to use. By default, files are not compressed, to speed up unarchiving.
- bool finish: Finish the dataset/job afterwards or not?
def create_standalone(self, item_ids=None):
    """
    Copy this dataset and make that copy standalone.

    This has the benefit of allowing for all analyses that can be run on
    full datasets on the new, filtered copy as well.

    This also transfers annotations and annotation fields from the source
    dataset. Afterwards this dataset's own results file is deleted, its
    log is copied to the standalone dataset, and `copied_to` records the
    key of the standalone copy.

    :param list item_ids: The item_ids that are copied-over. Used to check
    what annotations need to be copied. If empty or None, all annotations
    of the source dataset are copied.

    :return DataSet|None: The new standalone dataset, or `None` if this
    dataset finished without any data
    """
    top_parent = self.source_dataset

    if self.dataset.check_dataset_finished() == "empty":
        # No data to process, so we can't create a standalone dataset
        return None
    # NOTE(review): an unfinished dataset (check returns None) is tolerated
    # here; it is unclear why a standalone copy of one would be created

    standalone = self.dataset.copy(shallow=False)
    standalone.body_match = "(Filtered) " + top_parent.query
    standalone.datasource = top_parent.parameters.get("datasource", "custom")

    if top_parent.annotation_fields and top_parent.num_annotations() > 0:
        # Get the annotation table's column names dynamically
        annotation_cols = list(self.db.fetchone("SELECT * FROM annotations LIMIT 1").keys())
        annotation_cols.remove("id")  # Set by the DB
        cols_str = ",".join(annotation_cols)

        # Copy every column as-is, except `dataset`, which gets the
        # standalone dataset's key. The placeholder is put at the dataset
        # column's own position so values line up with the INSERT column
        # list whatever the table's column order is.
        select_cols = ", ".join("%s" if col == "dataset" else "a." + col for col in annotation_cols)
        query = f"INSERT INTO annotations ({cols_str}) OVERRIDING USER VALUE " \
                f"SELECT {select_cols} " \
                f"FROM annotations AS a WHERE a.dataset = %s"

        # Copy over all annotations if no item_ids are given
        if not item_ids or top_parent.num_rows == standalone.num_rows:
            self.db.execute(query, replacements=(standalone.key, top_parent.key))
        else:
            query += " AND a.item_id = ANY(%s)"
            self.db.execute(query, replacements=(standalone.key, top_parent.key, item_ids))

    # Copy over annotation fields and update annotations with new field IDs
    if top_parent.annotation_fields:
        # New field IDs are derived from the old ID and the new dataset key
        new_field_ids = {
            old_field_id: hash_to_md5(old_field_id + standalone.key)
            for old_field_id in top_parent.annotation_fields
        }
        annotation_fields = {
            new_field_ids[old_field_id]: field_values
            for old_field_id, field_values in top_parent.annotation_fields.items()
        }
        standalone.annotation_fields = {}  # Reset to insert everything without checking for changes
        standalone.save_annotation_fields(annotation_fields)  # Save to db

        # Also update field IDs in the copied annotations
        for old_field_id, new_field_id in new_field_ids.items():
            self.db.update(
                "annotations",
                where={"field_id": old_field_id, "dataset": standalone.key},
                data={"field_id": new_field_id}
            )

    try:
        standalone.board = top_parent.board
    except AttributeError:
        standalone.board = self.type

    standalone.type = top_parent.type

    standalone.detach()
    standalone.delete_parameter("key_parent")

    self.dataset.copied_to = standalone.key

    # we don't need this file anymore - it has been copied to the new
    # standalone dataset, and this one is not accessible via the interface
    # except as a link to the copied standalone dataset
    os.unlink(self.dataset.get_results_path())

    # Copy the log
    shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())

    return standalone
Copy this dataset and make that copy standalone.
This has the benefit of allowing for all analyses that can be run on full datasets on the new, filtered copy as well.
This also transfers annotations and annotation fields.
Parameters
- list item_ids: The item_ids that are copied-over. Used to check what annotations need to be copied.
Returns
The new standalone dataset
def save_annotations(self, annotations: list, source_dataset=None, hide_in_explorer=False) -> int:
    """
    Saves annotations made by this processor on the basis of another dataset.

    Every annotation is tagged with this processor's dataset key
    (`from_dataset`) and `by_processor = True`; `author`, `author_original`
    and `label` default to the processor's name when not given. For every
    new `from_dataset`/`label` combination an annotation field is
    registered on the target dataset. The annotation dictionaries are
    modified in place.

    :param annotations: List of dictionaries with annotation items. Must have `item_id` and `value`.
    E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]. Items
    without an `item_id` are skipped and not saved.
    :param source_dataset: The dataset that these annotations will be saved on. If None, will use the
    top parent.
    :param bool hide_in_explorer: Whether to hide the annotation fields
    created here in the Explorer. 'Hidden' annotations are still shown in
    `iterate_items()`.

    :returns int: How many annotations were saved.
    """
    if not annotations:
        return 0

    # Default to the top parent dataset
    if not source_dataset:
        source_dataset = self.source_dataset.top_parent()

    # Existing annotation fields of the target dataset; new fields are added
    # to this same dict below.
    # NOTE(review): this is the dataset's own dict, mutated in place before
    # save_annotation_fields() is called - confirm that method does not skip
    # fields it considers unchanged.
    annotation_fields = source_dataset.annotation_fields

    # (from_dataset, label) -> field ID for processor-made fields, so that
    # IDs need not be re-hashed for every annotation
    seen_fields = {(field_items["from_dataset"], field_items["label"]): field_id
                   for field_id, field_items in annotation_fields.items() if "from_dataset" in field_items}

    valid_annotations = []
    failed = 0

    # Loop through all annotations. This may be batched.
    for annotation in annotations:
        # item_id always needs to be present; items without one are not saved
        if not annotation.get("item_id"):
            failed += 1
            continue

        # Keep track of what dataset generated this annotation
        annotation["from_dataset"] = self.dataset.key
        # Default the author to this processor's name
        if not annotation.get("author"):
            annotation["author"] = self.name
        if not annotation.get("author_original"):
            annotation["author_original"] = self.name
        annotation["by_processor"] = True

        # Only use a default label if no custom one is given
        if not annotation.get("label"):
            annotation["label"] = self.name

        # Register a field for a from_dataset/label combo not seen yet. This
        # happens inside the loop because this method may be called in
        # batches and with different annotation types.
        field_key = (annotation["from_dataset"], annotation["label"])
        if field_key not in seen_fields:
            # Unique per source dataset, label and this dataset, so one
            # processor can create several annotation types
            field_id = hash_to_md5(self.source_dataset.key + annotation["label"] + annotation["from_dataset"])
            seen_fields[field_key] = field_id
            annotation_fields[field_id] = {
                "label": annotation["label"],
                "type": annotation["type"] if annotation.get("type") else "text",
                "from_dataset": annotation["from_dataset"],
                "hide_in_explorer": hide_in_explorer
            }
        else:
            field_id = seen_fields[field_key]

        annotation["field_id"] = field_id
        valid_annotations.append(annotation)

    annotations_saved = 0
    if valid_annotations:
        try:
            annotations_saved = source_dataset.save_annotations(valid_annotations)
        except AnnotationException as e:
            # NOTE(review): reported on the source dataset rather than on this
            # processor's dataset - confirm that is intended
            self.source_dataset.update_status(str(e))

    if failed:
        self.dataset.update_status("Could not save all annotations, make sure that all items have an `item_id` value.")

    source_dataset.save_annotation_fields(annotation_fields)

    return annotations_saved
Saves annotations made by this processor on the basis of another dataset.
Also adds some data regarding this processor: author and label default to the processor name (unless explicitly given),
and each annotation is marked as created by this processor's dataset.
Parameters
- annotations: List of dictionaries with annotation items. Must have item_id and value. E.g. [{"item_id": "12345", "label": "Valid", "value": "Yes"}]
- source_dataset: The dataset that these annotations will be saved on. If None, will use the top parent.
- bool hide_in_explorer: Whether to hide these annotations in the Explorer. 'Hidden' annotations are still shown in iterate_items().
Returns
How many annotations were saved.
@classmethod
def map_item_method_available(cls, dataset):
    """
    Check whether this processor's map_item can be used on a dataset

    `map_item` is only considered usable when the processor defines it AND
    the processor's extension equals the dataset's. The extensions can
    differ e.g. when a csv file was uploaded and converted to an
    ndjson-based data source; in that case the data may not be in the
    format map_item expects. If the dataset's results file has no
    extension and the dataset object has no `extension` attribute, `False`
    is returned.

    :param DataSet dataset: The DataSet object with which to use map_item
    :return bool: Whether map_item can be used with this dataset
    """
    # todo: this is kind of ugly, and a better fix may be possible
    extension = dataset.get_extension()
    if not extension:
        if not hasattr(dataset, "extension"):
            # nothing to compare against, so compatibility is unknown
            return False
        # results file missing or without extension; use the expected one
        extension = dataset.extension

    return hasattr(cls, "map_item") and cls.extension == extension
Check if this processor can use map_item
Checks if map_item method exists and is compatible with dataset. If
dataset has a different extension than the default for this processor,
or if the dataset has no extension, this means we cannot be sure the
data is in the right format to be mapped, so False is returned in
that case even if a map_item() method is available.
Parameters
- BasicProcessor processor: The BasicProcessor subclass object with which to use map_item
- DataSet dataset: The DataSet object with which to use map_item
@classmethod
def get_mapped_item(cls, item):
    """
    Map an item with this processor's map_item method

    Callers should make sure map_item is compatible with the item's
    dataset first, by checking map_item_method_available.

    :param item: Item to map
    :return: The mapped item
    :raises MapItemException: If map_item raises a KeyError or IndexError,
    or returns a falsy value
    """
    try:
        result = cls.map_item(item)
    except (KeyError, IndexError) as error:
        raise MapItemException(f"Unable to map item: {type(error).__name__}-{error}")

    if result:
        return result
    raise MapItemException("Unable to map item!")
Get the mapped item using a processor's map_item method.
Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
@classmethod
def is_filter(cls):
    """
    Is this processor a filter?

    Filters do not produce their own dataset but replace the source_dataset
    dataset instead. A processor counts as a filter if its `category`
    contains "filter" (case-insensitive).

    :todo: Make this a bit more robust than sniffing the processor category
    :return bool: `True` if this processor is a filter, else `False`
    """
    category = getattr(cls, "category", None)
    # bool() so that a missing or empty category yields False rather than
    # None or ""
    return bool(category) and "filter" in category.lower()
Is this processor a filter?
Filters do not produce their own dataset but replace the source_dataset dataset instead.
Todo: Make this a bit more robust than sniffing the processor category
Returns
Whether this processor is a filter, i.e. whether its category contains "filter".
@classmethod
def get_options(cls, parent_dataset=None, config=None) -> dict:
    """
    Get processor options

    The default implementation returns the class's `options` attribute, or
    an empty dictionary if there is none. Processors that need
    finer-grained options can override it, e.g. when the availability of
    options depends on the parent dataset's parameters.

    :param DataSet parent_dataset: An object representing the dataset that
    the processor would be or was run on. Can be used, in conjunction with
    config, to show some options only to privileged users.
    :param ConfigManager|None config: Configuration reader (context-aware)
    :return dict: Options for this processor
    """
    return getattr(cls, "options", {})
Get processor options
This method by default returns the class's "options" attribute, or an empty dictionary. It can be redefined by processors that need more fine-grained options, e.g. in cases where the availability of options is partially determined by the parent dataset's parameters.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be or was run on. Can be used, in conjunction with config, to show some options only to privileged users.
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Options for this processor
@classmethod
def get_status(cls):
    """
    Get processor status

    :return list|None: The class's `status` attribute (the statuses of
    this processor), or `None` if the class defines none
    """
    return getattr(cls, "status", None)
Get processor status
Returns
Statuses of this processor, or None if it defines none
@classmethod
def is_top_dataset(cls):
    """
    Confirm this is *not* a top dataset, but a processor.

    Part of the interface used for processor compatibility checks; a
    processor never represents a top-level dataset.

    :return bool: `False`, unconditionally
    """
    return False
Confirm this is not a top dataset, but a processor.
Used for processor compatibility checks.
Returns
Always False, because this is a processor.
@classmethod
def is_from_collector(cls):
    """
    Check if this processor is one that collects data, i.e. a search or
    import worker.

    :return bool: Whether the processor type ends in `-search` or
    `-import`
    """
    return cls.type.endswith(("-search", "-import"))
Check if this processor is one that collects data, i.e. a search or import worker.
Returns
Whether this processor's type ends in -search or -import.
@classmethod
def get_extension(cls, parent_dataset=None):
    """
    Return the extension of the processor's dataset

    Used for processor compatibility checks. Filters produce the same kind
    of file as the dataset they run on, so for them the parent dataset's
    extension is returned; other processors report their class's
    `extension`.

    :param DataSet parent_dataset: An object representing the dataset that
    the processor would be run on
    :return str|None: Dataset extension (without leading `.`) or `None`.
    """
    if not cls.is_filter():
        # Explicitly defined extension (the Processor class defaults to
        # "csv"); a falsy value means a subclass unset it
        return cls.extension or None

    # Without a parent dataset a filter's extension cannot be determined.
    # This keeps the historical `None`; falling back on the processor's own
    # extension may be preferable.
    return parent_dataset.get_extension() if parent_dataset is not None else None
Return the extension of the processor's dataset
Used for processor compatibility checks.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
Returns
Dataset extension (without leading .) or None.
@classmethod
def is_rankable(cls, multiple_items=True):
    """
    Used for processor compatibility

    Processors are never considered rankable by default.

    :param bool multiple_items: Consider datasets with multiple items per
    item (e.g. word_1, word_2, etc)? Included for compatibility
    :return bool: `False`
    """
    return False
Used for processor compatibility
Parameters
- bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)? Included for compatibility
@classmethod
def exclude_followup_processors(cls, processor_type=None):
    """
    Used for processor compatibility

    Child processors can override this to exclude certain follow-up
    processors, e.g.:

    .. code-block:: python

        @classmethod
        def exclude_followup_processors(cls, processor_type=None):
            return processor_type in ["undesirable-followup-processor"]

    By default no follow-up processor is excluded.

    :param str processor_type: Type of the follow-up processor to check
    :return bool: True if processor should be excluded, False otherwise
    """
    return False
Used for processor compatibility
To be defined by the child processor if it should exclude certain follow-up processors. e.g.:
@classmethod
def exclude_followup_processors(cls, processor_type):
    if processor_type in ["undesirable-followup-processor"]:
        return True
    return False
Parameters
- str processor_type: Type of the follow-up processor to check
Returns
True if processor should be excluded, False otherwise
@abc.abstractmethod
def process(self):
    """
    Process data

    Abstract: every child processor must define this method, which does
    the processor's actual work. This base implementation does nothing.
    """
Process data
To be defined by the child processor.
@staticmethod
def is_4cat_processor():
    """
    Is this a 4CAT processor?

    Lets other code tell 4CAT processor classes apart from other classes.

    :return bool: Always `True`
    """
    return True
Is this a 4CAT processor?
This is used to determine whether a class is a 4CAT processor.
Returns
True