
backend.lib.worker

Worker class that all workers should implement

  1"""
  2Worker class that all workers should implement
  3"""
  4import traceback
  5import threading
  6import time
  7import abc
  8
  9from common.lib.queue import JobQueue
 10from common.lib.database import Database
 11from common.lib.exceptions import WorkerInterruptedException, ProcessorException
 12from common.config_manager import config, ConfigDummy
 13
 14
 15class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
 16	"""
 17	Abstract Worker class
 18
 19	This runs as a separate thread in which a worker method is executed. The
 20	work method can do whatever the worker needs to do - that part is to be
 21	implemented by a child class. This class provides scaffolding that makes
 22	sure crashes are caught properly and the relevant data is available to the
 23	worker code.
 24	"""
 25	#: Worker type - should match Job ID used when queuing jobs
 26	type = "misc"
 27
 28	#: Amount of workers of this type that can run in parallel. Be careful with
 29	#: this, because values higher than 1 will mean that e.g. API rate limits
 30	#: are easily violated.
 31	max_workers = 1
 32
 33	#: Flag value to indicate worker interruption type - not interrupted
 34	INTERRUPT_NONE = False
 35
 36	#: Flag value to indicate worker interruption type - interrupted, but can
 37	#: be retried
 38	INTERRUPT_RETRY = 1
 39
 40	#: Flag value to indicate worker interruption type - interrupted, but
 41	#: should be cancelled
 42	INTERRUPT_CANCEL = 2
 43
 44	#: Job queue that can be used to create or manipulate jobs
 45	queue = None
 46
 47	#: Job this worker is being run for
 48	job = None
 49
 50	#: Local configuration (used in processors)
 51	config = None
 52
 53	#: Logger object
 54	log = None
 55
 56	#: WorkerManager that manages this worker
 57	manager = None
 58
 59	#: Interrupt status, one of the `INTERRUPT_` class constants
 60	interrupted = False
 61
 62	#: Module index
 63	modules = None
 64
 65	#: Unix timestamp at which this worker was started
 66	init_time = 0
 67
 68	def __init__(self, logger, job, queue=None, manager=None, modules=None):
 69		"""
 70		Worker init
 71
 72		Set up object attributes, e.g. the worker queue and manager, and
 73		initialize a new database connection and connected job queue. We cannot
 74		share database connections between workers because they are not
 75		thread-safe.
 76
 77		:param Logger logger:  Logging interface
 78		:param Job job:  Job this worker is being run on
 79		:param JobQueue queue:  Job queue
 80		:param WorkerManager manager:  Scheduler instance that started this worker
 81		:param modules:  Module catalog
 82		"""
 83		super().__init__()
 84		self.name = self.type
 85		self.log = logger
 86		self.manager = manager
 87		self.job = job
 88		self.init_time = int(time.time())
 89		self.config = ConfigDummy()
 90
 91		# ModuleCollector cannot be easily imported into a worker because it itself
 92		# imports all workers, so you get a recursive import that Python (rightly) blocks
 93		# so for workers, modules data is passed as a constructor argument
 94		self.modules = modules
 95
 96		database_appname = "%s-%s" % (self.type, self.job.data["id"])
 97		self.db = Database(logger=self.log, appname=database_appname,
 98						   dbname=config.DB_NAME, user=config.DB_USER, password=config.DB_PASSWORD, host=config.DB_HOST, port=config.DB_PORT)
 99		self.queue = JobQueue(logger=self.log, database=self.db) if not queue else queue
100
101	def run(self):
102		"""
103		Run the worker
104
105		This calls the `work()` method, quite simply, but adds some
106		scaffolding to take care of any exceptions that occur during the
107		execution of the worker. The exception is then logged and the worker
108		is gracefully ended, but the job is *not* released to ensure that the
109		job is not run again immediately (which would probably instantly crash
110		in the exact same way).
111
112		You can configure the `WARN_SLACK_URL` configuration variable to make
113		reports of worker crashers be sent to a Slack channel, which is a good
114		way to monitor a running 4CAT instance!
115		"""
116		try:
117			self.work()
118		except WorkerInterruptedException:
119			self.log.info("Worker %s interrupted - cancelling." % self.type)
120
121			# interrupted - retry later or cancel job altogether?
122			if self.interrupted == self.INTERRUPT_RETRY:
123				self.job.release(delay=10)
124			elif self.interrupted == self.INTERRUPT_CANCEL:
125				self.job.finish()
126
127			self.abort()
128		except ProcessorException as e:
129			self.log.error(str(e), frame=e.frame)
130		except Exception as e:
131			frames = traceback.extract_tb(e.__traceback__)
132			frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in frames]
133			location = "->".join(frames)
134			self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location))
135
136		# Clean up after work successfully completed or terminates
137		self.clean_up()
138
139	def clean_up(self):
140		"""
141		Clean up after a processor runs successfully or results in error.
142		Workers should override this method to implement any procedures
143		to run to clean up a worker; by default this does nothing.
144		"""
145		pass
146
147	def abort(self):
148		"""
149		Called when the application shuts down
150
151		Can be used to stop loops, for looping workers. Workers should override
152		this method to implement any procedures to run to clean up a worker
153		when it is interrupted; by default this does nothing.
154		"""
155		pass
156
157	def request_interrupt(self, level=1):
158		"""
159		Set the 'abort requested' flag
160
161		Child workers should quit at their earliest convenience when this is
162		set. This can be done simply by checking the value of
163		`self.interrupted`.
164
165		:param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
166		  `self.INTERRUPT_CANCEL`.
167		"""
168		self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
169		self.interrupted = level
170
171	@abc.abstractmethod
172	def work(self):
173		"""
174		This is where the actual work happens
175
176		Whatever the worker is supposed to do, it should happen (or be
177		initiated from) this method. By default it does nothing, descending
178		classes should implement this method.
179		"""
180		pass
181
182	@staticmethod
183	def is_4cat_class():
184		"""
185		Is this a 4CAT class?
186
187		This is used to determine whether a class is a 4CAT worker or a
188		processor. This method should always return True for workers.
189
190		:return:  True
191		"""
192		return True
193	
194	@staticmethod
195	def is_4cat_processor():
196		"""
197		Is this a 4CAT processor?
198
199		This is used to determine whether a class is a 4CAT
200		processor.
201		
202		:return:  False
203		"""
204		return False
class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):

Abstract Worker class

This runs as a separate thread in which a worker method is executed. The work method can do whatever the worker needs to do - that part is to be implemented by a child class. This class provides scaffolding that makes sure crashes are caught properly and the relevant data is available to the worker code.
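
As a rough illustration, a concrete worker subclasses BasicWorker, sets a type matching the job ID it should handle, and implements work(). A minimal sketch - the class name, type value and log message below are hypothetical, not part of 4CAT:

from backend.lib.worker import BasicWorker

class HelloWorker(BasicWorker):
	# must match the job ID used when queuing jobs for this worker
	type = "hello-world"
	max_workers = 1

	def work(self):
		# the actual work happens here; job details are in self.job.data
		self.log.info("Hello from job %s" % self.job.data["id"])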

BasicWorker(logger, job, queue=None, manager=None, modules=None)

Worker init

Set up object attributes, e.g. the worker queue and manager, and initialize a new database connection and connected job queue. We cannot share database connections between workers because they are not thread-safe.

Parameters
  • Logger logger: Logging interface
  • Job job: Job this worker is being run on
  • JobQueue queue: Job queue
  • WorkerManager manager: Scheduler instance that started this worker
  • modules: Module catalog
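
Since BasicWorker subclasses threading.Thread, whoever schedules a worker constructs it and then calls start(), which runs run() - and through it work() - in a new thread. A sketch, assuming log, job and modules objects already exist (HelloWorker is the hypothetical example class from above):

worker = HelloWorker(logger=log, job=job, modules=modules)
worker.start()  # run() now executes in a separate thread
worker.join()   # optionally block until the worker finishes
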
type = 'misc'
max_workers = 1
INTERRUPT_NONE = False
INTERRUPT_RETRY = 1
INTERRUPT_CANCEL = 2
queue = None
job = None
config = None
log = None
manager = None
interrupted = False
modules = None
init_time = 0
name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

db

Database connection for this worker. Each worker opens its own connection, since database connections are not thread-safe and cannot be shared between workers.

def run(self):

Run the worker

This calls the work() method, quite simply, but adds some scaffolding to take care of any exceptions that occur during the execution of the worker. The exception is then logged and the worker is gracefully ended, but the job is not released to ensure that the job is not run again immediately (which would probably instantly crash in the exact same way).

You can configure the WARN_SLACK_URL configuration variable to have reports of worker crashes sent to a Slack channel, which is a good way to monitor a running 4CAT instance!
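
For reference, the generic exception handler condenses the traceback into a chain of file:line pairs joined by "->". A standalone sketch of that formatting logic (the helper name is illustrative):

import traceback

def frame_location(exception):
	# mirror run()'s formatting: last path component plus line number, per frame
	frames = traceback.extract_tb(exception.__traceback__)
	frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in frames]
	return "->".join(frames)

try:
	int("not a number")
except Exception as e:
	print(frame_location(e))  # e.g. "demo.py:10"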

def clean_up(self):

Clean up after a worker runs successfully or results in an error. Workers should override this method to implement any procedures to run to clean up a worker; by default this does nothing.
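
Because run() always calls clean_up() last, whether work() returned normally or raised an exception, it is a natural place to release resources. A minimal sketch of an override (the worker class and its attributes are hypothetical):

from backend.lib.worker import BasicWorker

class FileWritingWorker(BasicWorker):
	type = "file-writer-example"

	def work(self):
		self.output = open("/tmp/example-output.csv", "w")
		self.output.write("header\n")

	def clean_up(self):
		# close the file even if work() raised halfway through
		if getattr(self, "output", None):
			self.output.close()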

def abort(self):

Called when the application shuts down

Can be used to stop loops, for looping workers. Workers should override this method to implement any procedures to run to clean up a worker when it is interrupted; by default this does nothing.
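
For a looping worker, an abort() override can flip a flag that the loop in work() checks. A sketch, assuming (as the description above suggests) that abort() may be triggered during shutdown while the loop is still running; all names here are hypothetical:

import time
from backend.lib.worker import BasicWorker

class PollingWorker(BasicWorker):
	type = "polling-example"

	def work(self):
		self.looping = True
		while self.looping:
			# ... poll some resource here ...
			time.sleep(1)

	def abort(self):
		# signal the loop in work() to exit at its next iteration
		self.looping = False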

def request_interrupt(self, level=1):

Set the 'abort requested' flag

Child workers should quit at their earliest convenience when this is set. This can be done simply by checking the value of self.interrupted.

Parameters
  • int level: Retry or cancel? Either self.INTERRUPT_RETRY or self.INTERRUPT_CANCEL.
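
In practice this means a work() implementation should poll self.interrupted at convenient points and raise WorkerInterruptedException when it is set, so that run() can release or finish the job as appropriate. A sketch - the item source and per-item processing method are hypothetical:

from backend.lib.worker import BasicWorker
from common.lib.exceptions import WorkerInterruptedException

class BatchWorker(BasicWorker):
	type = "batch-example"

	def work(self):
		for item in self.get_items():  # hypothetical item source
			if self.interrupted:
				# run() releases the job (INTERRUPT_RETRY) or finishes it
				# (INTERRUPT_CANCEL), depending on the interrupt level
				raise WorkerInterruptedException()
			self.process(item)  # hypothetical per-item processing
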
@abc.abstractmethod
def work(self):

This is where the actual work happens

Whatever the worker is supposed to do, it should happen in (or be initiated from) this method. By default it does nothing; descending classes should implement this method.

@staticmethod
def is_4cat_class():

Is this a 4CAT class?

This is used to determine whether a class is a 4CAT worker or a processor. This method should always return True for workers.

Returns

True

@staticmethod
def is_4cat_processor():

Is this a 4CAT processor?

This is used to determine whether a class is a 4CAT processor.

Returns

False
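
Together, is_4cat_class() and is_4cat_processor() let code that scans modules tell workers and processors apart. A sketch of such a check (this is an illustration, not 4CAT's actual module collector):

import inspect

def classify(module):
	workers, processors = [], []
	for _, obj in inspect.getmembers(module, inspect.isclass):
		if hasattr(obj, "is_4cat_class") and obj.is_4cat_class():
			if obj.is_4cat_processor():
				processors.append(obj)
			else:
				workers.append(obj)
	return workers, processors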