
backend.lib.processor

Basic post-processor worker - should be inherited by workers to post-process results
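
A processor is created by subclassing `BasicProcessor` and implementing its abstract `process()` method. The sketch below is a minimal, hypothetical example and is not part of the 4CAT source: the `type`, `title` and the assumed `body` column are made up for illustration, but the class attributes and helper methods it uses are the ones documented in the source listing that follows.

.. code-block:: python

    from backend.lib.processor import BasicProcessor
    from common.lib.exceptions import ProcessorInterruptedException


    class ExampleCounter(BasicProcessor):
        """
        Hypothetical processor: counts how often each value of an assumed
        'body' column occurs in the parent dataset
        """
        type = "example-counter"  # job type identifier (illustrative)
        category = "Post-processing"
        title = "Example counter"
        description = "Counts values of an (assumed) 'body' column"
        extension = "csv"

        def process(self):
            counts = {}
            # items yielded by iterate_items are assumed to be dict-like here
            for item in self.source_dataset.iterate_items(self):
                # respect the interrupt flag so the worker can be stopped gracefully
                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while counting items")

                value = item.get("body", "")
                counts[value] = counts.get(value, 0) + 1

            # one dictionary per result row, all with the same keys
            results = [{"value": value, "count": count} for value, count in counts.items()]
            self.write_csv_items_and_finish(results)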

  1"""
  2Basic post-processor worker - should be inherited by workers to post-process results
  3"""
  4import re
  5import traceback
  6import zipfile
  7import typing
  8import shutil
  9import json
 10import abc
 11import csv
 12import os
 13
 14from pathlib import Path, PurePath
 15
 16from backend.lib.worker import BasicWorker
 17from common.lib.dataset import DataSet
 18from common.lib.fourcat_module import FourcatModule
 19from common.lib.helpers import get_software_commit, remove_nuls, send_email
 20from common.lib.exceptions import (WorkerInterruptedException, ProcessorInterruptedException, ProcessorException,
 21								   DataSetException, MapItemException)
 22from common.config_manager import config, ConfigWrapper
 23from common.lib.user import User
 24
 25
 26csv.field_size_limit(1024 * 1024 * 1024)
 27
 28
 29class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
 30	"""
 31	Abstract processor class
 32
 33	A processor takes a finished dataset as input and processes its result in
 34	some way, with another dataset set as output. The input thus is a file, and
 35	the output (usually) as well. In other words, the result of a processor can
 36	be used as input for another processor (though whether and when this is
 37	useful is another question).
 38
 39	To determine whether a processor can process a given dataset, you can
 40	define an `is_compatible_with(FourcatModule module=None, str user=None) -> bool` class
 41	method which takes a dataset as argument and returns a bool that determines
 42	if this processor is considered compatible with that dataset. For example:
 43
 44	.. code-block:: python
 45
 46        @classmethod
 47        def is_compatible_with(cls, module=None, user=None):
 48            return module.type == "linguistic-features"
 49
 50
 51	"""
 52
 53	#: Database handler to interface with the 4CAT database
 54	db = None
 55
 56	#: Job object that requests the execution of this processor
 57	job = None
 58
 59	#: The dataset object that the processor is *creating*.
 60	dataset = None
 61
 62	#: Owner (username) of the dataset
 63	owner = None
 64
 65	#: The dataset object that the processor is *processing*.
 66	source_dataset = None
 67
 68	#: The file that is being processed
 69	source_file = None
 70
 71	#: Processor description, which will be displayed in the web interface
 72	description = "No description available"
 73
 74	#: Category identifier, used to group processors in the web interface
 75	category = "Other"
 76
 77	#: Extension of the file created by the processor
 78	extension = "csv"
 79
 80	#: 4CAT settings from the perspective of the dataset's owner
 81	config = None
 82
 83	#: Is this processor running 'within' a preset processor?
 84	is_running_in_preset = False
 85
 86	#: Is this processor hidden in the front-end, and only used internally/in presets?
 87	is_hidden = False
 88
 89	#: This will be defined automatically upon loading the processor. There is
 90	#: no need to override manually
 91	filepath = None
 92
 93	def work(self):
 94		"""
 95		Process a dataset
 96
 97		Loads dataset metadata, sets up the scaffolding for performing some kind
 98		of processing on that dataset, and then processes it. Afterwards, clean
 99		up.
100		"""
101		try:
102			# a dataset can have multiple owners, but the creator is the user
103			# that actually queued the processor, so their config is relevant
104			self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
105			self.owner = self.dataset.creator
106		except DataSetException as e:
107			# query has been deleted in the meantime. finish without error,
108			# as deleting it will have been a conscious choice by a user
109			self.job.finish()
110			return
111
112		# set up config reader using the worker's DB connection and the dataset
113		# creator. This ensures that if a value has been overridden for the owner,
114		# the overridden value is used instead.
115		config.with_db(self.db)
116		self.config = ConfigWrapper(config=config, user=User.get_by_name(self.db, self.owner))
117
118		if self.dataset.data.get("key_parent", None):
119			# search workers never have parents (for now), so we don't need to
120			# find out what the source dataset is if this is a search worker
121			try:
122				self.source_dataset = self.dataset.get_parent()
123
124				# for presets, transparently use the *top* dataset as a source_dataset
125				# since that is where any underlying processors should get
126				# their data from. However, this should only be done as long as the
127				# preset is not finished yet, because after that there may be processors
128				# that run on the final preset result
129				while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
130					self.is_running_in_preset = True
131					self.source_dataset = self.source_dataset.get_parent()
132					if self.source_dataset is None:
133						# this means there is no dataset that is *not* a preset anywhere
134						# above this dataset. This should never occur, but if it does, we
135						# cannot continue
136						self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset" %
137									   (self.type, self.dataset.key))
138						self.job.finish()
139						return
140
141			except DataSetException:
142				# we need to know what the source dataset was to properly handle the
143				# analysis
144				self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
145					self.type, self.dataset.key))
146				self.job.finish()
147				return
148
149			if not self.source_dataset.is_finished() and not self.is_running_in_preset:
150				# not finished yet - retry after a while
151				# exception for presets, since these *should* be unfinished
152				# until underlying processors are done
153				self.job.release(delay=30)
154				return
155
156			self.source_file = self.source_dataset.get_results_path()
157			if not self.source_file.exists():
158				self.dataset.update_status("Finished, no input data found.")
159
160		self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
161
162		processor_name = self.title if hasattr(self, "title") else self.type
163		self.dataset.clear_log()
164		self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
165
166		# start log file
167		self.dataset.update_status("Processing data")
168		self.dataset.update_version(get_software_commit(self))
169
170		# get parameters
171		# if possible, fill defaults where parameters are not provided
172		given_parameters = self.dataset.parameters.copy()
173		all_parameters = self.get_options(self.dataset)
174		self.parameters = {
175			param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
176			for param in [*all_parameters.keys(), *given_parameters.keys()]
177		}
178
179		# now the parameters have been loaded into memory, clear any sensitive
180		# ones. This has a side-effect that a processor may not run again
181		# without starting from scratch, but this is the price of progress
182		options = self.get_options(self.dataset.get_parent())
183		for option, option_settings in options.items():
184			if option_settings.get("sensitive"):
185				self.dataset.delete_parameter(option)
186
187		if self.interrupted:
188			self.dataset.log("Processing interrupted, trying again later")
189			return self.abort()
190
191		if not self.dataset.is_finished():
192			try:
193				self.process()
194				self.after_process()
195			except WorkerInterruptedException as e:
196				self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
197				self.abort()
198			except Exception as e:
199				self.dataset.log("Processor crashed (%s), trying again later" % str(e))
200				frames = traceback.extract_tb(e.__traceback__)
201				last_frame = frames[-1]
202				frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in frames[1:]]
203				location = "->".join(frames)
204
205				# Not all datasets have source_dataset keys
206				if len(self.dataset.get_genealogy()) > 1:
207					parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
208				else:
209					parent_key = ""
210
211				# remove any result files that have been created so far
212				self.remove_files()
213
214				raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
215				self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=last_frame)
216		else:
217			# dataset already finished, job shouldn't be open anymore
218			self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (self.job.data["jobtype"], self.job.data["remote_id"]))
219			self.job.finish()
220
221
222	def after_process(self):
223		"""
224		Run after processing the dataset
225
226		This method cleans up temporary files, and if needed, handles logistics
227		concerning the result file, e.g. running a pre-defined processor on the
228		result, copying it to another dataset, and so on.
229		"""
230		if self.dataset.data["num_rows"] > 0:
231			self.dataset.update_status("Dataset completed.")
232
233		if not self.dataset.is_finished():
234			self.dataset.finish()
235
236		self.dataset.remove_staging_areas()
237
238		# see if we have anything else lined up to run next
239		for next in self.parameters.get("next", []):
240			can_run_next = True
241			next_parameters = next.get("parameters", {})
242			next_type = next.get("type", "")
243			try:
244				available_processors = self.dataset.get_available_processors(user=self.dataset.creator)
245			except ValueError:
246				self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
247				break
248
249			# run it only if the post-processor is actually available for this query
250			if self.dataset.data["num_rows"] <= 0:
251				can_run_next = False
252				self.log.info("Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (next_type, self.dataset.key))
253
254			elif next_type in available_processors:
255				next_analysis = DataSet(
256					parameters=next_parameters,
257					type=next_type,
258					db=self.db,
259					parent=self.dataset.key,
260					extension=available_processors[next_type].extension,
261					is_private=self.dataset.is_private,
262					owner=self.dataset.creator,
263					modules=self.modules
264				)
265				self.queue.add_job(next_type, remote_id=next_analysis.key)
266			else:
267				can_run_next = False
268				self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (self.dataset.key, self.type, next_type))
269
270			if not can_run_next:
271				# We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
272				# preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
273				if "attach_to" in self.parameters:
274					# Probably should not happen, but for some reason a mid processor has been designated as the processor
275					# the parent should attach to
276					pass
277				else:
278					# Check for "attach_to" parameter in descendants
279					while True:
280						if "attach_to" in next_parameters:
281							self.parameters["attach_to"] = next_parameters["attach_to"]
282							break
283						else:
284							if "next" in next_parameters:
285								next_parameters = next_parameters["next"][0]["parameters"]
286							else:
287								# No more descendants
288								# Should not happen; we cannot find the source dataset
289								self.log.warning("Cannot find preset's source dataset for dataset %s" % self.dataset.key)
290								break
291
292		# see if we need to register the result somewhere
293		if "copy_to" in self.parameters:
294			# copy the results to an arbitrary place that was passed
295			if self.dataset.get_results_path().exists():
296				shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
297			else:
298				# if copy_to was passed, that means it's important that this
299				# file exists somewhere, so we create it as an empty file
300				with open(self.parameters["copy_to"], "w") as empty_file:
301					empty_file.write("")
302
303		# see if this query chain is to be attached to another query
304		# if so, the full genealogy of this query (minus the original dataset)
305		# is attached to the given query - this is mostly useful for presets,
306		# where a chain of processors can be marked as 'underlying' a preset
307		if "attach_to" in self.parameters:
308			try:
309				# copy metadata and results to the surrogate
310				surrogate = DataSet(key=self.parameters["attach_to"], db=self.db)
311
312				if self.dataset.get_results_path().exists():
313					# Update the surrogate's results file suffix to match this dataset's suffix
314					surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(self.dataset.get_results_path().suffix)
315					shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
316
317				try:
318					surrogate.finish(self.dataset.data["num_rows"])
319				except RuntimeError:
320					# already finished, could happen (though it shouldn't)
321					pass
322
323				surrogate.update_status(self.dataset.get_status())
324
325			except ValueError:
326				# dataset with key to attach to doesn't exist...
327				self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % (
328				self.dataset.key, self.parameters["attach_to"]))
329
330		self.job.finish()
331
332		if config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
333			owner = self.dataset.get_parameters().get("email-complete", False)
334			# Check that username is email address
335			if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
336				from email.mime.multipart import MIMEMultipart
337				from email.mime.text import MIMEText
338				from smtplib import SMTPException
339				import socket
340				import html2text
341
342				self.log.debug("Sending email to %s" % owner)
343				dataset_url = ('https://' if config.get('flask.https') else 'http://') + config.get('flask.server_name') + '/results/' + self.dataset.key
344				sender = config.get('mail.noreply')
345				message = MIMEMultipart("alternative")
346				message["From"] = sender
347				message["To"] = owner
348				message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
349				mail = """
350					<p>Hello %s,</p>
351					<p>4CAT has finished collecting your %s dataset labeled: %s</p>
352					<p>You can view your dataset via the following link:</p>
353					<p><a href="%s">%s</a></p> 
354					<p>Sincerely,</p>
355					<p>Your friendly neighborhood 4CAT admin</p>
356					""" % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
357				html_parser = html2text.HTML2Text()
358				message.attach(MIMEText(html_parser.handle(mail), "plain"))
359				message.attach(MIMEText(mail, "html"))
360				try:
361					send_email([owner], message)
362				except (SMTPException, ConnectionRefusedError, socket.timeout) as e:
363					self.log.error("Error sending email to %s" % owner)
364
365	def remove_files(self):
366		"""
367		Clean up result files and any staging files so the processor can be
368		attempted again later if desired.
369		"""
370		# Remove the results file that was created
371		if self.dataset.get_results_path().exists():
372			self.dataset.get_results_path().unlink()
373		if self.dataset.get_results_folder_path().exists():
374			shutil.rmtree(self.dataset.get_results_folder_path())
375
376		# Remove any staging areas with temporary data
377		self.dataset.remove_staging_areas()
378
379	def abort(self):
380		"""
381		Abort dataset creation and clean up so it may be attempted again later
382		"""
383		# remove any result files that have been created so far
384		self.remove_files()
385
386		# we release instead of finish, since interrupting is just that - the
387		# job should resume at a later point. Delay resuming by 10 seconds to
388		# give 4CAT the time to do whatever it wants (though usually this isn't
389		# needed since restarting also stops the spawning of new workers)
390		if self.interrupted == self.INTERRUPT_RETRY:
391			# retry later - wait at least 10 seconds to give the backend time to shut down
392			self.job.release(delay=10)
393		elif self.interrupted == self.INTERRUPT_CANCEL:
394			# cancel job
395			self.job.finish()
396
397	def add_field_to_parent(self, field_name, new_data, which_parent=source_dataset, update_existing=False):
398		"""
399		This function adds a new field to the parent dataset. Expects a list of data points, one for each item
400		in the parent dataset. Processes csv and ndjson. If update_existing is set to True, this can be used
401		to overwrite an existing field.
402
403		TODO: could be improved by accepting different types of data depending on csv or ndjson.
404
405		:param str field_name: 	name of the field to add to the parent dataset
406		:param List new_data: 	List of data to be added to parent dataset
407		:param DataSet which_parent: 	DataSet to be updated (e.g., self.source_dataset, self.dataset.get_parent(), self.dataset.top_parent())
408		:param bool update_existing: 	False (default) will raise an error if the field_name already exists
409										True will allow updating existing data
410		"""
411		if len(new_data) < 1:
412			# no data
413			raise ProcessorException('No data provided')
414
415		if not hasattr(self, "source_dataset") and which_parent is not None:
416			# no source to update
417			raise ProcessorException('No source dataset to update')
418
419		# Get the source file data path
420		parent_path = which_parent.get_results_path()
421
422		if len(new_data) != which_parent.num_rows:
423			raise ProcessorException('Must have new data point for each record: parent dataset: %i, new data points: %i' % (which_parent.num_rows, len(new_data)))
424
425		self.dataset.update_status("Adding new field %s to the source file" % field_name)
426
427		# Get a temporary path where we can store the data
428		tmp_path = self.dataset.get_staging_area()
429		tmp_file_path = tmp_path.joinpath(parent_path.name)
430
431		# go through items one by one, optionally mapping them
432		if parent_path.suffix.lower() == ".csv":
433			# Get field names
434			fieldnames = which_parent.get_columns()
435			if not update_existing and field_name in fieldnames:
436				raise ProcessorException('field_name %s already exists!' % field_name)
437			fieldnames.append(field_name)
438
439			# Iterate through the original dataset and add values to a new column
440			self.dataset.update_status("Writing new source file with %s." % field_name)
441			with tmp_file_path.open("w", encoding="utf-8", newline="") as output:
442				writer = csv.DictWriter(output, fieldnames=fieldnames)
443				writer.writeheader()
444
445				for count, post in enumerate(which_parent.iterate_items(self)):
446					# stop processing if worker has been asked to stop
447					if self.interrupted:
448						raise ProcessorInterruptedException("Interrupted while writing CSV file")
449
450					post.original[field_name] = new_data[count]
451					writer.writerow(post.original)
452
453		elif parent_path.suffix.lower() == ".ndjson":
454			# JSON cannot encode sets
455			if type(new_data[0]) is set:
456				# could check each if type(datapoint) is set, but that could be expensive...
457				new_data = [list(datapoint) for datapoint in new_data]
458
459			with tmp_file_path.open("w", encoding="utf-8", newline="") as output:
460				for count, post in enumerate(which_parent.iterate_items(self)):
461					# stop processing if worker has been asked to stop
462					if self.interrupted:
463						raise ProcessorInterruptedException("Interrupted while writing NDJSON file")
464
465					if not update_existing and field_name in post.original.keys():
466						raise ProcessorException('field_name %s already exists!' % field_name)
467
468					# Update data
469					post.original[field_name] = new_data[count]
470
471					output.write(json.dumps(post.original) + "\n")
472		else:
473			raise NotImplementedError("Cannot iterate through %s file" % parent_path.suffix)
474
475		# Replace the source file path with the new file
476		shutil.copy(str(tmp_file_path), str(parent_path))
477
478		# delete temporary files and folder
479		shutil.rmtree(tmp_path)
480
481		self.dataset.update_status("Parent dataset updated.")
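
	# A hypothetical usage sketch (not in the original source): a processor that
	# computed one value per item in its parent dataset could write those values
	# back to the parent from its own process() method like so:
	#
	#   scores = [compute_score(item) for item in self.source_dataset.iterate_items(self)]
	#   self.add_field_to_parent("score", scores, which_parent=self.source_dataset)
	#
	# compute_score is an assumed helper; the list must contain exactly one value
	# per item in the parent dataset, or a ProcessorException is raised.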
482
483	def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
484		"""
485		A generator that iterates through files in an archive
486
487		With every iteration, the processor's 'interrupted' flag is checked,
488		and if set a ProcessorInterruptedException is raised, which by default
489		is caught and subsequently stops execution gracefully.
490
491		Files are temporarily unzipped and deleted after use.
492
493		:param Path path: 	Path to zip file to read
494		:param Path staging_area:  Where to store the files while they're
495		  being worked with. If omitted, a temporary folder is created and
496		  deleted after use
497		:param bool immediately_delete:  Temporary files are removed after yielded;
498		  False keeps files until the staging_area is removed (usually during processor
499		  cleanup)
500		:param list filename_filter:  Whitelist of filenames to iterate.
501		Other files will be ignored. If empty, do not ignore anything.
502		:return:  An iterator with a Path item for each file
503		"""
504
505		if not path.exists():
506			return
507
508		if not staging_area:
509			staging_area = self.dataset.get_staging_area()
510
511		if not staging_area.exists() or not staging_area.is_dir():
512			raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
513
514		with zipfile.ZipFile(path, "r") as archive_file:
515			archive_contents = sorted(archive_file.namelist())
516
517			for archived_file in archive_contents:
518				if filename_filter and archived_file not in filename_filter:
519					continue
520
521				info = archive_file.getinfo(archived_file)
522				if info.is_dir():
523					continue
524
525				if self.interrupted:
526					raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
527
528				temp_file = staging_area.joinpath(archived_file)
529				archive_file.extract(archived_file, staging_area)
530
531				yield temp_file
532				if immediately_delete:
533					temp_file.unlink()
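
	# Hypothetical usage sketch (not in the original source): a processor whose
	# parent dataset is a zip archive of files could iterate over its contents
	# from process() like so:
	#
	#   for extracted_file in self.iterate_archive_contents(self.source_file):
	#       handle_file(extracted_file)  # assumed helper; extracted_file is a Path
	#
	# each file is unpacked to a staging area and, by default, deleted again once
	# the loop advances to the next file.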
534
535	def unpack_archive_contents(self, path, staging_area=None):
536		"""
537		Unpack all files in an archive to a staging area
538
539		With every iteration, the processor's 'interrupted' flag is checked,
540		and if set a ProcessorInterruptedException is raised, which by default
541		is caught and subsequently stops execution gracefully.
542
543		Files are unzipped to a staging area. The staging area is *not*
544		cleaned up automatically.
545
546		:param Path path: 	Path to zip file to read
547		:param Path staging_area:  Where to store the files while they're
548		  being worked with. If omitted, a temporary folder is created; the
549		  staging area is *not* cleaned up automatically after use
551		:return Path:  A path to the staging area
552		"""
553
554		if not path.exists():
555			return
556
557		if not staging_area:
558			staging_area = self.dataset.get_staging_area()
559
560		if not staging_area.exists() or not staging_area.is_dir():
561			raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
562
563		paths = []
564		with zipfile.ZipFile(path, "r") as archive_file:
565			archive_contents = sorted(archive_file.namelist())
566
567			for archived_file in archive_contents:
568				if self.interrupted:
569					raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
570
571				file_name = archived_file.split("/")[-1]
572				temp_file = staging_area.joinpath(file_name)
573				archive_file.extract(archived_file, staging_area)
574				paths.append(temp_file)
575
576		return staging_area
577
578	def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
579		"""
580		Extract a file from an archive by name
581
582		:param str filename:  Name of file to extract
583		:param Path archive_path:  Path to zip file to read
584		:param Path staging_area:  Where to store the files while they're
585		  		being worked with. If omitted, a temporary folder is created
586		:return Path:  A path to the extracted file
587		"""
588		if not archive_path.exists():
589			return
590
591		if not staging_area:
592			staging_area = self.dataset.get_staging_area()
593
594		if not staging_area.exists() or not staging_area.is_dir():
595			raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
596
597		with zipfile.ZipFile(archive_path, "r") as archive_file:
598			if filename not in archive_file.namelist():
599				raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
600			else:
601				archive_file.extract(filename, staging_area)
602				return staging_area.joinpath(filename)
603
604	def write_csv_items_and_finish(self, data):
605		"""
606		Write data as csv to results file and finish dataset
607
608		Determines result file path using dataset's path determination helper
609		methods. After writing results, the dataset is marked finished. Will
610		raise a ProcessorInterruptedException if the interrupted flag for this
611		processor is set while iterating.
612
613		:param data: A list or tuple of dictionaries, all with the same keys
614		"""
615		if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
616			raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
617
618		if not data:
619			raise ValueError("write_csv_items requires a list or tuple with at least one item")
620
621		self.dataset.update_status("Writing results file")
622		writer = False
623		with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
624			for row in data:
625				if self.interrupted:
626					raise ProcessorInterruptedException("Interrupted while writing results file")
627
628				row = remove_nuls(row)
629				if not writer:
630					writer = csv.DictWriter(results, fieldnames=row.keys())
631					writer.writeheader()
632
633				writer.writerow(row)
634
635		self.dataset.update_status("Finished")
636		self.dataset.finish(len(data))
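
	# Hypothetical usage sketch (not in the original source): at the end of a
	# processor's process() method, results can be written and the dataset marked
	# finished in one call, provided all dictionaries share the same keys:
	#
	#   rows = [{"word": word, "frequency": count} for word, count in frequencies.items()]
	#   self.write_csv_items_and_finish(rows)
	#
	# where frequencies is an assumed dictionary built earlier in process().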
637
638	def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
639		"""
640		Archive a bunch of files into a zip archive and finish processing
641
642		:param list|Path files: If a list, all files will be added to the
643		  archive and deleted afterwards. If a folder, all files in the folder
644		  will be added and the folder will be deleted afterwards.
645		:param int num_items: Items in the dataset. If None, the amount of
646		  files added to the archive will be used.
647		:param int compression:  Type of compression to use. By default, files
648		  are not compressed, to speed up unarchiving.
649		:param bool finish:  Finish the dataset/job afterwards or not?
650		"""
651		is_folder = False
652		if issubclass(type(files), PurePath):
653			is_folder = files
654			if not files.exists() or not files.is_dir():
655				raise RuntimeError("Folder %s is not a folder that can be archived" % files)
656
657			files = files.glob("*")
658
659		# create zip of archive and delete temporary files and folder
660		self.dataset.update_status("Compressing results into archive")
661		done = 0
662		with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
663			for output_path in files:
664				zip.write(output_path, output_path.name)
665				output_path.unlink()
666				done += 1
667
668		# delete temporary folder
669		if is_folder:
670			shutil.rmtree(is_folder)
671
672		self.dataset.update_status("Finished")
673		if num_items is None:
674			num_items = done
675
676		if finish:
677			self.dataset.finish(num_items)
678
679	def create_standalone(self):
680		"""
681		Copy this dataset and make that copy standalone
682
683		This has the benefit of allowing for all analyses that can be run on
684		full datasets on the new, filtered copy as well.
685
686		:return DataSet:  The new standalone dataset
687		"""
688		top_parent = self.source_dataset
689
690		finished = self.dataset.check_dataset_finished()
691		if finished == 'empty':
692			# No data to process, so we can't create a standalone dataset
693			return
694		elif finished is None:
695			# I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
696			pass
697
698		standalone = self.dataset.copy(shallow=False)
699		standalone.body_match = "(Filtered) " + top_parent.query
700		standalone.datasource = top_parent.parameters.get("datasource", "custom")
701
702		try:
703			standalone.board = top_parent.board
704		except AttributeError:
705			standalone.board = self.type
706
707		standalone.type = top_parent.type
708
709		standalone.detach()
710		standalone.delete_parameter("key_parent")
711
712		self.dataset.copied_to = standalone.key
713
714		# we don't need this file anymore - it has been copied to the new
715		# standalone dataset, and this one is not accessible via the interface
716		# except as a link to the copied standalone dataset
717		os.unlink(self.dataset.get_results_path())
718
719		# Copy the log
720		shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
721
722		return standalone
723
724	@classmethod
725	def map_item_method_available(cls, dataset):
726		"""
727		Check if this processor can use map_item
728
729		Checks if map_item method exists and is compatible with dataset. If
730		dataset has a different extension than the default for this processor,
731		or if the dataset has no extension, this means we cannot be sure the
732		data is in the right format to be mapped, so `False` is returned in
733		that case even if a map_item() method is available.
734
735		:param DataSet dataset:				The DataSet object with which to
736		use map_item
739		"""
740		# only run item mapper if extension of processor == extension of
741		# data file, for the scenario where a csv file was uploaded and
742		# converted to an ndjson-based data source, for example
743		# todo: this is kind of ugly, and a better fix may be possible
744		dataset_extension = dataset.get_extension()
745		if not dataset_extension:
746			# DataSet results file does not exist or has no extension, use expected extension
747			if hasattr(dataset, "extension"):
748				dataset_extension = dataset.extension
749			else:
750				# No known DataSet extension; cannot determine if map_item method compatible
751				return False
752
753		return hasattr(cls, "map_item") and cls.extension == dataset_extension
754
755	@classmethod
756	def get_mapped_item(cls, item):
757		"""
758		Get the mapped item using a processor's map_item method.
759
760		Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
761		"""
762		try:
763			mapped_item = cls.map_item(item)
764		except (KeyError, IndexError) as e:
765			raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
766
767		if not mapped_item:
768			raise MapItemException("Unable to map item!")
769
770		return mapped_item
771
772	@classmethod
773	def is_filter(cls):
774		"""
775		Is this processor a filter?
776
777		Filters do not produce their own dataset but replace the source dataset
778		instead.
779
780		:todo: Make this a bit more robust than sniffing the processor category
781		:return bool:
782		"""
783		return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
784
785	@classmethod
786	def get_options(cls, parent_dataset=None, user=None):
787		"""
788		Get processor options
789
790		This method by default returns the class's "options" attribute, or an
791		empty dictionary. It can be redefined by processors that need more
792		fine-grained options, e.g. in cases where the availability of options
793		is partially determined by the parent dataset's parameters.
794
795		:param DataSet parent_dataset:  An object representing the dataset that
796		  the processor would be run on
797		:param User user:  Flask user the options will be displayed for, in
798		  case they are requested for display in the 4CAT web interface. This can
799		  be used to show some options only to privileged users.
800		"""
801		return cls.options if hasattr(cls, "options") else {}
802
803	@classmethod
804	def get_status(cls):
805		"""
806		Get processor status
807
808		:return list:	Statuses of this processor
809		"""
810		return cls.status if hasattr(cls, "status") else None
811
812	@classmethod
813	def is_top_dataset(cls):
814		"""
815		Confirm this is *not* a top dataset, but a processor.
816
817		Used for processor compatibility checks.
818
819		:return bool:  Always `False`, because this is a processor.
820		"""
821		return False
822
823	@classmethod
824	def is_from_collector(cls):
825		"""
826		Check if this processor is one that collects data, i.e. a search or
827		import worker.
828
829		:return bool:
830		"""
831		return cls.type.endswith("-search") or cls.type.endswith("-import")
832
833	@classmethod
834	def get_extension(self, parent_dataset=None):
835		"""
836		Return the extension of the processor's dataset
837
838		Used for processor compatibility checks.
839
840		:param DataSet parent_dataset:  An object representing the dataset that
841		  the processor would be run on
842		:return str|None:  Dataset extension (without leading `.`) or `None`.
843		"""
844		if self.is_filter():
845			if parent_dataset is not None:
846				# Filters should use the same extension as the parent dataset
847				return parent_dataset.get_extension()
848			else:
849				# No parent dataset provided, so the parent dataset's extension cannot be determined
850				# this originally returned None for filters, so that outcome is kept, though falling back on the processor's own extension may be preferable
851				return None
852		elif self.extension:
853			# Use explicitly defined extension in class (Processor class defaults to "csv")
854			return self.extension
855		else:
856			# A non filter processor updated the base Processor extension to None/False?
857			return None
858
859	@classmethod
860	def is_rankable(cls, multiple_items=True):
861		"""
862		Used for processor compatibility
863
864		:param bool multiple_items:  Consider datasets with multiple values per
865		  item (e.g. word_1, word_2, etc)? Included for compatibility
866		"""
867		return False
868
869	@classmethod
870	def exclude_followup_processors(cls, processor_type=None):
871		"""
872        Used for processor compatibility
873
874        To be defined by the child processor if it should exclude certain follow-up processors.
875        e.g.:
876
877        def exclude_followup_processors(cls, processor_type):
878            if processor_type in ["undesirable-followup-processor"]:
879                return True
880            return False
881
882        :param str processor_type:  Processor type to exclude
883        :return bool:  True if processor should be excluded, False otherwise
884        """
885		return False
886
887	@abc.abstractmethod
888	def process(self):
889		"""
890		Process data
891
892		To be defined by the child processor.
893		"""
894		pass
895
896	@staticmethod
897	def is_4cat_processor():
898		"""
899		Is this a 4CAT processor?
900
901		This is used to determine whether a class is a 4CAT
902		processor.
903		
904		:return:  True
905		"""
906		return True
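
As a further illustration of how the helpers above combine, the hypothetical `process()` implementation below collects output files in a staging area and archives them into the dataset's result file. It is a sketch only, assuming it lives in a `BasicProcessor` subclass with the same imports as at the top of this module; all method names come from the listing above, and the per-file transformation is a placeholder.

.. code-block:: python

    def process(self):
        # a staging area is a temporary folder tied to the dataset; leftover
        # staging areas are cleaned up again in after_process()
        staging_area = self.dataset.get_staging_area()

        processed = 0
        for input_file in self.iterate_archive_contents(self.source_file):
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while processing archive")

            # placeholder transformation: copy each extracted file into the staging area
            output_file = staging_area.joinpath(input_file.name)
            output_file.write_bytes(input_file.read_bytes())
            processed += 1

        # compress everything in the staging area into the result file and finish
        self.write_archive_and_finish(staging_area, num_items=processed)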
 30class BasicProcessor(FourcatModule, BasicWorker, metaclass=abc.ABCMeta):
 31	"""
 32	Abstract processor class
 33
 34	A processor takes a finished dataset as input and processes its result in
 35	some way, with another dataset set as output. The input thus is a file, and
 36	the output (usually) as well. In other words, the result of a processor can
 37	be used as input for another processor (though whether and when this is
 38	useful is another question).
 39
 40	To determine whether a processor can process a given dataset, you can
 41	define a `is_compatible_with(FourcatModule module=None, str user=None):) -> bool` class
 42	method which takes a dataset as argument and returns a bool that determines
 43	if this processor is considered compatible with that dataset. For example:
 44
 45	.. code-block:: python
 46
 47        @classmethod
 48        def is_compatible_with(cls, module=None, user=None):
 49            return module.type == "linguistic-features"
 50
 51
 52	"""
 53
 54	#: Database handler to interface with the 4CAT database
 55	db = None
 56
 57	#: Job object that requests the execution of this processor
 58	job = None
 59
 60	#: The dataset object that the processor is *creating*.
 61	dataset = None
 62
 63	#: Owner (username) of the dataset
 64	owner = None
 65
 66	#: The dataset object that the processor is *processing*.
 67	source_dataset = None
 68
 69	#: The file that is being processed
 70	source_file = None
 71
 72	#: Processor description, which will be displayed in the web interface
 73	description = "No description available"
 74
 75	#: Category identifier, used to group processors in the web interface
 76	category = "Other"
 77
 78	#: Extension of the file created by the processor
 79	extension = "csv"
 80
 81	#: 4CAT settings from the perspective of the dataset's owner
 82	config = None
 83
 84	#: Is this processor running 'within' a preset processor?
 85	is_running_in_preset = False
 86
 87	#: Is this processor hidden in the front-end, and only used internally/in presets?
 88	is_hidden = False
 89
 90	#: This will be defined automatically upon loading the processor. There is
 91	#: no need to override manually
 92	filepath = None
 93
 94	def work(self):
 95		"""
 96		Process a dataset
 97
 98		Loads dataset metadata, sets up the scaffolding for performing some kind
 99		of processing on that dataset, and then processes it. Afterwards, clean
100		up.
101		"""
102		try:
103			# a dataset can have multiple owners, but the creator is the user
104			# that actually queued the processor, so their config is relevant
105			self.dataset = DataSet(key=self.job.data["remote_id"], db=self.db, modules=self.modules)
106			self.owner = self.dataset.creator
107		except DataSetException as e:
108			# query has been deleted in the meantime. finish without error,
109			# as deleting it will have been a conscious choice by a user
110			self.job.finish()
111			return
112
113		# set up config reader using the worker's DB connection and the dataset
114		# creator. This ensures that if a value has been overriden for the owner,
115		# the overridden value is used instead.
116		config.with_db(self.db)
117		self.config = ConfigWrapper(config=config, user=User.get_by_name(self.db, self.owner))
118
119		if self.dataset.data.get("key_parent", None):
120			# search workers never have parents (for now), so we don't need to
121			# find out what the source_dataset dataset is if it's a search worker
122			try:
123				self.source_dataset = self.dataset.get_parent()
124
125				# for presets, transparently use the *top* dataset as a source_dataset
126				# since that is where any underlying processors should get
127				# their data from. However, this should only be done as long as the
128				# preset is not finished yet, because after that there may be processors
129				# that run on the final preset result
130				while self.source_dataset.type.startswith("preset-") and not self.source_dataset.is_finished():
131					self.is_running_in_preset = True
132					self.source_dataset = self.source_dataset.get_parent()
133					if self.source_dataset is None:
134						# this means there is no dataset that is *not* a preset anywhere
135						# above this dataset. This should never occur, but if it does, we
136						# cannot continue
137						self.log.error("Processor preset %s for dataset %s cannot find non-preset parent dataset",
138									   (self.type, self.dataset.key))
139						self.job.finish()
140						return
141
142			except DataSetException:
143				# we need to know what the source_dataset dataset was to properly handle the
144				# analysis
145				self.log.warning("Processor %s queued for orphan dataset %s: cannot run, cancelling job" % (
146					self.type, self.dataset.key))
147				self.job.finish()
148				return
149
150			if not self.source_dataset.is_finished() and not self.is_running_in_preset:
151				# not finished yet - retry after a while
152				# exception for presets, since these *should* be unfinished
153				# until underlying processors are done
154				self.job.release(delay=30)
155				return
156
157			self.source_file = self.source_dataset.get_results_path()
158			if not self.source_file.exists():
159				self.dataset.update_status("Finished, no input data found.")
160
161		self.log.info("Running processor %s on dataset %s" % (self.type, self.job.data["remote_id"]))
162
163		processor_name = self.title if hasattr(self, "title") else self.type
164		self.dataset.clear_log()
165		self.dataset.log("Processing '%s' started for dataset %s" % (processor_name, self.dataset.key))
166
167		# start log file
168		self.dataset.update_status("Processing data")
169		self.dataset.update_version(get_software_commit(self))
170
171		# get parameters
172		# if possible, fill defaults where parameters are not provided
173		given_parameters = self.dataset.parameters.copy()
174		all_parameters = self.get_options(self.dataset)
175		self.parameters = {
176			param: given_parameters.get(param, all_parameters.get(param, {}).get("default"))
177			for param in [*all_parameters.keys(), *given_parameters.keys()]
178		}
179
180		# now the parameters have been loaded into memory, clear any sensitive
181		# ones. This has a side-effect that a processor may not run again
182		# without starting from scratch, but this is the price of progress
183		options = self.get_options(self.dataset.get_parent())
184		for option, option_settings in options.items():
185			if option_settings.get("sensitive"):
186				self.dataset.delete_parameter(option)
187
188		if self.interrupted:
189			self.dataset.log("Processing interrupted, trying again later")
190			return self.abort()
191
192		if not self.dataset.is_finished():
193			try:
194				self.process()
195				self.after_process()
196			except WorkerInterruptedException as e:
197				self.dataset.log("Processing interrupted (%s), trying again later" % str(e))
198				self.abort()
199			except Exception as e:
200				self.dataset.log("Processor crashed (%s), trying again later" % str(e))
201				frames = traceback.extract_tb(e.__traceback__)
202				last_frame = frames[-1]
203				frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in frames[1:]]
204				location = "->".join(frames)
205
206				# Not all datasets have source_dataset keys
207				if len(self.dataset.get_genealogy()) > 1:
208					parent_key = " (via " + self.dataset.get_genealogy()[0].key + ")"
209				else:
210					parent_key = ""
211
212				# remove any result files that have been created so far
213				self.remove_files()
214
215				raise ProcessorException("Processor %s raised %s while processing dataset %s%s in %s:\n   %s\n" % (
216				self.type, e.__class__.__name__, self.dataset.key, parent_key, location, str(e)), frame=last_frame)
217		else:
218			# dataset already finished, job shouldn't be open anymore
219			self.log.warning("Job %s/%s was queued for a dataset already marked as finished, deleting..." % (self.job.data["jobtype"], self.job.data["remote_id"]))
220			self.job.finish()
221
222
223	def after_process(self):
224		"""
225		Run after processing the dataset
226
227		This method cleans up temporary files, and if needed, handles logistics
228		concerning the result file, e.g. running a pre-defined processor on the
229		result, copying it to another dataset, and so on.
230		"""
231		if self.dataset.data["num_rows"] > 0:
232			self.dataset.update_status("Dataset completed.")
233
234		if not self.dataset.is_finished():
235			self.dataset.finish()
236
237		self.dataset.remove_staging_areas()
238
239		# see if we have anything else lined up to run next
240		for next in self.parameters.get("next", []):
241			can_run_next = True
242			next_parameters = next.get("parameters", {})
243			next_type = next.get("type", "")
244			try:
245				available_processors = self.dataset.get_available_processors(user=self.dataset.creator)
246			except ValueError:
247				self.log.info("Trying to queue next processor, but parent dataset no longer exists, halting")
248				break
249
250			# run it only if the post-processor is actually available for this query
251			if self.dataset.data["num_rows"] <= 0:
252				can_run_next = False
253				self.log.info("Not running follow-up processor of type %s for dataset %s, no input data for follow-up" % (next_type, self.dataset.key))
254
255			elif next_type in available_processors:
256				next_analysis = DataSet(
257					parameters=next_parameters,
258					type=next_type,
259					db=self.db,
260					parent=self.dataset.key,
261					extension=available_processors[next_type].extension,
262					is_private=self.dataset.is_private,
263					owner=self.dataset.creator,
264					modules=self.modules
265				)
266				self.queue.add_job(next_type, remote_id=next_analysis.key)
267			else:
268				can_run_next = False
269				self.log.warning("Dataset %s (of type %s) wants to run processor %s next, but it is incompatible" % (self.dataset.key, self.type, next_type))
270
271			if not can_run_next:
272				# We are unable to continue the chain of processors, so we check to see if we are attaching to a parent
273				# preset; this allows the parent (for example a preset) to be finished and any successful processors displayed
274				if "attach_to" in self.parameters:
275					# Probably should not happen, but for some reason a mid processor has been designated as the processor
276					# the parent should attach to
277					pass
278				else:
279					# Check for "attach_to" parameter in descendents
280					while True:
281						if "attach_to" in next_parameters:
282							self.parameters["attach_to"] = next_parameters["attach_to"]
283							break
284						else:
285							if "next" in next_parameters:
286								next_parameters = next_parameters["next"][0]["parameters"]
287							else:
288								# No more descendents
289								# Should not happen; we cannot find the source dataset
290								self.log.warning("Cannot find preset's source dataset for dataset %s" % self.dataset.key)
291								break
292
293		# see if we need to register the result somewhere
294		if "copy_to" in self.parameters:
295			# copy the results to an arbitrary place that was passed
296			if self.dataset.get_results_path().exists():
297				shutil.copyfile(str(self.dataset.get_results_path()), self.parameters["copy_to"])
298			else:
299				# if copy_to was passed, that means it's important that this
300				# file exists somewhere, so we create it as an empty file
301				with open(self.parameters["copy_to"], "w") as empty_file:
302					empty_file.write("")
303
304		# see if this query chain is to be attached to another query
305		# if so, the full genealogy of this query (minus the original dataset)
306		# is attached to the given query - this is mostly useful for presets,
307		# where a chain of processors can be marked as 'underlying' a preset
308		if "attach_to" in self.parameters:
309			try:
310				# copy metadata and results to the surrogate
311				surrogate = DataSet(key=self.parameters["attach_to"], db=self.db)
312
313				if self.dataset.get_results_path().exists():
314					# Update the surrogate's results file suffix to match this dataset's suffix
315					surrogate.data["result_file"] = surrogate.get_results_path().with_suffix(self.dataset.get_results_path().suffix)
316					shutil.copyfile(str(self.dataset.get_results_path()), str(surrogate.get_results_path()))
317
318				try:
319					surrogate.finish(self.dataset.data["num_rows"])
320				except RuntimeError:
321					# already finished, could happen (though it shouldn't)
322					pass
323
324				surrogate.update_status(self.dataset.get_status())
325
326			except ValueError:
327				# dataset with key to attach to doesn't exist...
328				self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist)" % (
329				self.dataset.key, self.parameters["attach_to"]))
330
331		self.job.finish()
332
333		if config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
334			owner = self.dataset.get_parameters().get("email-complete", False)
335			# Check that username is email address
336			if re.match(r"[^@]+\@.*?\.[a-zA-Z]+", owner):
337				from email.mime.multipart import MIMEMultipart
338				from email.mime.text import MIMEText
339				from smtplib import SMTPException
340				import socket
341				import html2text
342
343				self.log.debug("Sending email to %s" % owner)
344				dataset_url = ('https://' if config.get('flask.https') else 'http://') + config.get('flask.server_name') + '/results/' + self.dataset.key
345				sender = config.get('mail.noreply')
346				message = MIMEMultipart("alternative")
347				message["From"] = sender
348				message["To"] = owner
349				message["Subject"] = "4CAT dataset completed: %s - %s" % (self.dataset.type, self.dataset.get_label())
350				mail = """
351					<p>Hello %s,</p>
352					<p>4CAT has finished collecting your %s dataset labeled: %s</p>
353					<p>You can view your dataset via the following link:</p>
354					<p><a href="%s">%s</a></p> 
355					<p>Sincerely,</p>
356					<p>Your friendly neighborhood 4CAT admin</p>
357					""" % (owner, self.dataset.type, self.dataset.get_label(), dataset_url, dataset_url)
358				html_parser = html2text.HTML2Text()
359				message.attach(MIMEText(html_parser.handle(mail), "plain"))
360				message.attach(MIMEText(mail, "html"))
361				try:
362					send_email([owner], message)
363				except (SMTPException, ConnectionRefusedError, socket.timeout) as e:
364					self.log.error("Error sending email to %s" % owner)
365
366	def remove_files(self):
367		"""
368		Clean up result files and any staging files for processor to be attempted
369		later if desired.
370		"""
371		# Remove the results file that was created
372		if self.dataset.get_results_path().exists():
373			self.dataset.get_results_path().unlink()
374		if self.dataset.get_results_folder_path().exists():
375			shutil.rmtree(self.dataset.get_results_folder_path())
376
377		# Remove any staging areas with temporary data
378		self.dataset.remove_staging_areas()
379
380	def abort(self):
381		"""
382		Abort dataset creation and clean up so it may be attempted again later
383		"""
384		# remove any result files that have been created so far
385		self.remove_files()
386
387		# we release instead of finish, since interrupting is just that - the
388		# job should resume at a later point. Delay resuming by 10 seconds to
389		# give 4CAT the time to do whatever it wants (though usually this isn't
390		# needed since restarting also stops the spawning of new workers)
391		if self.interrupted == self.INTERRUPT_RETRY:
392			# retry later - wait at least 10 seconds to give the backend time to shut down
393			self.job.release(delay=10)
394		elif self.interrupted == self.INTERRUPT_CANCEL:
395			# cancel job
396			self.job.finish()
397
398	def add_field_to_parent(self, field_name, new_data, which_parent=source_dataset, update_existing=False):
399		"""
400		This function adds a new field to the parent dataset. Expects a list of data points, one for each item
401		in the parent dataset. Processes csv and ndjson. If update_existing is set to True, this can be used
402		to overwrite an existing field.
403
404		TODO: could be improved by accepting different types of data depending on csv or ndjson.
405
406		:param str field_name: 	name of the desired
407		:param List new_data: 	List of data to be added to parent dataset
408		:param DataSet which_parent: 	DataSet to be updated (e.g., self.source_dataset, self.dataset.get_parent(), self.dataset.top_parent())
409		:param bool update_existing: 	False (default) will raise an error if the field_name already exists
410										True will allow updating existing data
411		"""
412		if len(new_data) < 1:
413			# no data
414			raise ProcessorException('No data provided')
415
416		if not hasattr(self, "source_dataset") and which_parent is not None:
417			# no source to update
418			raise ProcessorException('No source dataset to update')
419
420		# Get the source file data path
421		parent_path = which_parent.get_results_path()
422
423		if len(new_data) != which_parent.num_rows:
424			raise ProcessorException('Must have new data point for each record: parent dataset: %i, new data points: %i' % (which_parent.num_rows, len(new_data)))
425
426		self.dataset.update_status("Adding new field %s to the source file" % field_name)
427
428		# Get a temporary path where we can store the data
429		tmp_path = self.dataset.get_staging_area()
430		tmp_file_path = tmp_path.joinpath(parent_path.name)
431
432		# go through items one by one, optionally mapping them
433		if parent_path.suffix.lower() == ".csv":
434			# Get field names
435			fieldnames = which_parent.get_columns()
436			if not update_existing and field_name in fieldnames:
437				raise ProcessorException('field_name %s already exists!' % field_name)
438			fieldnames.append(field_name)
439
440			# Iterate through the original dataset and add values to a new column
441			self.dataset.update_status("Writing new source file with %s." % field_name)
442			with tmp_file_path.open("w", encoding="utf-8", newline="") as output:
443				writer = csv.DictWriter(output, fieldnames=fieldnames)
444				writer.writeheader()
445
446				for count, post in enumerate(which_parent.iterate_items(self)):
447					# stop processing if worker has been asked to stop
448					if self.interrupted:
449						raise ProcessorInterruptedException("Interrupted while writing CSV file")
450
451					post.original[field_name] = new_data[count]
452					writer.writerow(post.original)
453
454		elif parent_path.suffix.lower() == ".ndjson":
455			# JSON cannot encode sets
456			if type(new_data[0]) is set:
457				# could check each if type(datapoint) is set, but that could be extensive...
458				new_data = [list(datapoint) for datapoint in new_data]
459
460			with tmp_file_path.open("w", encoding="utf-8", newline="") as output:
461				for count, post in enumerate(which_parent.iterate_items(self)):
462					# stop processing if worker has been asked to stop
463					if self.interrupted:
464						raise ProcessorInterruptedException("Interrupted while writing NDJSON file")
465
466					if not update_existing and field_name in post.original.keys():
467						raise ProcessorException('field_name %s already exists!' % field_name)
468
469					# Update data
470					post.original[field_name] = new_data[count]
471
472					output.write(json.dumps(post.original) + "\n")
473		else:
474			raise NotImplementedError("Cannot iterate through %s file" % parent_path.suffix)
475
476		# Replace the source file path with the new file
477		shutil.copy(str(tmp_file_path), str(parent_path))
478
479		# delete temporary files and folder
480		shutil.rmtree(tmp_path)
481
482		self.dataset.update_status("Parent dataset updated.")
483
484	def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):
485		"""
486		A generator that iterates through files in an archive
487
488		With every iteration, the processor's 'interrupted' flag is checked,
489		and if set a ProcessorInterruptedException is raised, which by default
490		is caught and subsequently stops execution gracefully.
491
492		Files are temporarily unzipped and deleted after use.
493
494		:param Path path: 	Path to zip file to read
495		:param Path staging_area:  Where to store the files while they're
496		  being worked with. If omitted, a temporary folder is created and
497		  deleted after use
498		:param bool immediately_delete:  Temporary files are removed after yielded;
499		  False keeps files until the staging_area is removed (usually during processor
500		  cleanup)
501		:param list filename_filter:  Whitelist of filenames to iterate.
502		Other files will be ignored. If empty, do not ignore anything.
503		:return:  An iterator with a Path item for each file
504		"""
505
506		if not path.exists():
507			return
508
509		if not staging_area:
510			staging_area = self.dataset.get_staging_area()
511
512		if not staging_area.exists() or not staging_area.is_dir():
513			raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
514
515		with zipfile.ZipFile(path, "r") as archive_file:
516			archive_contents = sorted(archive_file.namelist())
517
518			for archived_file in archive_contents:
519				if filename_filter and archived_file not in filename_filter:
520					continue
521
522				info = archive_file.getinfo(archived_file)
523				if info.is_dir():
524					continue
525
526				if self.interrupted:
527					raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
528
529				temp_file = staging_area.joinpath(archived_file)
530				archive_file.extract(archived_file, staging_area)
531
532				yield temp_file
533				if immediately_delete:
534					temp_file.unlink()
535
536	def unpack_archive_contents(self, path, staging_area=None):
537		"""
538		Unpack all files in an archive to a staging area
539
540		With every iteration, the processor's 'interrupted' flag is checked,
541		and if set a ProcessorInterruptedException is raised, which by default
542		is caught and subsequently stops execution gracefully.
543
544		Files are unzipped to a staging area. The staging area is *not*
545		cleaned up automatically.
546
547		:param Path path: 	Path to zip file to read
548		:param Path staging_area:  Where to store the files while they're
549		  being worked with. If omitted, a temporary folder is created.
550		  Unlike iterate_archive_contents(), the staging area is not removed
551		  afterwards; the caller is responsible for cleaning it up
552		:return Path:  A path to the staging area
553		"""
554
555		if not path.exists():
556			return
557
558		if not staging_area:
559			staging_area = self.dataset.get_staging_area()
560
561		if not staging_area.exists() or not staging_area.is_dir():
562			raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
563
564		paths = []
565		with zipfile.ZipFile(path, "r") as archive_file:
566			archive_contents = sorted(archive_file.namelist())
567
568			for archived_file in archive_contents:
569				if self.interrupted:
570					raise ProcessorInterruptedException("Interrupted while iterating zip file contents")
571
572				file_name = archived_file.split("/")[-1]
573				temp_file = staging_area.joinpath(file_name)
574				archive_file.extract(archived_file, staging_area)
575				paths.append(temp_file)
576
577		return staging_area
578
579	def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):
580		"""
581		Extract a file from an archive by name
582
583		:param str filename:  Name of file to extract
584		:param Path archive_path:  Path to zip file to read
585		:param Path staging_area:  Where to store the files while they're
586		  		being worked with. If omitted, a temporary folder is created
587		:return Path:  A path to the extracted file
588		"""
589		if not archive_path.exists():
590			return
591
592		if not staging_area:
593			staging_area = self.dataset.get_staging_area()
594
595		if not staging_area.exists() or not staging_area.is_dir():
596			raise RuntimeError("Staging area %s is not a valid folder" % staging_area)
597
598		with zipfile.ZipFile(archive_path, "r") as archive_file:
599			if filename not in archive_file.namelist():
600				raise FileNotFoundError("File %s not found in archive %s" % (filename, archive_path))
601			else:
602				archive_file.extract(filename, staging_area)
603				return staging_area.joinpath(filename)
604
605	def write_csv_items_and_finish(self, data):
606		"""
607		Write data as csv to results file and finish dataset
608
609		Determines result file path using dataset's path determination helper
610		methods. After writing results, the dataset is marked finished. Will
611		raise a ProcessorInterruptedException if the interrupted flag for this
612		processor is set while iterating.
613
614		:param data: A list or tuple of dictionaries, all with the same keys
615		"""
616		if not (isinstance(data, typing.List) or isinstance(data, typing.Tuple) or callable(data)) or isinstance(data, str):
617			raise TypeError("write_csv_items requires a list or tuple of dictionaries as argument (%s given)" % type(data))
618
619		if not data:
620			raise ValueError("write_csv_items requires a list of dictionaries with at least one item")
621
622		self.dataset.update_status("Writing results file")
623		writer = False
624		with self.dataset.get_results_path().open("w", encoding="utf-8", newline='') as results:
625			for row in data:
626				if self.interrupted:
627					raise ProcessorInterruptedException("Interrupted while writing results file")
628
629				row = remove_nuls(row)
630				if not writer:
631					writer = csv.DictWriter(results, fieldnames=row.keys())
632					writer.writeheader()
633
634				writer.writerow(row)
635
636		self.dataset.update_status("Finished")
637		self.dataset.finish(len(data))
638
639	def write_archive_and_finish(self, files, num_items=None, compression=zipfile.ZIP_STORED, finish=True):
640		"""
641		Archive a bunch of files into a zip archive and finish processing
642
643		:param list|Path files: If a list, all files will be added to the
644		  archive and deleted afterwards. If a folder, all files in the folder
645		  will be added and the folder will be deleted afterwards.
646		:param int num_items: Items in the dataset. If None, the amount of
647		  files added to the archive will be used.
648		:param int compression:  Type of compression to use. By default, files
649		  are not compressed, to speed up unarchiving.
650		:param bool finish:  Finish the dataset/job afterwards or not?
651		"""
652		is_folder = False
653		if issubclass(type(files), PurePath):
654			is_folder = files
655			if not files.exists() or not files.is_dir():
656				raise RuntimeError("Folder %s is not a folder that can be archived" % files)
657
658			files = files.glob("*")
659
660		# create zip of archive and delete temporary files and folder
661		self.dataset.update_status("Compressing results into archive")
662		done = 0
663		with zipfile.ZipFile(self.dataset.get_results_path(), "w", compression=compression) as zip:
664			for output_path in files:
665				zip.write(output_path, output_path.name)
666				output_path.unlink()
667				done += 1
668
669		# delete temporary folder
670		if is_folder:
671			shutil.rmtree(is_folder)
672
673		self.dataset.update_status("Finished")
674		if num_items is None:
675			num_items = done
676
677		if finish:
678			self.dataset.finish(num_items)
679
680	def create_standalone(self):
681		"""
682		Copy this dataset and make that copy standalone
683
684		This has the benefit of allowing for all analyses that can be run on
685		full datasets on the new, filtered copy as well.
686
687		:return DataSet:  The new standalone dataset
688		"""
689		top_parent = self.source_dataset
690
691		finished = self.dataset.check_dataset_finished()
692		if finished == 'empty':
693			# No data to process, so we can't create a standalone dataset
694			return
695		elif finished is None:
696			# I cannot think of why we would create a standalone from an unfinished dataset, but I'll leave it for now
697			pass
698
699		standalone = self.dataset.copy(shallow=False)
700		standalone.body_match = "(Filtered) " + top_parent.query
701		standalone.datasource = top_parent.parameters.get("datasource", "custom")
702
703		try:
704			standalone.board = top_parent.board
705		except AttributeError:
706			standalone.board = self.type
707
708		standalone.type = top_parent.type
709
710		standalone.detach()
711		standalone.delete_parameter("key_parent")
712
713		self.dataset.copied_to = standalone.key
714
715		# we don't need this file anymore - it has been copied to the new
716		# standalone dataset, and this one is not accessible via the interface
717		# except as a link to the copied standalone dataset
718		os.unlink(self.dataset.get_results_path())
719
720		# Copy the log
721		shutil.copy(self.dataset.get_log_path(), standalone.get_log_path())
722
723		return standalone
724
725	@classmethod
726	def map_item_method_available(cls, dataset):
727		"""
728		Check if this processor can use map_item
729
730		Checks if map_item method exists and is compatible with dataset. If
731		dataset has a different extension than the default for this processor,
732		or if the dataset has no extension, this means we cannot be sure the
733		data is in the right format to be mapped, so `False` is returned in
734		that case even if a map_item() method is available.
735
736		:param DataSet dataset:				The DataSet object with which to
737		use map_item
738		:return bool:  True if a compatible map_item method is available
739		and the dataset extension matches, False otherwise
740		"""
741		# only run item mapper if extension of processor == extension of
742		# data file, for the scenario where a csv file was uploaded and
743		# converted to an ndjson-based data source, for example
744		# todo: this is kind of ugly, and a better fix may be possible
745		dataset_extension = dataset.get_extension()
746		if not dataset_extension:
747			# DataSet results file does not exist or has no extension, use expected extension
748			if hasattr(dataset, "extension"):
749				dataset_extension = dataset.extension
750			else:
751				# No known DataSet extension; cannot determine if map_item method compatible
752				return False
753
754		return hasattr(cls, "map_item") and cls.extension == dataset_extension
755
756	@classmethod
757	def get_mapped_item(cls, item):
758		"""
759		Get the mapped item using a processor's map_item method.
760
761		Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
762		"""
763		try:
764			mapped_item = cls.map_item(item)
765		except (KeyError, IndexError) as e:
766			raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")
767
768		if not mapped_item:
769			raise MapItemException("Unable to map item!")
770
771		return mapped_item
772
773	@classmethod
774	def is_filter(cls):
775		"""
776		Is this processor a filter?
777
778		Filters do not produce their own dataset but replace the source_dataset dataset
779		instead.
780
781		:todo: Make this a bit more robust than sniffing the processor category
782		:return bool:
783		"""
784		return hasattr(cls, "category") and cls.category and "filter" in cls.category.lower()
785
786	@classmethod
787	def get_options(cls, parent_dataset=None, user=None):
788		"""
789		Get processor options
790
791		This method by default returns the class's "options" attribute, or an
792		empty dictionary. It can be redefined by processors that need more
793		fine-grained options, e.g. in cases where the availability of options
794		is partially determined by the parent dataset's parameters.
795
796		:param DataSet parent_dataset:  An object representing the dataset that
797		  the processor would be run on
798		:param User user:  Flask user the options will be displayed for, in
799		  case they are requested for display in the 4CAT web interface. This can
800		  be used to show some options only to privileged users.
801		"""
802		return cls.options if hasattr(cls, "options") else {}
803
804	@classmethod
805	def get_status(cls):
806		"""
807		Get processor status
808
809		:return list:	Statuses of this processor
810		"""
811		return cls.status if hasattr(cls, "status") else None
812
813	@classmethod
814	def is_top_dataset(cls):
815		"""
816		Confirm this is *not* a top dataset, but a processor.
817
818		Used for processor compatibility checks.
819
820		:return bool:  Always `False`, because this is a processor.
821		"""
822		return False
823
824	@classmethod
825	def is_from_collector(cls):
826		"""
827		Check if this processor is one that collects data, i.e. a search or
828		import worker.
829
830		:return bool:
831		"""
832		return cls.type.endswith("-search") or cls.type.endswith("-import")
833
834	@classmethod
835	def get_extension(self, parent_dataset=None):
836		"""
837		Return the extension of the processor's dataset
838
839		Used for processor compatibility checks.
840
841		:param DataSet parent_dataset:  An object representing the dataset that
842		  the processor would be run on
843		:return str|None:  Dataset extension (without leading `.`) or `None`.
844		"""
845		if self.is_filter():
846			if parent_dataset is not None:
847				# Filters should use the same extension as the parent dataset
848				return parent_dataset.get_extension()
849			else:
850				# No dataset provided, unable to determine extension of parent dataset
851				# if self.is_filter(): originally returned None, so maintaining that outcome. BUT we may want to fall back on the processor extension instead
852				return None
853		elif self.extension:
854			# Use explicitly defined extension in class (Processor class defaults to "csv")
855			return self.extension
856		else:
857			# A non filter processor updated the base Processor extension to None/False?
858			return None
859
860	@classmethod
861	def is_rankable(cls, multiple_items=True):
862		"""
863		Used for processor compatibility
864
865		:param bool multiple_items:  Consider datasets with multiple items per
866		  item (e.g. word_1, word_2, etc)? Included for compatibility
867		"""
868		return False
869
870	@classmethod
871	def exclude_followup_processors(cls, processor_type=None):
872		"""
873        Used for processor compatibility
874
875        To be defined by the child processor if it should exclude certain follow-up processors.
876        e.g.:
877
878        def exclude_followup_processors(cls, processor_type):
879			if processor_type in ["undesirable-followup-processor"]:
880				return True
881			return False
882
883        :param str processor_type:  Processor type to exclude
884        :return bool:  True if processor should be excluded, False otherwise
885        """
886		return False
887
888	@abc.abstractmethod
889	def process(self):
890		"""
891		Process data
892
893		To be defined by the child processor.
894		"""
895		pass
896
897	@staticmethod
898	def is_4cat_processor():
899		"""
900		Is this a 4CAT processor?
901
902		This is used to determine whether a class is a 4CAT
903		processor.
904		
905		:return:  True
906		"""
907		return True

Abstract processor class

A processor takes a finished dataset as input and processes its result in some way, with another dataset set as output. The input thus is a file, and the output (usually) as well. In other words, the result of a processor can be used as input for another processor (though whether and when this is useful is another question).

To determine whether a processor can process a given dataset, you can define an is_compatible_with(module=None, user=None) -> bool class method which takes a dataset as argument and returns a bool that determines if this processor is considered compatible with that dataset. For example:


@classmethod
def is_compatible_with(cls, module=None, user=None):
    return module.type == "linguistic-features"

db = None
job = None
dataset = None
owner = None
source_dataset = None
source_file = None
description = 'No description available'
category = 'Other'
extension = 'csv'
config = None
is_running_in_preset = False
is_hidden = False
filepath = None
def work(self):

Process a dataset

Loads dataset metadata, sets up the scaffolding for performing some kind of processing on that dataset, and then processes it. Afterwards, clean up.

def after_process(self):

Run after processing the dataset

This method cleans up temporary files, and if needed, handles logistics concerning the result file, e.g. running a pre-defined processor on the result, copying it to another dataset, and so on.
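
The follow-up logistics described above are driven by a few reserved dataset parameters. A rough sketch of their shape, as this method reads them; the concrete values are placeholders:

parameters = {
    # queue further processors on this dataset's result once it finishes;
    # each entry names a processor type and the parameters to run it with
    "next": [
        {"type": "some-followup-processor", "parameters": {"some-option": "value"}},
    ],
    # copy the results file to an arbitrary path after processing
    "copy_to": "/path/to/copy-of-results.csv",
    # attach this dataset's chain of results to another dataset (mostly used by presets)
    "attach_to": "key-of-another-dataset",
}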

def remove_files(self):

Clean up result files and any staging files so the processor can be attempted again later if desired.

def abort(self):

Abort dataset creation and clean up so it may be attempted again later

def add_field_to_parent(self, field_name, new_data, which_parent=None, update_existing=False):

This function adds a new field to the parent dataset. Expects a list of data points, one for each item in the parent dataset. Processes csv and ndjson. If update_existing is set to True, this can be used to overwrite an existing field.

TODO: could be improved by accepting different types of data depending on csv or ndjson.

Parameters
  • str field_name: name of the desired field
  • List new_data: List of data to be added to parent dataset
  • DataSet which_parent: DataSet to be updated (e.g., self.source_dataset, self.dataset.get_parent(), self.dataset.top_parent())
  • bool update_existing: False (default) will raise an error if the field_name already exists; True will allow updating existing data
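
A minimal sketch of how a processor might call this from its process() implementation; the field name and the per-item computation are placeholders, and a 4CAT processor context is assumed:

def process(self):
    new_values = []
    for item in self.source_dataset.iterate_items(self):
        # any per-item computation goes here; this stand-in just counts the
        # number of fields in the underlying item
        new_values.append(len(item.original))

    # one new value per item in the parent dataset, in the same order
    self.add_field_to_parent("example_field", new_values, which_parent=self.source_dataset)

    # a processor would usually also write its own results file before finishing
    self.dataset.finish(len(new_values))
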
def iterate_archive_contents(self, path, staging_area=None, immediately_delete=True, filename_filter=[]):

A generator that iterates through files in an archive

With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.

Files are temporarily unzipped and deleted after use.

Parameters
  • Path path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created and deleted after use
  • bool immediately_delete: Temporary files are removed after yielded; False keeps files until the staging_area is removed (usually during processor cleanup)
  • list filename_filter: Whitelist of filenames to iterate. Other files will be ignored. If empty, do not ignore anything.
Returns

An iterator with a Path item for each file
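
A sketch of typical usage inside a processor's process() method, assuming the parent dataset produced a zip archive (so self.source_file points at it); the file-type check is a placeholder:

def process(self):
    processed = 0
    for file_path in self.iterate_archive_contents(self.source_file):
        # file_path is a temporarily extracted copy; it is deleted again after
        # this iteration unless immediately_delete=False is passed
        if file_path.suffix.lower() in (".jpg", ".png"):
            processed += 1

    self.dataset.finish(processed)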

def unpack_archive_contents(self, path, staging_area=None):

Unpack all files in an archive to a staging area

With every iteration, the processor's 'interrupted' flag is checked, and if set a ProcessorInterruptedException is raised, which by default is caught and subsequently stops execution gracefully.

Files are unzipped to a staging area. The staging area is not cleaned up automatically.

Parameters
  • Path path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created. Unlike iterate_archive_contents(), the staging area is not removed afterwards; the caller is responsible for cleaning it up
Returns

A path to the staging area
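
A sketch of usage for cases where the archive's files need to be available all at once rather than one at a time; a 4CAT processor context is assumed:

def process(self):
    # unpack everything up front so files can be read more than once
    staging_area = self.unpack_archive_contents(self.source_file)

    files = list(staging_area.glob("*"))
    self.dataset.update_status("Unpacked %i files" % len(files))

    # ... work with the unpacked files here ...

    # unpack_archive_contents() does not remove the staging area itself; it is
    # cleaned up with the dataset's other staging areas during processor cleanup
    self.dataset.finish(len(files))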

def extract_archived_file_by_name(self, filename, archive_path, staging_area=None):

Extract a file from an archive by name

Parameters
  • str filename: Name of file to extract
  • Path archive_path: Path to zip file to read
  • Path staging_area: Where to store the files while they're being worked with. If omitted, a temporary folder is created
Returns

A path to the extracted file
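
A sketch of pulling a single known file out of the source archive; the file name used here is purely illustrative:

import json

def process(self):
    try:
        info_path = self.extract_archived_file_by_name("metadata.json", self.source_file)
    except FileNotFoundError:
        self.dataset.update_status("No metadata file found in archive")
        return

    with info_path.open(encoding="utf-8") as infile:
        metadata = json.load(infile)

    self.dataset.update_status("Loaded metadata with %i keys" % len(metadata))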

def write_csv_items_and_finish(self, data):

Write data as csv to results file and finish dataset

Determines result file path using dataset's path determination helper methods. After writing results, the dataset is marked finished. Will raise a ProcessorInterruptedException if the interrupted flag for this processor is set while iterating.

Parameters
  • data: A list or tuple of dictionaries, all with the same keys
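
A sketch of a process() implementation that collects one row per item and hands the result to this method; the row contents are stand-ins:

def process(self):
    rows = []
    for index, item in enumerate(self.source_dataset.iterate_items(self)):
        # every row must be a dict and all rows must share the same keys
        rows.append({"index": index, "num_fields": len(item.original)})

    # writes the rows to the dataset's results path and marks it as finished
    self.write_csv_items_and_finish(rows)
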
def write_archive_and_finish(self, files, num_items=None, compression=0, finish=True):

Archive a bunch of files into a zip archive and finish processing

Parameters
  • list|Path files: If a list, all files will be added to the archive and deleted afterwards. If a folder, all files in the folder will be added and the folder will be deleted afterwards.
  • int num_items: Items in the dataset. If None, the amount of files added to the archive will be used.
  • int compression: Type of compression to use. By default, files are not compressed, to speed up unarchiving.
  • bool finish: Finish the dataset/job afterwards or not?
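
A sketch of producing several output files in a staging area and archiving them; the file names and contents are stand-ins:

def process(self):
    staging_area = self.dataset.get_staging_area()

    for index in range(3):
        staging_area.joinpath("output-%i.txt" % index).write_text("example %i" % index)

    # passing the folder adds every file in it to the zip and deletes the folder
    # afterwards; the dataset is finished with the number of files added
    self.write_archive_and_finish(staging_area)
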
def create_standalone(self):

Copy this dataset and make that copy standalone

This has the benefit of allowing for all analyses that can be run on full datasets on the new, filtered copy as well.

Returns

The new standalone dataset
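
A sketch of how a filter-style processor might promote its result to a standalone dataset once processing is done; whether and when to do this depends on the processor, so treat it as illustrative only:

def after_process(self):
    super().after_process()

    # make the filtered result a dataset in its own right, so processors that
    # run on full datasets can also run on the filtered copy
    standalone = self.create_standalone()
    if standalone:
        self.dataset.update_status("Created standalone copy of dataset %s" % standalone.key)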

@classmethod
def map_item_method_available(cls, dataset):

Check if this processor can use map_item

Checks if map_item method exists and is compatible with dataset. If dataset has a different extension than the default for this processor, or if the dataset has no extension, this means we cannot be sure the data is in the right format to be mapped, so False is returned in that case even if a map_item() method is available.

Parameters
  • DataSet dataset: The DataSet object with which to use map_item
@classmethod
def get_mapped_item(cls, item):

Get the mapped item using a processor's map_item method.

Ensure map_item method is compatible with a dataset by checking map_item_method_available first.
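
A sketch of how these two methods can be combined when reading another dataset's raw items; the helper function and its name are hypothetical:

from common.lib.exceptions import MapItemException

def iterate_mapped_items(processor_cls, dataset, raw_items):
    # yield mapped versions of raw items, skipping any that the processor's
    # map_item() cannot handle
    if not processor_cls.map_item_method_available(dataset):
        return

    for raw_item in raw_items:
        try:
            yield processor_cls.get_mapped_item(raw_item)
        except MapItemException:
            # malformed item or missing expected fields; skip it
            continue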

@classmethod
def is_filter(cls):

Is this processor a filter?

Filters do not produce their own dataset but replace the source_dataset dataset instead.

:todo: Make this a bit more robust than sniffing the processor category

Returns

True if this processor is a filter, False otherwise

@classmethod
def get_options(cls, parent_dataset=None, user=None):

Get processor options

This method by default returns the class's "options" attribute, or an empty dictionary. It can be redefined by processors that need more fine-grained options, e.g. in cases where the availability of options is partially determined by the parent dataset's parameters.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
  • User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileged users.
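
A sketch of an override that only exposes an extra option when the parent dataset is CSV-based; the option name is hypothetical and the option definition is reduced to the bare minimum ("default" is the key used to fill in missing parameters):

@classmethod
def get_options(cls, parent_dataset=None, user=None):
    # start from any statically defined options on the class
    options = dict(cls.options) if hasattr(cls, "options") else {}

    if parent_dataset is not None and parent_dataset.get_extension() == "csv":
        options["include-header"] = {"default": True}

    return options
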
@classmethod
def get_status(cls):

Get processor status

Returns

Statuses of this processor

@classmethod
def is_top_dataset(cls):

Confirm this is not a top dataset, but a processor.

Used for processor compatibility checks.

Returns

Always False, because this is a processor.

@classmethod
def is_from_collector(cls):

Check if this processor is one that collects data, i.e. a search or import worker.

Returns

True if this processor is a search or import worker, False otherwise

@classmethod
def get_extension(self, parent_dataset=None):

Return the extension of the processor's dataset

Used for processor compatibility checks.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
Returns

Dataset extension (without leading .) or None.

@classmethod
def is_rankable(cls, multiple_items=True):

Used for processor compatibility

Parameters
  • bool multiple_items: Consider datasets with multiple items per item (e.g. word_1, word_2, etc)? Included for compatibility
@classmethod
def exclude_followup_processors(cls, processor_type=None):

Used for processor compatibility

To be defined by the child processor if it should exclude certain follow-up processors. e.g.:

def exclude_followup_processors(cls, processor_type):
    if processor_type in ["undesirable-followup-processor"]:
        return True
    return False

Parameters
  • str processor_type: Processor type to exclude
Returns

True if processor should be excluded, False otherwise

@abc.abstractmethod
def process(self):

Process data

To be defined by the child processor.

@staticmethod
def is_4cat_processor():

Is this a 4CAT processor?

This is used to determine whether a class is a 4CAT processor.

Returns

True