Edit on GitHub

backend.lib.manager

The heart of the app - manages jobs and workers

  1"""
  2The heart of the app - manages jobs and workers
  3"""
  4import signal
  5import time
  6
  7from common.lib.module_loader import ModuleCollector
  8from common.lib.exceptions import JobClaimedException
  9
 10
 11class WorkerManager:
 12	"""
 13	Manages the job queue and worker pool
 14	"""
 15	queue = None
 16	db = None
 17	log = None
 18	modules = None
 19
 20	worker_pool = {}
 21	job_mapping = {}
 22	pool = []
 23	looping = True
 24
 25	def __init__(self, queue, database, logger, as_daemon=True):
 26		"""
 27		Initialize manager
 28
 29		:param queue:  Job queue
 30		:param database:  Database handler
 31		:param logger:  Logger object
 32		:param bool as_daemon:  Whether the manager is being run as a daemon
 33		"""
 34		self.queue = queue
 35		self.db = database
 36		self.log = logger
 37		self.modules = ModuleCollector(write_config=True)
 38
 39		if as_daemon:
 40			signal.signal(signal.SIGTERM, self.abort)
 41			signal.signal(signal.SIGINT, self.request_interrupt)
 42
 43		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
 44		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
 45		self.validate_datasources()
 46
 47		# queue jobs for workers that always need one
 48		for worker_name, worker in self.modules.workers.items():
 49			if hasattr(worker, "ensure_job"):
 50				self.queue.add_job(jobtype=worker_name, **worker.ensure_job)
 51
 52		self.log.info("4CAT Started")
 53
 54		# flush module collector log buffer
 55		# the logger is not available when this initialises
 56		# but it is now!
 57		if self.modules.log_buffer:
 58			self.log.warning(self.modules.log_buffer)
 59			self.modules.log_buffer = ""
 60
 61		# it's time
 62		self.loop()
 63
 64	def delegate(self):
 65		"""
 66		Delegate work
 67
 68		Checks for open jobs, and then passes those to dedicated workers, if
 69		slots are available for those workers.
 70		"""
 71		jobs = self.queue.get_all_jobs()
 72
 73		num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool])
 74		self.log.debug("Running workers: %i" % num_active)
 75
 76		# clean up workers that have finished processing
 77		for jobtype in self.worker_pool:
 78			all_workers = self.worker_pool[jobtype]
 79			for worker in all_workers:
 80				if not worker.is_alive():
 81					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
 82					worker.join()
 83					self.worker_pool[jobtype].remove(worker)
 84
 85			del all_workers
 86
 87		# check if workers are available for unclaimed jobs
 88		for job in jobs:
 89			jobtype = job.data["jobtype"]
 90
 91			if jobtype in self.modules.workers:
 92				worker_class = self.modules.workers[jobtype]
 93				if jobtype not in self.worker_pool:
 94					self.worker_pool[jobtype] = []
 95
 96				# if a job is of a known type, and that job type has open
 97				# worker slots, start a new worker to run it
 98				if len(self.worker_pool[jobtype]) < worker_class.max_workers:
 99					try:
