backend.lib.processor
Basic post-processor worker - should be inherited by workers to post-process results
1""" 2Basic post-processor worker - should be inherited by workers to post-process results 3""" 4import re 5import traceback 6import zipfile 7import typing 8import shutil 9import json 10import abc 11import csv 12import os 13 14from pathlib import Path, PurePath 15 16from backend.lib.worker import BasicWorker 17from common.lib.dataset import DataSet 18from common.lib.fourcat_module import FourcatModule 19from common.lib.helpers import get_software_commit, remove_nuls, send_email 20from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException, 21 DataSetException, MapItemException) 22from common.config_manager import config, ConfigWrapper 23from common.lib.user import User 24 25 26csv.field_size_limit(1024 * 1024 * 1024) 27 28 29class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta): 30 """ 31 Abstract processor class 32 33 A processor takes a finished dataset as input and processes its result in 34 some way, with another dataset set as output. The input thus is a file, and 35 the output (usually) as well. In other words, the result of a processor can 36 be used as input for another processor (though whether and when this is 37 useful is another question). 38 39 To determine whether a processor can process a given dataset, you can 40 define a `is_compatible_with(FourcatModule module=None, str user=None):) -> bool` class 41 method which takes a dataset as argument and returns a bool that determines 42 if this processor is considered compatible with that dataset. For example: 43 44 .. code-block:: python 45 46 @classmethod 47 def is_compatible_with(cls, module=None, user=None): 48 return module.type == "linguistic-features" 49 50 51 """ 52 53 #: Database handler to interface with the 4CAT database 54 db = None 55 56 #: Job object that requests the execution of this processor 57 job = None 58 59 #: The dataset object that the processor is *creating*. 60 dataset = None 61 62 #: Owner (username) of the dataset 63 owner = None 64 65 #: The dataset object that the processor is *processing*. 66 source_dataset = None 67 68 #: The file that is being processed 69 source_file = None 70 71 #: Processor description, which will be displayed in the web interface 72 description = "No description available" 73 74 #: Category identifier, used to group processors in the web interface 75 category = "Other" 76 77 #: Extension of the file created by the processor 78 extension = "csv" 79 80 #: 4CAT settings from the perspective of the dataset's owner 81 config = None 82 83 #: Is this processor running 'within' a preset processor? 84 is_running_in_preset = False 85 86 #: Is this processor hidden in the front-end, and only used internally/in presets? 87 is_hidden = False 88 89 #: This will be defined automatically upon loading the processor. There is 90 #: no need to override manually 91 filepath = None 92 93 def work(self): 94 """ 95 Process a dataset 96 97 Loads dataset metadata, sets up the scaffolding for performing some kind 98 of processing on that dataset, and then processes it. Afterwards, clean 99 up. 100 """ 101 try: 102 # a dataset can have multiple owners, but the creator is the user 103 # that actually queued the processor, so their config is relevant 104 self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules) 105 self.owner = self.dataset.creator 106 except DataSetException as e: 107 # query has been deleted in the meantime. 
finish without error, 108 # as deleting it will have been a conscious choice by a user 109 self.job.finish() 110 return 111 112 # set up config reader using the worker's DB connection and the dataset 113 # creator. This ensures that if a value has been overriden for the owner, 114 # the overridden value is used instead. 115 config.with_db(self.db) 116 self.config = ConfigWrapper(config=config, user=User.get_by_name(self.db, self.owner)) 117 118 if self.dataset.data.get("key_parent", None): 119 # search workers never have parents (for now), so we don't need to 120 # find out what the source_dataset dataset is if it's a search worker 121 try: 122 self.source_dataset = self.dataset.get_parent() 123 124 # for presets, transparently use the *top* dataset as a source_dataset 125 # since that is where any underlying processors should get 126 # their data from. However, this should only be done as long as the 127 # preset is not finished yet, because after that there may be processors 128 # that run on the final preset result 129 while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished(): 130 self.is_running_in_preset = True 131 self.source_dataset = self.source_dataset.get_parent() 132 if self.source_dataset is None: 133 # this means there is no dataset that is *not* a preset anywhere 134 # above this dataset. This should never occur, but if it does, we 135 # cannot continue 136 self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset", 137 (self.type, self.dataset.key)) 138 self.job.finish() 139 return 140 141 except DataSetException: 142 # we need to know what the source_dataset dataset was to properly handle the 143 # analysis 144 self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % ( 145 self.type, self.dataset.key)) 146 self.job.finish() 147 return 148 149 if not self.source_dataset.is_finished() and not self.is_running_in_preset: 150 # not finished yet - retry after a while 151 # exception for presets, since these *should* be unfinished 152 # until underlying processors are done 153 self.job.release(delay=30) 154 return 155 156 self.source_file = self.source_dataset.get_results_path() 157 if not self.source_file.exists(): 158 self.dataset.update_status("Finished, no input data found.") 159 160 self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"])) 161 162 processor_name = self.title if hasattr(self, "title") else self.type 163 self.dataset.clear_log() 164 self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key)) 165 166 # start log file 167 self.dataset.update_status("Processing data") 168 self.dataset.update_version(get_software_commit(self)) 169 170 # get parameters 171 # if possible, fill defaults where parameters are not provided 172 given_parameters = self.dataset.parameters.copy() 173 all_parameters = self.get_options(self.dataset) 174 self.parameters = { 175 param: given_parameters.get(param, all_parameters.get(param, {}).get("default")) 176 for param in [*all_parameters.keys(), *given_parameters.keys()] 177 } 178 179 # now the parameters have been loaded into memory, clear any sensitive 180 # ones. 
This has a side-effect that a processor may not run again 181 # without starting from scratch, but this is the price of progress 182 options = self.get_options(self.dataset.get_parent()) 183 for option, option_settings in options.items(): 184 if option_settings.get("sensitive"): 185 self.dataset.delete_parameter(option) 186 187 if self.interrupted: 188 self.dataset.log("Processing interrupted, trying again later") 189 return self.abort() 190 191 if not self.dataset.is_finished(): 192 try: 193 self.process() 194 self.after_process() 195 except WorkerInterruptedException as e: 196 self.dataset.log("Processing interrupted (%s), trying again later" % str(e)) 197 self.abort() 198 except Exception as e: 199 self.dataset.log("Processor crashed (%s), trying again later" % str(e)) 200 frames = traceback.extract_tb(e.__traceback__) 201 last_frame = frames[-1] 202 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in frames[1:]] 203 location = "->".join(frames) 204 205 # Not all datasets have source_dataset keys 206 if len(self.dataset.get_genealogy()) > 1: 207 parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")" 208 else: 209 parent_key = "" 210 211 # remove any result files that have been created so far 212 self.remove_files() 213 214 raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % ( 215 self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=last_frame) 216 else: 217 # dataset already finished, job shouldn't be open anymore 218 self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (self.job.data["jobtype"], self.job.data["remote_id"])) 219 self.job.finish() 220 221 222 def after_process(self): 223 """ 224 Run after processing the dataset 225 226 This method cleans up temporary files, and if needed, handles logistics 227 concerning the result file, e.g. running a pre-defined processor on the 228 result, copying it to another dataset, and so on. 
229 """ 230 if self.dataset.data["num_rows"] > 0: 231 self.dataset.update_status("Dataset completed.") 232 233 if not self.dataset.is_finished(): 234 self.dataset.finish() 235 236 self.dataset.remove_staging_areas() 237 238 # see if we have anything else lined up to run next 239 for next in self.parameters.get("next", []): 240 can_run_next = True 241 next_parameters = next.get("parameters", {}) 242 next_type = next.get("type", "") 243 try: 244 available_processors = self.dataset.get_available_processors(user=self.dataset.creator) 245 except ValueError: 246 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 247 break 248 249 # run it only if the post-processor is actually available for this query 250 if self.dataset.data["num_rows"] <= 0: 251 can_run_next = False 252 self.log.info("Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (next_type, self.dataset.key)) 253 254 elif next_type in available_processors: 255 next_analysis = DataSet( 256 parameters=next_parameters, 257 type=next_type, 258 db=self.db, 259 parent=self.dataset.key, 260 extension=available_processors[next_type].extension, 261 is_private=self.dataset.is_private, 262 owner=self.dataset.creator, 263 modules=self.modules 264 ) 265 self.queue.add_job(next_type, remote_id=next_analysis.key) 266 else: 267 can_run_next = False 268 self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (self.dataset.key, self.type, next_type)) 269 270 if not can_run_next: 271 # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent 272 # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed 273 if "attach_to" in self.parameters: 274 # Probably should not happen, but for some reason a mid processor has been designated as the processor 275 # the parent should attach to 276 pass 277 else: 278 # Check for "attach_to" parameter in descendents 279 while True: 280 if "attach_to" in next_parameters: 281 self.parameters["attach_to"] = next_parameters["attach_to"] 282 break 283 else: 284 if "next" in next_parameters: 285 next_parameters = next_parameters["next"][0]["parameters"] 286 else: 287 # No more descendents 288 # Should not happen; we cannot find the source dataset 289 self.log.warning("Cannot find preset's source dataset for dataset %s" % self.dataset.key) 290 break 291 292 # see if we need to register the result somewhere 293 if "copy_to" in self.parameters: 294 # copy the results to an arbitrary place that was passed 295 if self.dataset.get_results_path().exists(): 296 shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"]) 297 else: 298 # if copy_to was passed, that means it's important that this 299 # file exists somewhere, so we create it as an empty file 300 with open(self.parameters["copy_to"], "w") as empty_file: 301 empty_file.write("") 302 303 # see if this query chain is to be attached to another query 304 # if so, the full genealogy of this query (minus the original dataset) 305 # is attached to the given query - this is mostly useful for presets, 306 # where a chain of processors can be marked as 'underlying' a preset 307 if "attach_to" in self.parameters: 308 try: 309 # copy metadata and results to the surrogate 310 surrogate = DataSet(key=self.parameters["attach_to"], db=self.db) 311 312 if self.dataset.get_results_path().exists(): 313 # Update the surrogate's results file suffix to 
match this dataset's suffix 314 surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(self.dataset.get_results_path().suffix) 315 shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path())) 316 317 try: 318 surrogate.finish(self.dataset.data["num_rows"]) 319 except RuntimeError: 320 # already finished, could happen (though it shouldn't) 321 pass 322 323 surrogate.update_status(self.dataset.get_status()) 324 325 except ValueError: 326 # dataset with key to attach to doesn't exist... 327 self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % ( 328 self.dataset.key, self.parameters["attach_to"])) 329 330 self.job.finish() 331 332 if config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 333 owner = self.dataset.get_parameters().get("email-complete", False) 334 # Check that username is email address 335 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 336 from email.mime.multipart import MIMEMultipart 337 from email.mime.text import MIMEText 338 from smtplib import SMTPException 339 import socket 340 import html2text 341 342 self.log.debug("Sending email to %s" % owner) 343 dataset_url = ('https://' if config.get('flask.https') else 'http://') + config.get('flask.server_name') + '/results/' + self.dataset.key 344 sender = config.get('mail.noreply') 345 message = MIMEMultipart("alternative") 346 message["From"] = sender 347 message["To"] = owner 348 message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label()) 349 mail = """ 350 <p>Hello %s,</p> 351 <p>4CAT has finished collecting your %s dataset labeled: %s</p> 352 <p>You can view your dataset via the following link:</p> 353 <p><a href="%s">%s</a></p> 354 <p>Sincerely,</p> 355 <p>Your friendly neighborhood 4CAT admin</p> 356 """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url) 357 html_parser = html2text.HTML2Text() 358 message.attach(MIMEText(html_parser.handle(mail), "plain")) 359 message.attach(MIMEText(mail, "html")) 360 try: 361 send_email([owner], message) 362 except (SMTPException, ConnectionRefusedError, socket.timeout) as e: 363 self.log.error("Error sending email to %s" % owner) 364 365 def remove_files(self): 366 """ 367 Clean up result files and any staging files for processor to be attempted 368 later if desired. 369 """ 370 # Remove the results file that was created 371 if self.dataset.get_results_path().exists(): 372 self.dataset.get_results_path().unlink() 373 if self.dataset.get_results_folder_path().exists(): 374 shutil.rmtree(self.dataset.get_results_folder_path()) 375 376 # Remove any staging areas with temporary data 377 self.dataset.remove_staging_areas() 378 379 def abort(self): 380 """ 381 Abort dataset creation and clean up so it may be attempted again later 382 """ 383 # remove any result files that have been created so far 384 self.remove_files() 385 386 # we release instead of finish, since interrupting is just that - the 387 # job should resume at a later point. 
Delay resuming by 10 seconds to 388 # give 4CAT the time to do whatever it wants (though usually this isn't 389 # needed since restarting also stops the spawning of new workers) 390 if self.interrupted == self.INTERRUPT_RETRY: 391 # retry later - wait at least 10 seconds to give the backend time to shut down 392 self.job.release(delay=10) 393 elif self.interrupted == self.INTERRUPT_CANCEL: 394 # cancel job 395 self.job.finish() 396 397 def add_field_to_parent(self, field_name, new_data, which_parent=source_dataset, update_existing=False): 398 """ 399 This function adds a new field to the parent dataset. Expects a list of data points, one for each item 400 in the parent dataset. Processes csv and ndjson. If update_existing is set to True, this can be used 401 to overwrite an existing field. 402 403 TODO: could be improved by accepting different types of data depending on csv or ndjson. 404 405 :param str field_name: name of the desired 406 :param List new_data: List of data to be added to parent dataset 407 :param DataSet which_parent: DataSet to be updated (e.g., self.source_dataset, self.dataset.get_parent(), self.dataset.top_parent()) 408 :param bool update_existing: False (default) will raise an error if the field_name already exists 409 True will allow updating existing data 410 """ 411 if len(new_data) < 1: 412 # no data 413 raise ProcessorException('No data provided') 414 415 if not hasattr(self, "source_dataset") and which_parent is not None: 416 # no source to update 417 raise ProcessorException('No source dataset to update') 418 419 # Get the source file data path 420 parent_path = which_parent.get_results_path() 421 422 if len(new_data) != which_parent.num_rows: 423 raise ProcessorException('Must have new data point for each record: parent dataset: %i, new data points: %i' % (which_parent.num_rows, len(new_data))) 424 425 self.dataset.update_status("Adding new field %s to the source file" % field_name) 426 427 # Get a temporary path where we can store the data 428 tmp_path = self.dataset.get_staging_area() 429 tmp_file_path = tmp_path.joinpath(parent_path.name) 430 431 # go through items one by one, optionally mapping them 432 if parent_path.suffix.lower() == ".csv": 433 # Get field names 434 fieldnames = which_parent.get_columns() 435 if not update_existing and field_name in fieldnames: 436 raise ProcessorException('field_name %s already exists!' % field_name) 437 fieldnames.append(field_name) 438 439 # Iterate through the original dataset and add values to a new column 440 self.dataset.update_status("Writing new source file with %s." % field_name) 441 with tmp_file_path.open("w", encoding="utf-8", newline="") as output: 442 writer = csv.DictWriter(output, fieldnames=fieldnames) 443 writer.writeheader() 444 445 for count, post in enumerate(which_parent.iterate_items(self)): 446 # stop processing if worker has been asked to stop 447 if self.interrupted: 448 raise ProcessorInterruptedException("Interrupted while writing CSV file") 449 450 post.original[field_name] = new_data[count] 451 writer.writerow(post.original) 452 453 elif parent_path.suffix.lower() == ".ndjson": 454 # JSON cannot encode sets 455 if type(new_data[0]) is set: 456 # could check each if type(datapoint) is set, but that could be extensive... 
457 new_data = [list(datapoint) for datapoint in new_data] 458 459 with tmp_file_path.open("w", encoding="utf-8", newline="") as output: 460 for count, post in enumerate(which_parent.iterate_items(self)): 461 # stop processing if worker has been asked to stop 462 if self.interrupted: 463 raise ProcessorInterruptedException("Interrupted while writing NDJSON file") 464 465 if not update_existing and field_name in post.original.keys(): 466 raise ProcessorException('field_name %s already exists!' % field_name) 467 468 # Update data 469 post.original[field_name] = new_data[count] 470 471 output.write(json.dumps(post.original) + "\n") 472 else: 473 raise NotImplementedError("Cannot iterate through %s file" % parent_path.suffix) 474 475 # Replace the source file path with the new file 476 shutil.copy(str(tmp_file_path), str(parent_path)) 477 478 # delete temporary files and folder 479 shutil.rmtree(tmp_path) 480 481 self.dataset.update_status("Parent dataset updated.") 482 483 def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]): 484 """ 485 A generator that iterates through files in an archive 486 487 With every iteration, the processor's 'interrupted' flag is checked, 488 and if set a ProcessorInterruptedException is raised, which by default 489 is caught and subsequently stops execution gracefully. 490 491 Files are temporarily unzipped and deleted after use. 492 493 :param Path path: Path to zip file to read 494 :param Path staging_area: Where to store the files while they're 495 being worked with. If omitted, a temporary folder is created and 496 deleted after use 497 :param bool immediately_delete: Temporary files are removed after yielded; 498 False keeps files until the staging_area is removed (usually during processor 499 cleanup) 500 :param list filename_filter: Whitelist of filenames to iterate. 501 Other files will be ignored. If empty, do not ignore anything. 502 :return: An iterator with a Path item for each file 503 """ 504 505 if not path.exists(): 506 return 507 508 if not staging_area: 509 staging_area = self.dataset.get_staging_area() 510 511 if not staging_area.exists() or not staging_area.is_dir(): 512 raise RuntimeError("Staging area %s is not a valid folder") 513 514 with zipfile.ZipFile(path, "r") as archive_file: 515 archive_contents = sorted(archive_file.namelist()) 516 517 for archived_file in archive_contents: 518 if filename_filter and archived_file not in filename_filter: 519 continue 520 521 info = archive_file.getinfo(archived_file) 522 if info.is_dir(): 523 continue 524 525 if self.interrupted: 526 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 527 528 temp_file = staging_area.joinpath(archived_file) 529 archive_file.extract(archived_file, staging_area) 530 531 yield temp_file 532 if immediately_delete: 533 temp_file.unlink() 534 535 def unpack_archive_contents(self, path, staging_area=None): 536 """ 537 Unpack all files in an archive to a staging area 538 539 With every iteration, the processor's 'interrupted' flag is checked, 540 and if set a ProcessorInterruptedException is raised, which by default 541 is caught and subsequently stops execution gracefully. 542 543 Files are unzipped to a staging area. The staging area is *not* 544 cleaned up automatically. 545 546 :param Path path: Path to zip file to read 547 :param Path staging_area: Where to store the files while they're 548 being worked with. 
If omitted, a temporary folder is created and 549 deleted after use 550 :param int max_number_files: Maximum number of files to unpack. If None, all files unpacked 551 :return Path: A path to the staging area 552 """ 553 554 if not path.exists(): 555 return 556 557 if not staging_area: 558 staging_area = self.dataset.get_staging_area() 559 560 if not staging_area.exists() or not staging_area.is_dir(): 561 raise RuntimeError("Staging area %s is not a valid folder") 562 563 paths = [] 564 with zipfile.ZipFile(path, "r") as archive_file: 565 archive_contents = sorted(archive_file.namelist()) 566 567 for archived_file in archive_contents: 568 if self.interrupted: 569 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 570 571 file_name = archived_file.split("/")[-1] 572 temp_file = staging_area.joinpath(file_name) 573 archive_file.extract(archived_file, staging_area) 574 paths.append(temp_file) 575 576 return staging_area 577 578 def extract_archived_file_by_name(self, filename, archive_path, staging_area=None): 579 """ 580 Extract a file from an archive by name 581 582 :param str filename: Name of file to extract 583 :param Path archive_path: Path to zip file to read 584 :param Path staging_area: Where to store the files while they're 585 being worked with. If omitted, a temporary folder is created 586 :return Path: A path to the extracted file 587 """ 588 if not archive_path.exists(): 589 return 590 591 if not staging_area: 592 staging_area = self.dataset.get_staging_area() 593 594 if not staging_area.exists() or not staging_area.is_dir(): 595 raise RuntimeError("Staging area %s is not a valid folder") 596 597 with zipfile.ZipFile(archive_path, "r") as archive_file: 598 if filename not in archive_file.namelist(): 599 raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path)) 600 else: 601 archive_file.extract(filename, staging_area) 602 return staging_area.joinpath(filename) 603 604 def write_csv_items_and_finish(self, data): 605 """ 606 Write data as csv to results file and finish dataset 607 608 Determines result file path using dataset's path determination helper 609 methods. After writing results, the dataset is marked finished. Will 610 raise a ProcessorInterruptedException if the interrupted flag for this 611 processor is set while iterating. 
612 613 :param data: A list or tuple of dictionaries, all with the same keys 614 """ 615 if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str): 616 raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data)) 617 618 if not data: 619 raise ValueError("write_csv_items requires a dictionary with at least one item") 620 621 self.dataset.update_status("Writing results file") 622 writer = False 623 with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results: 624 for row in data: 625 if self.interrupted: 626 raise ProcessorInterruptedException("Interrupted while writing results file") 627 628 row = remove_nuls(row) 629 if not writer: 630 writer = csv.DictWriter(results, fieldnames=row.keys()) 631 writer.writeheader() 632 633 writer.writerow(row) 634 635 self.dataset.update_status("Finished") 636 self.dataset.finish(len(data)) 637 638 def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True): 639 """ 640 Archive a bunch of files into a zip archive and finish processing 641 642 :param list|Path files: If a list, all files will be added to the 643 archive and deleted afterwards. If a folder, all files in the folder 644 will be added and the folder will be deleted afterwards. 645 :param int num_items: Items in the dataset. If None, the amount of 646 files added to the archive will be used. 647 :param int compression: Type of compression to use. By default, files 648 are not compressed, to speed up unarchiving. 649 :param bool finish: Finish the dataset/job afterwards or not? 650 """ 651 is_folder = False 652 if issubclass(type(files), PurePath): 653 is_folder = files 654 if not files.exists() or not files.is_dir(): 655 raise RuntimeError("Folder %s is not a folder that can be archived" % files) 656 657 files = files.glob("*") 658 659 # create zip of archive and delete temporary files and folder 660 self.dataset.update_status("Compressing results into archive") 661 done = 0 662 with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip: 663 for output_path in files: 664 zip.write(output_path, output_path.name) 665 output_path.unlink() 666 done += 1 667 668 # delete temporary folder 669 if is_folder: 670 shutil.rmtree(is_folder) 671 672 self.dataset.update_status("Finished") 673 if num_items is None: 674 num_items = done 675 676 if finish: 677 self.dataset.finish(num_items) 678 679 def create_standalone(self): 680 """ 681 Copy this dataset and make that copy standalone 682 683 This has the benefit of allowing for all analyses that can be run on 684 full datasets on the new, filtered copy as well. 
685 686 :return DataSet: The new standalone dataset 687 """ 688 top_parent = self.source_dataset 689 690 finished = self.dataset.check_dataset_finished() 691 if finished == 'empty': 692 # No data to process, so we can't create a standalone dataset 693 return 694 elif finished is None: 695 # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now 696 pass 697 698 standalone = self.dataset.copy(shallow=False) 699 standalone.body_match = "(Filtered) " + top_parent.query 700 standalone.datasource = top_parent.parameters.get("datasource", "custom") 701 702 try: 703 standalone.board = top_parent.board 704 except AttributeError: 705 standalone.board = self.type 706 707 standalone.type = top_parent.type 708 709 standalone.detach() 710 standalone.delete_parameter("key_parent") 711 712 self.dataset.copied_to = standalone.key 713 714 # we don't need this file anymore - it has been copied to the new 715 # standalone dataset, and this one is not accessible via the interface 716 # except as a link to the copied standalone dataset 717 os.unlink(self.dataset.get_results_path()) 718 719 # Copy the log 720 shutil.copy(self.dataset.get_log_path(), standalone.get_log_path()) 721 722 return standalone 723 724 @classmethod 725 def map_item_method_available(cls, dataset): 726 """ 727 Check if this processor can use map_item 728 729 Checks if map_item method exists and is compatible with dataset. If 730 dataset has a different extension than the default for this processor, 731 or if the dataset has no extension, this means we cannot be sure the 732 data is in the right format to be mapped, so `False` is returned in 733 that case even if a map_item() method is available. 734 735 :param BasicProcessor processor: The BasicProcessor subclass object 736 with which to use map_item 737 :param DataSet dataset: The DataSet object with which to 738 use map_item 739 """ 740 # only run item mapper if extension of processor == extension of 741 # data file, for the scenario where a csv file was uploaded and 742 # converted to an ndjson-based data source, for example 743 # todo: this is kind of ugly, and a better fix may be possible 744 dataset_extension = dataset.get_extension() 745 if not dataset_extension: 746 # DataSet results file does not exist or has no extension, use expected extension 747 if hasattr(dataset, "extension"): 748 dataset_extension = dataset.extension 749 else: 750 # No known DataSet extension; cannot determine if map_item method compatible 751 return False 752 753 return hasattr(cls, "map_item") and cls.extension == dataset_extension 754 755 @classmethod 756 def get_mapped_item(cls, item): 757 """ 758 Get the mapped item using a processors map_item method. 759 760 Ensure map_item method is compatible with a dataset by checking map_item_method_available first. 761 """ 762 try: 763 mapped_item = cls.map_item(item) 764 except (KeyError, IndexError) as e: 765 raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}") 766 767 if not mapped_item: 768 raise MapItemException("Unable to map item!") 769 770 return mapped_item 771 772 @classmethod 773 def is_filter(cls): 774 """ 775 Is this processor a filter? 776 777 Filters do not produce their own dataset but replace the source_dataset dataset 778 instead. 
779 780 :todo: Make this a bit more robust than sniffing the processor category 781 :return bool: 782 """ 783 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower() 784 785 @classmethod 786 def get_options(cls, parent_dataset=None, user=None): 787 """ 788 Get processor options 789 790 This method by default returns the class's "options" attribute, or an 791 empty dictionary. It can be redefined by processors that need more 792 fine-grained options, e.g. in cases where the availability of options 793 is partially determined by the parent dataset's parameters. 794 795 :param DataSet parent_dataset: An object representing the dataset that 796 the processor would be run on 797 :param User user: Flask user the options will be displayed for, in 798 case they are requested for display in the 4CAT web interface. This can 799 be used to show some options only to privileges users. 800 """ 801 return cls.options if hasattr(cls, "options") else {} 802 803 @classmethod 804 def get_status(cls): 805 """ 806 Get processor status 807 808 :return list: Statuses of this processor 809 """ 810 return cls.status if hasattr(cls, "status") else None 811 812 @classmethod 813 def is_top_dataset(cls): 814 """ 815 Confirm this is *not* a top dataset, but a processor. 816 817 Used for processor compatibility checks. 818 819 :return bool: Always `False`, because this is a processor. 820 """ 821 return False 822 823 @classmethod 824 def is_from_collector(cls): 825 """ 826 Check if this processor is one that collects data, i.e. a search or 827 import worker. 828 829 :return bool: 830 """ 831 return cls.type.endswith("-search") or cls.type.endswith("-import") 832 833 @classmethod 834 def get_extension(self, parent_dataset=None): 835 """ 836 Return the extension of the processor's dataset 837 838 Used for processor compatibility checks. 839 840 :param DataSet parent_dataset: An object representing the dataset that 841 the processor would be run on 842 :return str|None: Dataset extension (without leading `.`) or `None`. 843 """ 844 if self.is_filter(): 845 if parent_dataset is not None: 846 # Filters should use the same extension as the parent dataset 847 return parent_dataset.get_extension() 848 else: 849 # No dataset provided, unable to determine extension of parent dataset 850 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 851 return None 852 elif self.extension: 853 # Use explicitly defined extension in class (Processor class defaults to "csv") 854 return self.extension 855 else: 856 # A non filter processor updated the base Processor extension to None/False? 857 return None 858 859 @classmethod 860 def is_rankable(cls, multiple_items=True): 861 """ 862 Used for processor compatibility 863 864 :param bool multiple_items: Consider datasets with multiple items per 865 item (e.g. word_1, word_2, etc)? Included for compatibility 866 """ 867 return False 868 869 @classmethod 870 def exclude_followup_processors(cls, processor_type=None): 871 """ 872 Used for processor compatibility 873 874 To be defined by the child processor if it should exclude certain follow-up processors. 
875 e.g.: 876 877 def exclude_followup_processors(cls, processor_type): 878 if processor_type in ["undesirable-followup-processor"]: 879 return True 880 return False 881 882 :param str processor_type: Processor type to exclude 883 :return bool: True if processor should be excluded, False otherwise 884 """ 885 return False 886 887 @abc.abstractmethod 888 def process(self): 889 """ 890 Process data 891 892 To be defined by the child processor. 893 """ 894 pass 895 896 @staticmethod 897 def is_4cat_processor(): 898 """ 899 Is this a 4CAT processor? 900 901 This is used to determine whether a class is a 4CAT 902 processor. 903 904 :return: True 905 """ 906 return True
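
The class above is meant to be inherited by concrete processors, which fill in a few class attributes and implement `process()`. Below is a minimal, illustrative sketch of such a subclass; the class name, `type` identifier and compatibility rule are hypothetical and only show where the hooks defined above (`is_compatible_with()`, `iterate_items()`, `write_csv_items_and_finish()`) fit, not how any particular real processor is implemented.

.. code-block:: python

    from backend.lib.processor import BasicProcessor
    from common.lib.exceptions import ProcessorInterruptedException


    class ExampleItemCounter(BasicProcessor):
        """
        Illustrative processor: counts the items in the parent dataset
        """
        type = "example-item-counter"  # hypothetical processor/job type identifier
        category = "Post-processing"
        title = "Example item counter"
        description = "Counts the number of items in the parent dataset (sketch only)"
        extension = "csv"

        @classmethod
        def is_compatible_with(cls, module=None, user=None):
            # hypothetical rule: only offer this processor for search datasets
            return module.type.endswith("-search")

        def process(self):
            num_items = 0
            for item in self.source_dataset.iterate_items(self):
                # honour the interrupted flag, as the helper methods above do
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while counting items")
                num_items += 1

            # writes the result file and marks the dataset as finished
            self.write_csv_items_and_finish([{"num_items": num_items}])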
780 781 :todo: Make this a bit more robust than sniffing the processor category 782 :return bool: 783 """ 784 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower() 785 786 @classmethod 787 def get_options(cls, parent_dataset=None, user=None): 788 """ 789 Get processor options 790 791 This method by default returns the class's "options" attribute, or an 792 empty dictionary. It can be redefined by processors that need more 793 fine-grained options, e.g. in cases where the availability of options 794 is partially determined by the parent dataset's parameters. 795 796 :param DataSet parent_dataset: An object representing the dataset that 797 the processor would be run on 798 :param User user: Flask user the options will be displayed for, in 799 case they are requested for display in the 4CAT web interface. This can 800 be used to show some options only to privileges users. 801 """ 802 return cls.options if hasattr(cls, "options") else {} 803 804 @classmethod 805 def get_status(cls): 806 """ 807 Get processor status 808 809 :return list: Statuses of this processor 810 """ 811 return cls.status if hasattr(cls, "status") else None 812 813 @classmethod 814 def is_top_dataset(cls): 815 """ 816 Confirm this is *not* a top dataset, but a processor. 817 818 Used for processor compatibility checks. 819 820 :return bool: Always `False`, because this is a processor. 821 """ 822 return False 823 824 @classmethod 825 def is_from_collector(cls): 826 """ 827 Check if this processor is one that collects data, i.e. a search or 828 import worker. 829 830 :return bool: 831 """ 832 return cls.type.endswith("-search") or cls.type.endswith("-import") 833 834 @classmethod 835 def get_extension(self, parent_dataset=None): 836 """ 837 Return the extension of the processor's dataset 838 839 Used for processor compatibility checks. 840 841 :param DataSet parent_dataset: An object representing the dataset that 842 the processor would be run on 843 :return str|None: Dataset extension (without leading `.`) or `None`. 844 """ 845 if self.is_filter(): 846 if parent_dataset is not None: 847 # Filters should use the same extension as the parent dataset 848 return parent_dataset.get_extension() 849 else: 850 # No dataset provided, unable to determine extension of parent dataset 851 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 852 return None 853 elif self.extension: 854 # Use explicitly defined extension in class (Processor class defaults to "csv") 855 return self.extension 856 else: 857 # A non filter processor updated the base Processor extension to None/False? 858 return None 859 860 @classmethod 861 def is_rankable(cls, multiple_items=True): 862 """ 863 Used for processor compatibility 864 865 :param bool multiple_items: Consider datasets with multiple items per 866 item (e.g. word_1, word_2, etc)? Included for compatibility 867 """ 868 return False 869 870 @classmethod 871 def exclude_followup_processors(cls, processor_type=None): 872 """ 873 Used for processor compatibility 874 875 To be defined by the child processor if it should exclude certain follow-up processors. 
876 e.g.: 877 878 def exclude_followup_processors(cls, processor_type): 879 if processor_type in ["undesirable-followup-processor"]: 880 return True 881 return False 882 883 :param str processor_type: Processor type to exclude 884 :return bool: True if processor should be excluded, False otherwise 885 """ 886 return False 887 888 @abc.abstractmethod 889 def process(self): 890 """ 891 Process data 892 893 To be defined by the child processor. 894 """ 895 pass 896 897 @staticmethod 898 def is_4cat_processor(): 899 """ 900 Is this a 4CAT processor? 901 902 This is used to determine whether a class is a 4CAT 903 processor. 904 905 :return: True 906 """ 907 return True
Abstract processor class
A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).
To determine whether a processor can process a given dataset, you can define an is_compatible_with(FourcatModule module=None, str user=None) -> bool class method which takes a dataset as argument and returns a bool that determines if this processor is considered compatible with that dataset. For example:
    @classmethod
    def is_compatible_with(cls, module=None, user=None):
        return module.type == "linguistic-features"
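As a rough illustration of how these attributes and helpers fit together, here is a minimal sketch of a processor subclass; the type, category, title and compatibility criterion are hypothetical, and the imports are assumed to match the top of this module:

    class ExampleCountProcessor(BasicProcessor):
        type = "example-count"          # hypothetical job type identifier
        category = "Metrics"            # hypothetical category
        title = "Example item counter"  # shown in the web interface
        description = "Counts the items in the parent dataset"
        extension = "csv"

        @classmethod
        def is_compatible_with(cls, module=None, user=None):
            # hypothetical criterion: only run on csv or ndjson datasets
            return module.get_extension() in ("csv", "ndjson")

        def process(self):
            count = 0
            for item in self.source_dataset.iterate_items(self):
                count += 1

            self.write_csv_items_and_finish([{"items": count}])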
94 def work(self): 95 """ 96 Process a dataset 97 98 Loads dataset metadata, sets up the scaffolding for performing some kind 99 of processing on that dataset, and then processes it. Afterwards, clean 100 up. 101 """ 102 try: 103 # a dataset can have multiple owners, but the creator is the user 104 # that actually queued the processor, so their config is relevant 105 self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules) 106 self.owner = self.dataset.creator 107 except DataSetException as e: 108 # query has been deleted in the meantime. finish without error, 109 # as deleting it will have been a conscious choice by a user 110 self.job.finish() 111 return 112 113 # set up config reader using the worker's DB connection and the dataset 114 # creator. This ensures that if a value has been overriden for the owner, 115 # the overridden value is used instead. 116 config.with_db(self.db) 117 self.config = ConfigWrapper(config=config, user=User.get_by_name(self.db, self.owner)) 118 119 if self.dataset.data.get("key_parent", None): 120 # search workers never have parents (for now), so we don't need to 121 # find out what the source_dataset dataset is if it's a search worker 122 try: 123 self.source_dataset = self.dataset.get_parent() 124 125 # for presets, transparently use the *top* dataset as a source_dataset 126 # since that is where any underlying processors should get 127 # their data from. However, this should only be done as long as the 128 # preset is not finished yet, because after that there may be processors 129 # that run on the final preset result 130 while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished(): 131 self.is_running_in_preset = True 132 self.source_dataset = self.source_dataset.get_parent() 133 if self.source_dataset is None: 134 # this means there is no dataset that is *not* a preset anywhere 135 # above this dataset. 
This should never occur, but if it does, we 136 # cannot continue 137 self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset", 138 (self.type, self.dataset.key)) 139 self.job.finish() 140 return 141 142 except DataSetException: 143 # we need to know what the source_dataset dataset was to properly handle the 144 # analysis 145 self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % ( 146 self.type, self.dataset.key)) 147 self.job.finish() 148 return 149 150 if not self.source_dataset.is_finished() and not self.is_running_in_preset: 151 # not finished yet - retry after a while 152 # exception for presets, since these *should* be unfinished 153 # until underlying processors are done 154 self.job.release(delay=30) 155 return 156 157 self.source_file = self.source_dataset.get_results_path() 158 if not self.source_file.exists(): 159 self.dataset.update_status("Finished, no input data found.") 160 161 self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"])) 162 163 processor_name = self.title if hasattr(self, "title") else self.type 164 self.dataset.clear_log() 165 self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key)) 166 167 # start log file 168 self.dataset.update_status("Processing data") 169 self.dataset.update_version(get_software_commit(self)) 170 171 # get parameters 172 # if possible, fill defaults where parameters are not provided 173 given_parameters = self.dataset.parameters.copy() 174 all_parameters = self.get_options(self.dataset) 175 self.parameters = { 176 param: given_parameters.get(param, all_parameters.get(param, {}).get("default")) 177 for param in [*all_parameters.keys(), *given_parameters.keys()] 178 } 179 180 # now the parameters have been loaded into memory, clear any sensitive 181 # ones. This has a side-effect that a processor may not run again 182 # without starting from scratch, but this is the price of progress 183 options = self.get_options(self.dataset.get_parent()) 184 for option, option_settings in options.items(): 185 if option_settings.get("sensitive"): 186 self.dataset.delete_parameter(option) 187 188 if self.interrupted: 189 self.dataset.log("Processing interrupted, trying again later") 190 return self.abort() 191 192 if not self.dataset.is_finished(): 193 try: 194 self.process() 195 self.after_process() 196 except WorkerInterruptedException as e: 197 self.dataset.log("Processing interrupted (%s), trying again later" % str(e)) 198 self.abort() 199 except Exception as e: 200 self.dataset.log("Processor crashed (%s), trying again later" % str(e)) 201 frames = traceback.extract_tb(e.__traceback__) 202 last_frame = frames[-1] 203 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in frames[1:]] 204 location = "->".join(frames) 205 206 # Not all datasets have source_dataset keys 207 if len(self.dataset.get_genealogy()) > 1: 208 parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")" 209 else: 210 parent_key = "" 211 212 # remove any result files that have been created so far 213 self.remove_files() 214 215 raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n %s\n" % ( 216 self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=last_frame) 217 else: 218 # dataset already finished, job shouldn't be open anymore 219 self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." 
% (self.job.data["jobtype"], self.job.data["remote_id"])) 220 self.job.finish()
Process a dataset
Loads dataset metadata, sets up the scaffolding for performing some kind of processing on that dataset, and then processes it. Afterwards, clean up.
223 def after_process(self): 224 """ 225 Run after processing the dataset 226 227 This method cleans up temporary files, and if needed, handles logistics 228 concerning the result file, e.g. running a pre-defined processor on the 229 result, copying it to another dataset, and so on. 230 """ 231 if self.dataset.data["num_rows"] > 0: 232 self.dataset.update_status("Dataset completed.") 233 234 if not self.dataset.is_finished(): 235 self.dataset.finish() 236 237 self.dataset.remove_staging_areas() 238 239 # see if we have anything else lined up to run next 240 for next in self.parameters.get("next", []): 241 can_run_next = True 242 next_parameters = next.get("parameters", {}) 243 next_type = next.get("type", "") 244 try: 245 available_processors = self.dataset.get_available_processors(user=self.dataset.creator) 246 except ValueError: 247 self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting") 248 break 249 250 # run it only if the post-processor is actually available for this query 251 if self.dataset.data["num_rows"] <= 0: 252 can_run_next = False 253 self.log.info("Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (next_type, self.dataset.key)) 254 255 elif next_type in available_processors: 256 next_analysis = DataSet( 257 parameters=next_parameters, 258 type=next_type, 259 db=self.db, 260 parent=self.dataset.key, 261 extension=available_processors[next_type].extension, 262 is_private=self.dataset.is_private, 263 owner=self.dataset.creator, 264 modules=self.modules 265 ) 266 self.queue.add_job(next_type, remote_id=next_analysis.key) 267 else: 268 can_run_next = False 269 self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (self.dataset.key, self.type, next_type)) 270 271 if not can_run_next: 272 # We are unable to continue the chain of processors, so we check to see if we are attaching to a parent 273 # preset; this allows the parent (for example a preset) to be finished and any successful processors displayed 274 if "attach_to" in self.parameters: 275 # Probably should not happen, but for some reason a mid processor has been designated as the processor 276 # the parent should attach to 277 pass 278 else: 279 # Check for "attach_to" parameter in descendents 280 while True: 281 if "attach_to" in next_parameters: 282 self.parameters["attach_to"] = next_parameters["attach_to"] 283 break 284 else: 285 if "next" in next_parameters: 286 next_parameters = next_parameters["next"][0]["parameters"] 287 else: 288 # No more descendents 289 # Should not happen; we cannot find the source dataset 290 self.log.warning("Cannot find preset's source dataset for dataset %s" % self.dataset.key) 291 break 292 293 # see if we need to register the result somewhere 294 if "copy_to" in self.parameters: 295 # copy the results to an arbitrary place that was passed 296 if self.dataset.get_results_path().exists(): 297 shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"]) 298 else: 299 # if copy_to was passed, that means it's important that this 300 # file exists somewhere, so we create it as an empty file 301 with open(self.parameters["copy_to"], "w") as empty_file: 302 empty_file.write("") 303 304 # see if this query chain is to be attached to another query 305 # if so, the full genealogy of this query (minus the original dataset) 306 # is attached to the given query - this is mostly useful for presets, 307 # where a chain of processors can be marked as 'underlying' 
a preset 308 if "attach_to" in self.parameters: 309 try: 310 # copy metadata and results to the surrogate 311 surrogate = DataSet(key=self.parameters["attach_to"], db=self.db) 312 313 if self.dataset.get_results_path().exists(): 314 # Update the surrogate's results file suffix to match this dataset's suffix 315 surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(self.dataset.get_results_path().suffix) 316 shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path())) 317 318 try: 319 surrogate.finish(self.dataset.data["num_rows"]) 320 except RuntimeError: 321 # already finished, could happen (though it shouldn't) 322 pass 323 324 surrogate.update_status(self.dataset.get_status()) 325 326 except ValueError: 327 # dataset with key to attach to doesn't exist... 328 self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % ( 329 self.dataset.key, self.parameters["attach_to"])) 330 331 self.job.finish() 332 333 if config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): 334 owner = self.dataset.get_parameters().get("email-complete", False) 335 # Check that username is email address 336 if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner): 337 from email.mime.multipart import MIMEMultipart 338 from email.mime.text import MIMEText 339 from smtplib import SMTPException 340 import socket 341 import html2text 342 343 self.log.debug("Sending email to %s" % owner) 344 dataset_url = ('https://' if config.get('flask.https') else 'http://') + config.get('flask.server_name') + '/results/' + self.dataset.key 345 sender = config.get('mail.noreply') 346 message = MIMEMultipart("alternative") 347 message["From"] = sender 348 message["To"] = owner 349 message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label()) 350 mail = """ 351 <p>Hello %s,</p> 352 <p>4CAT has finished collecting your %s dataset labeled: %s</p> 353 <p>You can view your dataset via the following link:</p> 354 <p><a href="%s">%s</a></p> 355 <p>Sincerely,</p> 356 <p>Your friendly neighborhood 4CAT admin</p> 357 """ % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url) 358 html_parser = html2text.HTML2Text() 359 message.attach(MIMEText(html_parser.handle(mail), "plain")) 360 message.attach(MIMEText(mail, "html")) 361 try: 362 send_email([owner], message) 363 except (SMTPException, ConnectionRefusedError, socket.timeout) as e: 364 self.log.error("Error sending email to %s" % owner)
Run after processing the dataset
This method cleans up temporary files, and if needed, handles logistics concerning the result file, e.g. running a pre-defined processor on the result, copying it to another dataset, and so on.
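The follow-up and copy logic described above is driven by keys in the dataset's parameters. A hedged sketch of the structure this method reads, where the processor type, dataset key and file path are hypothetical placeholders:

    parameters = {
        # each entry queues another processor on this dataset's result
        "next": [
            {
                "type": "some-followup-processor",        # hypothetical processor type
                "parameters": {"attach_to": "abcdef123"}  # hypothetical dataset key
            }
        ],
        # optionally copy the result file to an arbitrary location
        "copy_to": "/tmp/copy-of-result.csv"
    }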
366 def remove_files(self): 367 """ 368 Clean up result files and any staging files for processor to be attempted 369 later if desired. 370 """ 371 # Remove the results file that was created 372 if self.dataset.get_results_path().exists(): 373 self.dataset.get_results_path().unlink() 374 if self.dataset.get_results_folder_path().exists(): 375 shutil.rmtree(self.dataset.get_results_folder_path()) 376 377 # Remove any staging areas with temporary data 378 self.dataset.remove_staging_areas()
Clean up result files and any staging files so the processor can be attempted again later if desired.
380 def abort(self): 381 """ 382 Abort dataset creation and clean up so it may be attempted again later 383 """ 384 # remove any result files that have been created so far 385 self.remove_files() 386 387 # we release instead of finish, since interrupting is just that - the 388 # job should resume at a later point. Delay resuming by 10 seconds to 389 # give 4CAT the time to do whatever it wants (though usually this isn't 390 # needed since restarting also stops the spawning of new workers) 391 if self.interrupted == self.INTERRUPT_RETRY: 392 # retry later - wait at least 10 seconds to give the backend time to shut down 393 self.job.release(delay=10) 394 elif self.interrupted == self.INTERRUPT_CANCEL: 395 # cancel job 396 self.job.finish()
Abort dataset creation and clean up so it may be attempted again later
398 def add_field_to_parent(self, field_name, new_data, which_parent=source_dataset, update_existing=False): 399 """ 400 This function adds a new field to the parent dataset. Expects a list of data points, one for each item 401 in the parent dataset. Processes csv and ndjson. If update_existing is set to True, this can be used 402 to overwrite an existing field. 403 404 TODO: could be improved by accepting different types of data depending on csv or ndjson. 405 406 :param str field_name: name of the desired 407 :param List new_data: List of data to be added to parent dataset 408 :param DataSet which_parent: DataSet to be updated (e.g., self.source_dataset, self.dataset.get_parent(), self.dataset.top_parent()) 409 :param bool update_existing: False (default) will raise an error if the field_name already exists 410 True will allow updating existing data 411 """ 412 if len(new_data) < 1: 413 # no data 414 raise ProcessorException('No data provided') 415 416 if not hasattr(self, "source_dataset") and which_parent is not None: 417 # no source to update 418 raise ProcessorException('No source dataset to update') 419 420 # Get the source file data path 421 parent_path = which_parent.get_results_path() 422 423 if len(new_data) != which_parent.num_rows: 424 raise ProcessorException('Must have new data point for each record: parent dataset: %i, new data points: %i' % (which_parent.num_rows, len(new_data))) 425 426 self.dataset.update_status("Adding new field %s to the source file" % field_name) 427 428 # Get a temporary path where we can store the data 429 tmp_path = self.dataset.get_staging_area() 430 tmp_file_path = tmp_path.joinpath(parent_path.name) 431 432 # go through items one by one, optionally mapping them 433 if parent_path.suffix.lower() == ".csv": 434 # Get field names 435 fieldnames = which_parent.get_columns() 436 if not update_existing and field_name in fieldnames: 437 raise ProcessorException('field_name %s already exists!' % field_name) 438 fieldnames.append(field_name) 439 440 # Iterate through the original dataset and add values to a new column 441 self.dataset.update_status("Writing new source file with %s." % field_name) 442 with tmp_file_path.open("w", encoding="utf-8", newline="") as output: 443 writer = csv.DictWriter(output, fieldnames=fieldnames) 444 writer.writeheader() 445 446 for count, post in enumerate(which_parent.iterate_items(self)): 447 # stop processing if worker has been asked to stop 448 if self.interrupted: 449 raise ProcessorInterruptedException("Interrupted while writing CSV file") 450 451 post.original[field_name] = new_data[count] 452 writer.writerow(post.original) 453 454 elif parent_path.suffix.lower() == ".ndjson": 455 # JSON cannot encode sets 456 if type(new_data[0]) is set: 457 # could check each if type(datapoint) is set, but that could be extensive... 458 new_data = [list(datapoint) for datapoint in new_data] 459 460 with tmp_file_path.open("w", encoding="utf-8", newline="") as output: 461 for count, post in enumerate(which_parent.iterate_items(self)): 462 # stop processing if worker has been asked to stop 463 if self.interrupted: 464 raise ProcessorInterruptedException("Interrupted while writing NDJSON file") 465 466 if not update_existing and field_name in post.original.keys(): 467 raise ProcessorException('field_name %s already exists!' 
% field_name) 468 469 # Update data 470 post.original[field_name] = new_data[count] 471 472 output.write(json.dumps(post.original) + "\n") 473 else: 474 raise NotImplementedError("Cannot iterate through %s file" % parent_path.suffix) 475 476 # Replace the source file path with the new file 477 shutil.copy(str(tmp_file_path), str(parent_path)) 478 479 # delete temporary files and folder 480 shutil.rmtree(tmp_path) 481 482 self.dataset.update_status("Parent dataset updated.")
This function adds a new field to the parent dataset. Expects a list of data points, one for each item in the parent dataset. Processes csv and ndjson. If update_existing is set to True, this can be used to overwrite an existing field.
TODO: could be improved by accepting different types of data depending on csv or ndjson.
Parameters
- str field_name: name of the desired new field
- List new_data: List of data to be added to parent dataset
- DataSet which_parent: DataSet to be updated (e.g., self.source_dataset, self.dataset.get_parent(), self.dataset.top_parent())
- bool update_existing: False (default) will raise an error if the field_name already exists; True will allow updating existing data
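A minimal sketch of calling this from within a processor's process() method; the field name and the per-item value are placeholders:

    new_values = []
    for item in self.source_dataset.iterate_items(self):
        # item.original holds the raw row/object, as used by add_field_to_parent itself
        new_values.append(len(str(item.original)))

    # one value per item in the parent dataset, in iteration order
    self.add_field_to_parent("item_length", new_values, which_parent=self.source_dataset)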
484 def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]): 485 """ 486 A generator that iterates through files in an archive 487 488 With every iteration, the processor's 'interrupted' flag is checked, 489 and if set a ProcessorInterruptedException is raised, which by default 490 is caught and subsequently stops execution gracefully. 491 492 Files are temporarily unzipped and deleted after use. 493 494 :param Path path: Path to zip file to read 495 :param Path staging_area: Where to store the files while they're 496 being worked with. If omitted, a temporary folder is created and 497 deleted after use 498 :param bool immediately_delete: Temporary files are removed after yielded; 499 False keeps files until the staging_area is removed (usually during processor 500 cleanup) 501 :param list filename_filter: Whitelist of filenames to iterate. 502 Other files will be ignored. If empty, do not ignore anything. 503 :return: An iterator with a Path item for each file 504 """ 505 506 if not path.exists(): 507 return 508 509 if not staging_area: 510 staging_area = self.dataset.get_staging_area() 511 512 if not staging_area.exists() or not staging_area.is_dir(): 513 raise RuntimeError("Staging area %s is not a valid folder") 514 515 with zipfile.ZipFile(path, "r") as archive_file: 516 archive_contents = sorted(archive_file.namelist()) 517 518 for archived_file in archive_contents: 519 if filename_filter and archived_file not in filename_filter: 520 continue 521 522 info = archive_file.getinfo(archived_file) 523 if info.is_dir(): 524 continue 525 526 if self.interrupted: 527 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 528 529 temp_file = staging_area.joinpath(archived_file) 530 archive_file.extract(archived_file, staging_area) 531 532 yield temp_file 533 if immediately_delete: 534 temp_file.unlink()
A generator that iterates through files in an archive
With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.
Files are temporarily unzipped and deleted after use.
Parameters
- Path path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created and deleted after use
- bool immediately_delete: If True (default), temporary files are removed after being yielded; False keeps files until the staging_area is removed (usually during processor cleanup)
- list filename_filter: Whitelist of filenames to iterate. Other files will be ignored. If empty, do not ignore anything.
Returns
An iterator with a Path item for each file
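For example, a processor working on an archive produced by its parent dataset might use it roughly like this; the loop body is a placeholder:

    for file_path in self.iterate_archive_contents(self.source_file):
        self.dataset.update_status("Processing file %s" % file_path.name)
        # ... read file_path here; it is deleted again after this iteration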
536 def unpack_archive_contents(self, path, staging_area=None): 537 """ 538 Unpack all files in an archive to a staging area 539 540 With every iteration, the processor's 'interrupted' flag is checked, 541 and if set a ProcessorInterruptedException is raised, which by default 542 is caught and subsequently stops execution gracefully. 543 544 Files are unzipped to a staging area. The staging area is *not* 545 cleaned up automatically. 546 547 :param Path path: Path to zip file to read 548 :param Path staging_area: Where to store the files while they're 549 being worked with. If omitted, a temporary folder is created and 550 deleted after use 551 :param int max_number_files: Maximum number of files to unpack. If None, all files unpacked 552 :return Path: A path to the staging area 553 """ 554 555 if not path.exists(): 556 return 557 558 if not staging_area: 559 staging_area = self.dataset.get_staging_area() 560 561 if not staging_area.exists() or not staging_area.is_dir(): 562 raise RuntimeError("Staging area %s is not a valid folder") 563 564 paths = [] 565 with zipfile.ZipFile(path, "r") as archive_file: 566 archive_contents = sorted(archive_file.namelist()) 567 568 for archived_file in archive_contents: 569 if self.interrupted: 570 raise ProcessorInterruptedException("Interrupted while iterating zip file contents") 571 572 file_name = archived_file.split("/")[-1] 573 temp_file = staging_area.joinpath(file_name) 574 archive_file.extract(archived_file, staging_area) 575 paths.append(temp_file) 576 577 return staging_area
Unpack all files in an archive to a staging area
With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.
Files are unzipped to a staging area. The staging area is not cleaned up automatically.
Parameters
- Path path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary staging folder is created.
Returns
A path to the staging area
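If all files are needed at once rather than one at a time, a sketch of the unpacking variant could look like this:

    staging_area = self.unpack_archive_contents(self.source_file)
    for file_path in sorted(staging_area.glob("*")):
        self.dataset.update_status("Processing file %s" % file_path.name)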
579 def extract_archived_file_by_name(self, filename, archive_path, staging_area=None): 580 """ 581 Extract a file from an archive by name 582 583 :param str filename: Name of file to extract 584 :param Path archive_path: Path to zip file to read 585 :param Path staging_area: Where to store the files while they're 586 being worked with. If omitted, a temporary folder is created 587 :return Path: A path to the extracted file 588 """ 589 if not archive_path.exists(): 590 return 591 592 if not staging_area: 593 staging_area = self.dataset.get_staging_area() 594 595 if not staging_area.exists() or not staging_area.is_dir(): 596 raise RuntimeError("Staging area %s is not a valid folder") 597 598 with zipfile.ZipFile(archive_path, "r") as archive_file: 599 if filename not in archive_file.namelist(): 600 raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path)) 601 else: 602 archive_file.extract(filename, staging_area) 603 return staging_area.joinpath(filename)
Extract a file from an archive by name
Parameters
- str filename: Name of file to extract
- Path archive_path: Path to zip file to read
- Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created
Returns
A path to the extracted file
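A short sketch of use; the file name ".metadata.json" is a hypothetical example, and json is assumed to be imported as at the top of this module:

    metadata_file = self.extract_archived_file_by_name(".metadata.json", self.source_file)
    if metadata_file:
        with metadata_file.open(encoding="utf-8") as infile:
            metadata = json.load(infile)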
605 def write_csv_items_and_finish(self, data): 606 """ 607 Write data as csv to results file and finish dataset 608 609 Determines result file path using dataset's path determination helper 610 methods. After writing results, the dataset is marked finished. Will 611 raise a ProcessorInterruptedException if the interrupted flag for this 612 processor is set while iterating. 613 614 :param data: A list or tuple of dictionaries, all with the same keys 615 """ 616 if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str): 617 raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data)) 618 619 if not data: 620 raise ValueError("write_csv_items requires a dictionary with at least one item") 621 622 self.dataset.update_status("Writing results file") 623 writer = False 624 with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results: 625 for row in data: 626 if self.interrupted: 627 raise ProcessorInterruptedException("Interrupted while writing results file") 628 629 row = remove_nuls(row) 630 if not writer: 631 writer = csv.DictWriter(results, fieldnames=row.keys()) 632 writer.writeheader() 633 634 writer.writerow(row) 635 636 self.dataset.update_status("Finished") 637 self.dataset.finish(len(data))
Write data as csv to results file and finish dataset
Determines result file path using dataset's path determination helper methods. After writing results, the dataset is marked finished. Will raise a ProcessorInterruptedException if the interrupted flag for this processor is set while iterating.
Parameters
- data: A list or tuple of dictionaries, all with the same keys
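A minimal sketch of the expected input inside process(); the column names and values are placeholders:

    rows = [
        {"category": "first", "count": 10},
        {"category": "second", "count": 5}
    ]
    self.write_csv_items_and_finish(rows)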
639 def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True): 640 """ 641 Archive a bunch of files into a zip archive and finish processing 642 643 :param list|Path files: If a list, all files will be added to the 644 archive and deleted afterwards. If a folder, all files in the folder 645 will be added and the folder will be deleted afterwards. 646 :param int num_items: Items in the dataset. If None, the amount of 647 files added to the archive will be used. 648 :param int compression: Type of compression to use. By default, files 649 are not compressed, to speed up unarchiving. 650 :param bool finish: Finish the dataset/job afterwards or not? 651 """ 652 is_folder = False 653 if issubclass(type(files), PurePath): 654 is_folder = files 655 if not files.exists() or not files.is_dir(): 656 raise RuntimeError("Folder %s is not a folder that can be archived" % files) 657 658 files = files.glob("*") 659 660 # create zip of archive and delete temporary files and folder 661 self.dataset.update_status("Compressing results into archive") 662 done = 0 663 with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip: 664 for output_path in files: 665 zip.write(output_path, output_path.name) 666 output_path.unlink() 667 done += 1 668 669 # delete temporary folder 670 if is_folder: 671 shutil.rmtree(is_folder) 672 673 self.dataset.update_status("Finished") 674 if num_items is None: 675 num_items = done 676 677 if finish: 678 self.dataset.finish(num_items)
Archive a bunch of files into a zip archive and finish processing
Parameters
- list|Path files: If a list, all files will be added to the archive and deleted afterwards. If a folder, all files in the folder will be added and the folder will be deleted afterwards.
- int num_items: Items in the dataset. If None, the amount of files added to the archive will be used.
- int compression: Type of compression to use. By default, files are not compressed, to speed up unarchiving.
- bool finish: Whether to finish the dataset/job afterwards
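A sketch of the folder variant, assuming the processor declares extension = "zip" and writes one file per item to a staging area first; json is assumed to be imported as at the top of this module:

    staging_area = self.dataset.get_staging_area()
    for count, item in enumerate(self.source_dataset.iterate_items(self)):
        with staging_area.joinpath("item-%i.json" % count).open("w", encoding="utf-8") as outfile:
            json.dump(item.original, outfile)

    # passing the folder adds every file in it and removes the folder afterwards
    self.write_archive_and_finish(staging_area)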
680 def create_standalone(self): 681 """ 682 Copy this dataset and make that copy standalone 683 684 This has the benefit of allowing for all analyses that can be run on 685 full datasets on the new, filtered copy as well. 686 687 :return DataSet: The new standalone dataset 688 """ 689 top_parent = self.source_dataset 690 691 finished = self.dataset.check_dataset_finished() 692 if finished == 'empty': 693 # No data to process, so we can't create a standalone dataset 694 return 695 elif finished is None: 696 # I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now 697 pass 698 699 standalone = self.dataset.copy(shallow=False) 700 standalone.body_match = "(Filtered) " + top_parent.query 701 standalone.datasource = top_parent.parameters.get("datasource", "custom") 702 703 try: 704 standalone.board = top_parent.board 705 except AttributeError: 706 standalone.board = self.type 707 708 standalone.type = top_parent.type 709 710 standalone.detach() 711 standalone.delete_parameter("key_parent") 712 713 self.dataset.copied_to = standalone.key 714 715 # we don't need this file anymore - it has been copied to the new 716 # standalone dataset, and this one is not accessible via the interface 717 # except as a link to the copied standalone dataset 718 os.unlink(self.dataset.get_results_path()) 719 720 # Copy the log 721 shutil.copy(self.dataset.get_log_path(), standalone.get_log_path()) 722 723 return standalone
Copy this dataset and make that copy standalone
This has the benefit that all analyses which can be run on full datasets can also be run on the new, filtered copy.
Returns
The new standalone dataset
725 @classmethod 726 def map_item_method_available(cls, dataset): 727 """ 728 Check if this processor can use map_item 729 730 Checks if map_item method exists and is compatible with dataset. If 731 dataset has a different extension than the default for this processor, 732 or if the dataset has no extension, this means we cannot be sure the 733 data is in the right format to be mapped, so `False` is returned in 734 that case even if a map_item() method is available. 735 736 :param BasicProcessor processor: The BasicProcessor subclass object 737 with which to use map_item 738 :param DataSet dataset: The DataSet object with which to 739 use map_item 740 """ 741 # only run item mapper if extension of processor == extension of 742 # data file, for the scenario where a csv file was uploaded and 743 # converted to an ndjson-based data source, for example 744 # todo: this is kind of ugly, and a better fix may be possible 745 dataset_extension = dataset.get_extension() 746 if not dataset_extension: 747 # DataSet results file does not exist or has no extension, use expected extension 748 if hasattr(dataset, "extension"): 749 dataset_extension = dataset.extension 750 else: 751 # No known DataSet extension; cannot determine if map_item method compatible 752 return False 753 754 return hasattr(cls, "map_item") and cls.extension == dataset_extension
Check if this processor can use map_item
Checks if map_item method exists and is compatible with dataset. If dataset has a different extension than the default for this processor, or if the dataset has no extension, this means we cannot be sure the data is in the right format to be mapped, so False is returned in that case even if a map_item() method is available.
Parameters
- BasicProcessor processor: The BasicProcessor subclass object with which to use map_item
- DataSet dataset: The DataSet object with which to use map_item
756 @classmethod 757 def get_mapped_item(cls, item): 758 """ 759 Get the mapped item using a processors map_item method. 760 761 Ensure map_item method is compatible with a dataset by checking map_item_method_available first. 762 """ 763 try: 764 mapped_item = cls.map_item(item) 765 except (KeyError, IndexError) as e: 766 raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}") 767 768 if not mapped_item: 769 raise MapItemException("Unable to map item!") 770 771 return mapped_item
Get the mapped item using a processor's map_item method.
Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
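A short sketch of pairing the two class methods; SomeDatasource, dataset and raw_item are hypothetical placeholders:

    if SomeDatasource.map_item_method_available(dataset=dataset):
        try:
            item = SomeDatasource.get_mapped_item(raw_item)
        except MapItemException:
            item = None  # item could not be mapped; skip or log as appropriate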
773 @classmethod 774 def is_filter(cls): 775 """ 776 Is this processor a filter? 777 778 Filters do not produce their own dataset but replace the source_dataset dataset 779 instead. 780 781 :todo: Make this a bit more robust than sniffing the processor category 782 :return bool: 783 """ 784 return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
Is this processor a filter?
Filters do not produce their own dataset but replace the source dataset instead.
Todo: Make this a bit more robust than sniffing the processor category
Returns
True if the processor's category contains "filter", False otherwise
786 @classmethod 787 def get_options(cls, parent_dataset=None, user=None): 788 """ 789 Get processor options 790 791 This method by default returns the class's "options" attribute, or an 792 empty dictionary. It can be redefined by processors that need more 793 fine-grained options, e.g. in cases where the availability of options 794 is partially determined by the parent dataset's parameters. 795 796 :param DataSet parent_dataset: An object representing the dataset that 797 the processor would be run on 798 :param User user: Flask user the options will be displayed for, in 799 case they are requested for display in the 4CAT web interface. This can 800 be used to show some options only to privileges users. 801 """ 802 return cls.options if hasattr(cls, "options") else {}
Get processor options
This method by default returns the class's "options" attribute, or an empty dictionary. It can be redefined by processors that need more fine-grained options, e.g. in cases where the availability of options is partially determined by the parent dataset's parameters.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
- User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileged users.
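A minimal sketch of redefining get_options in a subclass; the extra option key and the datasource check are assumptions, and only the "default" and "sensitive" option fields are relied on by work() above:

    @classmethod
    def get_options(cls, parent_dataset=None, user=None):
        options = dict(cls.options) if hasattr(cls, "options") else {}

        # hypothetical: only offer an extra option for a particular datasource
        if parent_dataset and parent_dataset.parameters.get("datasource") == "custom":
            options["extra-option"] = {"default": ""}

        return options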
804 @classmethod 805 def get_status(cls): 806 """ 807 Get processor status 808 809 :return list: Statuses of this processor 810 """ 811 return cls.status if hasattr(cls, "status") else None
Get processor status
Returns
Statuses of this processor
813 @classmethod 814 def is_top_dataset(cls): 815 """ 816 Confirm this is *not* a top dataset, but a processor. 817 818 Used for processor compatibility checks. 819 820 :return bool: Always `False`, because this is a processor. 821 """ 822 return False
Confirm this is not a top dataset, but a processor.
Used for processor compatibility checks.
Returns
Always False, because this is a processor.
824 @classmethod 825 def is_from_collector(cls): 826 """ 827 Check if this processor is one that collects data, i.e. a search or 828 import worker. 829 830 :return bool: 831 """ 832 return cls.type.endswith("-search") or cls.type.endswith("-import")
Check if this processor is one that collects data, i.e. a search or import worker.
Returns
True if this processor's type ends in "-search" or "-import", False otherwise
834 @classmethod 835 def get_extension(self, parent_dataset=None): 836 """ 837 Return the extension of the processor's dataset 838 839 Used for processor compatibility checks. 840 841 :param DataSet parent_dataset: An object representing the dataset that 842 the processor would be run on 843 :return str|None: Dataset extension (without leading `.`) or `None`. 844 """ 845 if self.is_filter(): 846 if parent_dataset is not None: 847 # Filters should use the same extension as the parent dataset 848 return parent_dataset.get_extension() 849 else: 850 # No dataset provided, unable to determine extension of parent dataset 851 # if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead 852 return None 853 elif self.extension: 854 # Use explicitly defined extension in class (Processor class defaults to "csv") 855 return self.extension 856 else: 857 # A non filter processor updated the base Processor extension to None/False? 858 return None
Return the extension of the processor's dataset
Used for processor compatibility checks.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
Returns
Dataset extension (without leading ".") or None.
860 @classmethod 861 def is_rankable(cls, multiple_items=True): 862 """ 863 Used for processor compatibility 864 865 :param bool multiple_items: Consider datasets with multiple items per 866 item (e.g. word_1, word_2, etc)? Included for compatibility 867 """ 868 return False
Used for processor compatibility
Parameters
- bool multiple_items: Consider datasets with multiple values per item (e.g. word_1, word_2, etc.)? Included for compatibility
870 @classmethod 871 def exclude_followup_processors(cls, processor_type=None): 872 """ 873 Used for processor compatibility 874 875 To be defined by the child processor if it should exclude certain follow-up processors. 876 e.g.: 877 878 def exclude_followup_processors(cls, processor_type): 879 if processor_type in ["undesirable-followup-processor"]: 880 return True 881 return False 882 883 :param str processor_type: Processor type to exclude 884 :return bool: True if processor should be excluded, False otherwise 885 """ 886 return False
Used for processor compatibility
To be defined by the child processor if it should exclude certain follow-up processors. e.g.:
    def exclude_followup_processors(cls, processor_type):
        if processor_type in ["undesirable-followup-processor"]:
            return True
        return False
Parameters
- str processor_type: Processor type to exclude
Returns
True if processor should be excluded, False otherwise
888 @abc.abstractmethod 889 def process(self): 890 """ 891 Process data 892 893 To be defined by the child processor. 894 """ 895 pass
Process data
To be defined by the child processor.
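A minimal sketch of a concrete implementation, showing the interrupted-flag check that the documentation of the helpers above also relies on; the row contents are placeholders:

    def process(self):
        rows = []
        for item in self.source_dataset.iterate_items(self):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while processing items")
            rows.append({"item": str(item.original)[:100]})

        self.write_csv_items_and_finish(rows)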
897 @staticmethod 898 def is_4cat_processor(): 899 """ 900 Is this a 4CAT processor? 901 902 This is used to determine whether a class is a 4CAT 903 processor. 904 905 :return: True 906 """ 907 return True
Is this a 4CAT processor?
This is used to determine whether a class is a 4CAT processor.
Returns
True