100						job.claim()
101						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
102						worker.start()
103						self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
104						self.worker_pool[jobtype].append(worker)
105					except JobClaimedException:
106						# it's fine
107						pass
108			else:
109				self.log.error("Unknown job type: %s" % jobtype)
110
111		time.sleep(1)
112
113	def loop(self):
114		"""
115		Main loop
116
117		Constantly delegates work, until no longer looping, after which all
118		workers are asked to stop their work. Once that has happened, the loop
119		properly ends.
120		"""
121		while self.looping:
122			try:
123				self.delegate()
124			except KeyboardInterrupt:
125				self.looping = False
126
127		self.log.info("Telling all workers to stop doing whatever they're doing...")
128		# request shutdown from all workers except the API
129		# this allows us to use the API to figure out if a certain worker is
130		# hanging during shutdown, for example
131		for jobtype in self.worker_pool:
132			if jobtype == "api":
133				continue
134
135			for worker in self.worker_pool[jobtype]:
136				if hasattr(worker, "request_interrupt"):
137					worker.request_interrupt()
138				else:
139					worker.abort()
140
141		# wait for all workers that we just asked to quit to finish
142		self.log.info("Waiting for all workers to finish...")
143		for jobtype in self.worker_pool:
144			if jobtype == "api":
145				continue
146			for worker in self.worker_pool[jobtype]:
147				self.log.info("Waiting for worker %s..." % jobtype)
148				worker.join()
149
150		# shut down API last
151		for worker in self.worker_pool.get("api", []):
152			worker.request_interrupt()
153			worker.join()
154
155		# abort
156		time.sleep(1)
157		self.log.info("Bye!")
158
159	def validate_datasources(self):
160		"""
161		Validate data sources
162
163		Logs warnings if not all information is precent for the configured data
164		sources.
165		"""
166
167		for datasource in self.modules.datasources:
168			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
169				self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)
170
171			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)
172
173	def abort(self, signal=None, stack=None):
174		"""
175		Stop looping the delegator, clean up, and prepare for shutdown
176		"""
177		self.log.info("Received SIGTERM")
178
179		# cancel all interruptible postgres queries
180		# this needs to be done before we stop looping since after that no new
181		# jobs will be claimed, and needs to be done here because the worker's
182		# own database connection is busy executing the query that it should
183		# cancel! so we can't use it to update the job and make it get claimed
184		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
185			# this will make all these jobs immediately claimable, i.e. queries
186			# will get cancelled asap
187			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
188			job.claim()
189			job.release(delay=0, claim_after=0)
190
191		# wait until all cancel jobs are completed
192		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
193			time.sleep(0.25)
194
195		# now stop looping (i.e. accepting new jobs)
196		self.looping = False
197
198	def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
199		"""
200		Interrupt a job
201
202		This method can be called via e.g. the API, to interrupt a specific
203		job's worker. The worker can be targeted either with a Job object or
204		with a combination of job type and remote ID, since those uniquely
205		identify a job.
206
207		:param int interrupt_level:  Retry later or cancel?
208		:param Job job:  Job object to cancel worker for
209		:param str remote_id:  Remote ID for worker job to cancel
210		:param str jobtype:  Job type for worker job to cancel
211		"""
212
213		# find worker for given job
214		if job:
215			jobtype = job.data["jobtype"]
216
217		if jobtype not in self.worker_pool:
218			# no jobs of this type currently known
219			return
220
221		for worker in self.worker_pool[jobtype]:
222			if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
223				# first cancel any interruptable queries for this job's worker
224				while True:
225					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
226					if not active_queries:
227						# all cancellation jobs have been run
228						break
229
230					for cancel_job in active_queries:
231						if cancel_job.is_claimed:
232							continue
233
234						# this will make the job be run asap
235						cancel_job.claim()
236						cancel_job.release(delay=0, claim_after=0)
237
238					# give the cancel job a moment to run
239					time.sleep(0.25)
240
241				# now all queries are interrupted, formally request an abort
242				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
243				worker.request_interrupt(interrupt_level)
244				return
class WorkerManager:
 12class WorkerManager:
 13	"""
 14	Manages the job queue and worker pool
 15	"""
 16	queue = None
 17	db = None
 18	log = None
 19	modules = None
 20
 21	worker_pool = {}
 22	job_mapping = {}
 23	pool = []
 24	looping = True
 25
 26	def __init__(self, queue, database, logger, as_daemon=True):
 27		"""
 28		Initialize manager
 29
 30		:param queue:  Job queue
 31		:param database:  Database handler
 32		:param logger:  Logger object
 33		:param bool as_daemon:  Whether the manager is being run as a daemon
 34		"""
 35		self.queue = queue
 36		self.db = database
 37		self.log = logger
 38		self.modules = ModuleCollector(write_config=True)
 39
 40		if as_daemon:
 41			signal.signal(signal.SIGTERM, self.abort)
 42			signal.signal(signal.SIGINT, self.request_interrupt)
 43
 44		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
 45		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
 46		self.validate_datasources()
 47
 48		# queue jobs for workers that always need one
 49		for worker_name, worker in self.modules.workers.items():
 50			if hasattr(worker, "ensure_job"):
 51				self.queue.add_job(jobtype=worker_name, **worker.ensure_job)
 52
 53		self.log.info("4CAT Started")
 54
 55		# flush module collector log buffer
 56		# the logger is not available when this initialises
 57		# but it is now!
 58		if self.modules.log_buffer:
 59			self.log.warning(self.modules.log_buffer)
 60			self.modules.log_buffer = ""
 61
 62		# it's time
 63		self.loop()
 64
 65	def delegate(self):
 66		"""
 67		Delegate work
 68
 69		Checks for open jobs, and then passes those to dedicated workers, if
 70		slots are available for those workers.
 71		"""
 72		jobs = self.queue.get_all_jobs()
 73
 74		num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool])
 75		self.log.debug("Running workers: %i" % num_active)
 76
 77		# clean up workers that have finished processing
 78		for jobtype in self.worker_pool:
 79			all_workers = self.worker_pool[jobtype]
 80			for worker in all_workers:
 81				if not worker.is_alive():
 82					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
 83					worker.join()
 84					self.worker_pool[jobtype].remove(worker)
 85
 86			del all_workers
 87
 88		# check if workers are available for unclaimed jobs
 89		for job in jobs:
 90			jobtype = job.data["jobtype"]
 91
 92			if jobtype in self.modules.workers:
 93				worker_class = self.modules.workers[jobtype]
 94				if jobtype not in self.worker_pool:
 95					self.worker_pool[jobtype] = []
 96
 97				# if a job is of a known type, and that job type has open
 98				# worker slots, start a new worker to run it
 99				if len(self.worker_pool[jobtype]) < worker_class.max_workers:
100					try:
101						job.claim()
102						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
103						worker.start()
104						self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
105						self.worker_pool[jobtype].append(worker)
106					except JobClaimedException:
107						# it's fine
108						pass
109			else:
110				self.log.error("Unknown job type: %s" % jobtype)
111
112		time.sleep(1)
113
114	def loop(self):
115		"""
116		Main loop
117
118		Constantly delegates work, until no longer looping, after which all
119		workers are asked to stop their work. Once that has happened, the loop
120		properly ends.
121		"""
122		while self.looping:
123			try:
124				self.delegate()
125			except KeyboardInterrupt:
126				self.looping = False
127
128		self.log.info("Telling all workers to stop doing whatever they're doing...")
129		# request shutdown from all workers except the API
130		# this allows us to use the API to figure out if a certain worker is
131		# hanging during shutdown, for example
132		for jobtype in self.worker_pool:
133			if jobtype == "api":
134				continue
135
136			for worker in self.worker_pool[jobtype]:
137				if hasattr(worker, "request_interrupt"):
138					worker.request_interrupt()
139				else:
140					worker.abort()
141
142		# wait for all workers that we just asked to quit to finish
143		self.log.info("Waiting for all workers to finish...")
144		for jobtype in self.worker_pool:
145			if jobtype == "api":
146				continue
147			for worker in self.worker_pool[jobtype]:
148				self.log.info("Waiting for worker %s..." % jobtype)
149				worker.join()
150
151		# shut down API last
152		for worker in self.worker_pool.get("api", []):
153			worker.request_interrupt()
154			worker.join()
155
156		# abort
157		time.sleep(1)
158		self.log.info("Bye!")
159
160	def validate_datasources(self):
161		"""
162		Validate data sources
163
164		Logs warnings if not all information is precent for the configured data
165		sources.
166		"""
167
168		for datasource in self.modules.datasources:
169			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
170				self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)
171
172			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)
173
174	def abort(self, signal=None, stack=None):
175		"""
176		Stop looping the delegator, clean up, and prepare for shutdown
177		"""
178		self.log.info("Received SIGTERM")
179
180		# cancel all interruptible postgres queries
181		# this needs to be done before we stop looping since after that no new
182		# jobs will be claimed, and needs to be done here because the worker's
183		# own database connection is busy executing the query that it should
184		# cancel! so we can't use it to update the job and make it get claimed
185		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
186			# this will make all these jobs immediately claimable, i.e. queries
187			# will get cancelled asap
188			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
189			job.claim()
190			job.release(delay=0, claim_after=0)
191
192		# wait until all cancel jobs are completed
193		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
194			time.sleep(0.25)
195
196		# now stop looping (i.e. accepting new jobs)
197		self.looping = False
198
199	def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
200		"""
201		Interrupt a job
202
203		This method can be called via e.g. the API, to interrupt a specific
204		job's worker. The worker can be targeted either with a Job object or
205		with a combination of job type and remote ID, since those uniquely
206		identify a job.
207
208		:param int interrupt_level:  Retry later or cancel?
209		:param Job job:  Job object to cancel worker for
210		:param str remote_id:  Remote ID for worker job to cancel
211		:param str jobtype:  Job type for worker job to cancel
212		"""
213
214		# find worker for given job
215		if job:
216			jobtype = job.data["jobtype"]
217
218		if jobtype not in self.worker_pool:
219			# no jobs of this type currently known
220			return
221
222		for worker in self.worker_pool[jobtype]:
223			if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
224				# first cancel any interruptable queries for this job's worker
225				while True:
226					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
227					if not active_queries:
228						# all cancellation jobs have been run
229						break
230
231					for cancel_job in active_queries:
232						if cancel_job.is_claimed:
233							continue
234
235						# this will make the job be run asap
236						cancel_job.claim()
237						cancel_job.release(delay=0, claim_after=0)
238
239					# give the cancel job a moment to run
240					time.sleep(0.25)
241
242				# now all queries are interrupted, formally request an abort
243				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
244				worker.request_interrupt(interrupt_level)
245				return

Manages the job queue and worker pool

WorkerManager(queue, database, logger, as_daemon=True)
26	def __init__(self, queue, database, logger, as_daemon=True):
27		"""
28		Initialize manager
29
30		:param queue:  Job queue
31		:param database:  Database handler
32		:param logger:  Logger object
33		:param bool as_daemon:  Whether the manager is being run as a daemon
34		"""
35		self.queue = queue
36		self.db = database
37		self.log = logger
38		self.modules = ModuleCollector(write_config=True)
39
40		if as_daemon:
41			signal.signal(signal.SIGTERM, self.abort)
42			signal.signal(signal.SIGINT, self.request_interrupt)
43
44		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
45		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
46		self.validate_datasources()
47
48		# queue jobs for workers that always need one
49		for worker_name, worker in self.modules.workers.items():
50			if hasattr(worker, "ensure_job"):
51				self.queue.add_job(jobtype=worker_name, **worker.ensure_job)
52
53		self.log.info("4CAT Started")
54
55		# flush module collector log buffer
56		# the logger is not available when this initialises
57		# but it is now!
58		if self.modules.log_buffer:
59			self.log.warning(self.modules.log_buffer)
60			self.modules.log_buffer = ""
61
62		# it's time
63		self.loop()

Initialize manager

Parameters
  • queue: Job queue
  • database: Database handler
  • logger: Logger object
  • bool as_daemon: Whether the manager is being run as a daemon
queue = None
db = None
log = None
modules = None
worker_pool = {}
job_mapping = {}
pool = []
looping = True
def delegate(self):
 65	def delegate(self):
 66		"""
 67		Delegate work
 68
 69		Checks for open jobs, and then passes those to dedicated workers, if
 70		slots are available for those workers.
 71		"""
 72		jobs = self.queue.get_all_jobs()
 73
 74		num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool])
 75		self.log.debug("Running workers: %i" % num_active)
 76
 77		# clean up workers that have finished processing
 78		for jobtype in self.worker_pool:
 79			all_workers = self.worker_pool[jobtype]
 80			for worker in all_workers:
 81				if not worker.is_alive():
 82					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
 83					worker.join()
 84					self.worker_pool[jobtype].remove(worker)
 85
 86			del all_workers
 87
 88		# check if workers are available for unclaimed jobs
 89		for job in jobs:
 90			jobtype = job.data["jobtype"]
 91
 92			if jobtype in self.modules.workers:
 93				worker_class = self.modules.workers[jobtype]
 94				if jobtype not in self.worker_pool:
 95					self.worker_pool[jobtype] = []
 96
 97				# if a job is of a known type, and that job type has open
 98				# worker slots, start a new worker to run it
 99				if len(self.worker_pool[jobtype]) < worker_class.max_workers:
100					try:
101						job.claim()
102						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
103						worker.start()
104						self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
105						self.worker_pool[jobtype].append(worker)
106					except JobClaimedException:
107						# it's fine
108						pass
109			else:
110				self.log.error("Unknown job type: %s" % jobtype)
111
112		time.sleep(1)

Delegate work

Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.

def loop(self):
114	def loop(self):
115		"""
116		Main loop
117
118		Constantly delegates work, until no longer looping, after which all
119		workers are asked to stop their work. Once that has happened, the loop
120		properly ends.
121		"""
122		while self.looping:
123			try:
124				self.delegate()
125			except KeyboardInterrupt:
126				self.looping = False
127
128		self.log.info("Telling all workers to stop doing whatever they're doing...")
129		# request shutdown from all workers except the API
130		# this allows us to use the API to figure out if a certain worker is
131		# hanging during shutdown, for example
132		for jobtype in self.worker_pool:
133			if jobtype == "api":
134				continue
135
136			for worker in self.worker_pool[jobtype]:
137				if hasattr(worker, "request_interrupt"):
138					worker.request_interrupt()
139				else:
140					worker.abort()
141
142		# wait for all workers that we just asked to quit to finish
143		self.log.info("Waiting for all workers to finish...")
144		for jobtype in self.worker_pool:
145			if jobtype == "api":
146				continue
147			for worker in self.worker_pool[jobtype]:
148				self.log.info("Waiting for worker %s..." % jobtype)
149				worker.join()
150
151		# shut down API last
152		for worker in self.worker_pool.get("api", []):
153			worker.request_interrupt()
154			worker.join()
155
156		# abort
157		time.sleep(1)
158		self.log.info("Bye!")

Main loop

Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.

def validate_datasources(self):
160	def validate_datasources(self):
161		"""
162		Validate data sources
163
164		Logs warnings if not all information is precent for the configured data
165		sources.
166		"""
167
168		for datasource in self.modules.datasources:
169			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
170				self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)
171
172			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)

Validate data sources

Logs warnings if not all information is precent for the configured data sources.

def abort(self, signal=None, stack=None):
174	def abort(self, signal=None, stack=None):
175		"""
176		Stop looping the delegator, clean up, and prepare for shutdown
177		"""
178		self.log.info("Received SIGTERM")
179
180		# cancel all interruptible postgres queries
181		# this needs to be done before we stop looping since after that no new
182		# jobs will be claimed, and needs to be done here because the worker's
183		# own database connection is busy executing the query that it should
184		# cancel! so we can't use it to update the job and make it get claimed
185		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
186			# this will make all these jobs immediately claimable, i.e. queries
187			# will get cancelled asap
188			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
189			job.claim()
190			job.release(delay=0, claim_after=0)
191
192		# wait until all cancel jobs are completed
193		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
194			time.sleep(0.25)
195
196		# now stop looping (i.e. accepting new jobs)
197		self.looping = False

Stop looping the delegator, clean up, and prepare for shutdown

def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
199	def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
200		"""
201		Interrupt a job
202
203		This method can be called via e.g. the API, to interrupt a specific
204		job's worker. The worker can be targeted either with a Job object or
205		with a combination of job type and remote ID, since those uniquely
206		identify a job.
207
208		:param int interrupt_level:  Retry later or cancel?
209		:param Job job:  Job object to cancel worker for
210		:param str remote_id:  Remote ID for worker job to cancel
211		:param str jobtype:  Job type for worker job to cancel
212		"""
213
214		# find worker for given job
215		if job:
216			jobtype = job.data["jobtype"]
217
218		if jobtype not in self.worker_pool:
219			# no jobs of this type currently known
220			return
221
222		for worker in self.worker_pool[jobtype]:
223			if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
224				# first cancel any interruptable queries for this job's worker
225				while True:
226					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
227					if not active_queries:
228						# all cancellation jobs have been run
229						break
230
231					for cancel_job in active_queries:
232						if cancel_job.is_claimed:
233							continue
234
235						# this will make the job be run asap
236						cancel_job.claim()
237						cancel_job.release(delay=0, claim_after=0)
238
239					# give the cancel job a moment to run
240					time.sleep(0.25)
241
242				# now all queries are interrupted, formally request an abort
243				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
244				worker.request_interrupt(interrupt_level)
245				return

Interrupt a job

This method can be called via e.g. the API, to interrupt a specific job's worker. The worker can be targeted either with a Job object or with a combination of job type and remote ID, since those uniquely identify a job.

Parameters
  • int interrupt_level: Retry later or cancel?
  • Job job: Job object to cancel worker for
  • str remote_id: Remote ID for worker job to cancel
  • str jobtype: Job type for worker job to cancel