backend.lib.manager

The heart of the app - manages jobs and workers

  1"""
  2The heart of the app - manages jobs and workers
  3"""
  4import signal
  5import time
  6
  7from backend.lib.proxied_requests import DelegatedRequestHandler
  8from common.lib.exceptions import JobClaimedException
  9
 10
 11class WorkerManager:
 12	"""
 13	Manages the job queue and worker pool
 14	"""
 15	queue = None
 16	db = None
 17	log = None
 18	modules = None
 19	proxy_delegator = None
 20
 21	worker_pool = {}
 22	job_mapping = {}
 23	pool = []
 24	looping = True
 25	unknown_jobs = set()
 26
 27	def __init__(self, queue, database, logger, modules, as_daemon=True):
 28		"""
 29		Initialize manager
 30
 31		:param queue:  Job queue
 32		:param database:  Database handler
 33		:param logger:  Logger object
 34		:param modules:  Modules cache via ModuleLoader()
 35		:param bool as_daemon:  Whether the manager is being run as a daemon
 36		"""
 37		self.queue = queue
 38		self.db = database
 39		self.log = logger
 40		self.modules = modules
 41		self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)
 42
 43		if as_daemon:
 44			signal.signal(signal.SIGTERM, self.abort)
 45			signal.signal(signal.SIGINT, self.request_interrupt)
 46
 47		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
 48		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
 49		self.validate_datasources()
 50
 51		# queue jobs for workers that always need one
 52		for worker_name, worker in self.modules.workers.items():
 53			if hasattr(worker, "ensure_job"):
 54				self.queue.add_job(jobtype=worker_name, **worker.ensure_job)
 55
 56		self.log.info("4CAT Started")
 57
 58		# flush module collector log buffer
 59		# the logger is not available when this initialises
 60		# but it is now!
 61		if self.modules.log_buffer:
 62			self.log.warning(self.modules.log_buffer)
 63			self.modules.log_buffer = ""
 64
 65		# it's time
 66		self.loop()
 67
 68	def delegate(self):
 69		"""
 70		Delegate work
 71
 72		Checks for open jobs, and then passes those to dedicated workers, if
 73		slots are available for those workers.
 74		"""
 75		jobs = self.queue.get_all_jobs()
 76
 77		num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool])
 78		self.log.debug("Running workers: %i" % num_active)
 79
 80		# clean up workers that have finished processing
 81		for jobtype in self.worker_pool:
 82			all_workers = self.worker_pool[jobtype]
 83			for worker in all_workers:
 84				if not worker.is_alive():
 85					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
 86					worker.join()
 87					self.worker_pool[jobtype].remove(worker)
 88
 89			del all_workers
 90
 91		# check if workers are available for unclaimed jobs
 92		for job in jobs:
 93			jobtype = job.data["jobtype"]
 94
 95			if jobtype in self.modules.workers:
 96				worker_class = self.modules.workers[jobtype]
 97				if jobtype not in self.worker_pool:
 98					self.worker_pool[jobtype] = []
 99
100				# if a job is of a known type, and that job type has open
101				# worker slots, start a new worker to run it
102				if len(self.worker_pool[jobtype]) < worker_class.max_workers:
103					try:
104						job.claim()
105						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
106						worker.start()
107						self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
108						self.worker_pool[jobtype].append(worker)
109					except JobClaimedException:
110						# it's fine
111						pass
112			else:
113				if jobtype not in self.unknown_jobs:
114					self.log.error("Unknown job type: %s" % jobtype)
115					self.unknown_jobs.add(jobtype)
116
117		time.sleep(1)
118
119	def loop(self):
120		"""
121		Main loop
122
123		Constantly delegates work, until no longer looping, after which all
124		workers are asked to stop their work. Once that has happened, the loop
125		properly ends.
126		"""
127		while self.looping:
128			try:
129				self.delegate()
130			except KeyboardInterrupt:
131				self.looping = False
132
133		self.log.info("Telling all workers to stop doing whatever they're doing...")
134
135		# request shutdown from all workers except the API
136		# this allows us to use the API to figure out if a certain worker is
137		# hanging during shutdown, for example
138		for jobtype in self.worker_pool:
139			if jobtype == "api":
140				continue
141
142			for worker in self.worker_pool[jobtype]:
143				if hasattr(worker, "request_interrupt"):
144					worker.request_interrupt()
145				else:
146					worker.abort()
147
148		# wait for all workers that we just asked to quit to finish
149		self.log.info("Waiting for all workers to finish...")
150		for jobtype in self.worker_pool:
151			if jobtype == "api":
152				continue
153			for worker in self.worker_pool[jobtype]:
154				self.log.info("Waiting for worker %s..." % jobtype)
155				worker.join()
156
157		# shut down API last
158		for worker in self.worker_pool.get("api", []):
159			worker.request_interrupt()
160			worker.join()
161
162		# abort
163		time.sleep(1)
164		self.log.info("Bye!")
165
166	def validate_datasources(self):
167		"""
168		Validate data sources
169
170		Logs warnings if not all information is precent for the configured data
171		sources.
172		"""
173
174		for datasource in self.modules.datasources:
175			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
176				self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)
177
178			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
179
180	def abort(self, signal=None, stack=None):
181		"""
182		Stop looping the delegator, clean up, and prepare for shutdown
183		"""
184		self.log.info("Received SIGTERM")
185
186		# cancel all interruptible postgres queries
187		# this needs to be done before we stop looping since after that no new
188		# jobs will be claimed, and needs to be done here because the worker's
189		# own database connection is busy executing the query that it should
190		# cancel! so we can't use it to update the job and make it get claimed
191		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
192			# this will make all these jobs immediately claimable, i.e. queries
193			# will get cancelled asap
194			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
195			job.claim()
196			job.release(delay=0, claim_after=0)
197
198		# wait until all cancel jobs are completed
199		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
200			time.sleep(0.25)
201
202		# now stop looping (i.e. accepting new jobs)
203		self.looping = False
204
205	def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
206		"""
207		Interrupt a job
208
209		This method can be called via e.g. the API, to interrupt a specific
210		job's worker. The worker can be targeted either with a Job object or
211		with a combination of job type and remote ID, since those uniquely
212		identify a job.
213
214		:param int interrupt_level:  Retry later or cancel?
215		:param Job job:  Job object to cancel worker for
216		:param str remote_id:  Remote ID for worker job to cancel
217		:param str jobtype:  Job type for worker job to cancel
218		"""
219
220		# find worker for given job
221		if job:
222			jobtype = job.data["jobtype"]
223
224		if jobtype not in self.worker_pool:
225			# no jobs of this type currently known
226			return
227
228		for worker in self.worker_pool[jobtype]:
229			if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
230				# first cancel any interruptable queries for this job's worker
231				while True:
232					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
233					if not active_queries:
234						# all cancellation jobs have been run
235						break
236
237					for cancel_job in active_queries:
238						if cancel_job.is_claimed:
239							continue
240
241						# this will make the job be run asap
242						cancel_job.claim()
243						cancel_job.release(delay=0, claim_after=0)
244
245					# give the cancel job a moment to run
246					time.sleep(0.25)
247
248				# now all queries are interrupted, formally request an abort
249				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
250				worker.request_interrupt(interrupt_level)
251				return
class WorkerManager:

Manages the job queue and worker pool

WorkerManager(queue, database, logger, modules, as_daemon=True)

Initialize manager

Parameters
  • queue: Job queue
  • database: Database handler
  • logger: Logger object
  • modules: Modules cache via ModuleLoader()
  • bool as_daemon: Whether the manager is being run as a daemon
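
Note that the constructor registers signal handlers and then calls loop() itself, so constructing a WorkerManager blocks until the backend shuts down. A minimal bootstrap sketch follows; the import paths and constructor arguments of the queue, database, logger, and module loader are assumptions based on the parameter descriptions above, not taken from the actual daemon code:

# Hypothetical bootstrap sketch, not copied from the 4CAT daemon itself;
# import paths and constructor arguments below are assumptions.
from common.lib.logger import Logger
from common.lib.database import Database
from common.lib.queue import JobQueue
from common.lib.module_loader import ModuleLoader
from backend.lib.manager import WorkerManager

log = Logger()
db = Database(logger=log)                  # connection settings assumed
queue = JobQueue(logger=log, database=db)
modules = ModuleLoader()                   # collects workers, processors, datasources

# blocks here: __init__ validates data sources, queues "ensure_job" jobs,
# and immediately enters loop()
WorkerManager(queue, db, log, modules, as_daemon=True)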

Class variables

queue = None
db = None
log = None
modules = None
proxy_delegator = None
worker_pool = {}
job_mapping = {}
pool = []
looping = True
unknown_jobs = set()
def delegate(self):

Delegate work

Checks for open jobs and passes them to dedicated workers, if slots are available for those workers.
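
The only things delegate() needs from a worker class are a max_workers attribute (to cap how many instances run concurrently), a constructor accepting logger, manager, job and modules, and the start()/is_alive()/join() thread interface. Combined with the optional ensure_job attribute used in __init__, a worker the manager could schedule might look like the sketch below; the BasicWorker base class, its import path, and the work() hook are assumptions about the wider 4CAT codebase, not defined in this module:

# Sketch of the worker-side contract assumed by WorkerManager.
# Only max_workers and ensure_job are referenced in this module;
# everything else here is an assumption.
from backend.lib.worker import BasicWorker  # assumed location of the base class

class HousekeepingWorker(BasicWorker):
	type = "housekeeping"  # jobtype key under which the module loader registers this worker (assumed)
	max_workers = 1        # delegate() never runs more than this many instances at once

	# __init__ queues one such job at startup ("workers that always need one");
	# these keyword arguments are passed through to queue.add_job()
	ensure_job = {"remote_id": "localhost", "interval": 3600}

	def work(self):
		# the actual work happens here; the manager only claims the job,
		# starts the worker, and joins it once it is no longer alive
		...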

def loop(self):

Main loop

Continuously delegates work until looping stops, after which all workers are asked to stop their work. Once they have done so, the loop ends.

def validate_datasources(self):

Validate data sources

Logs warnings if not all information is present for the configured data sources.
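
Besides checking for search and import workers, this method calls each data source's init hook, passing the database, logger, queue, data source name, and configuration (see the call to modules.datasources[datasource]["init"] in the source listing above). A sketch of what such an init_datasource function in a data source's __init__.py might look like; the signature follows the call made here, while the body is purely illustrative:

# Illustrative init hook; signature inferred from validate_datasources(),
# body is an assumption.
def init_datasource(database, logger, queue, name, config):
	# e.g. create tables or queue recurring collection jobs for this data
	# source; for scrapers this is where scrape jobs would be queued
	logger.info("Initialised data source '%s'" % name)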

def abort(self, signal=None, stack=None):

Stop looping the delegator, clean up, and prepare for shutdown
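
When running as a daemon, __init__ binds this method to SIGTERM, so a clean shutdown is normally triggered by signalling the backend process. A hedged example of doing so from another Python process; the PID file location is an assumption about the deployment, not something defined in this module:

# Hypothetical: signal a running 4CAT backend to shut down cleanly.
# The PID file path is an assumption; adjust for your deployment.
import os
import signal

with open("backend/4cat.pid") as pidfile:
	pid = int(pidfile.read().strip())

# handled by WorkerManager.abort(): cancel interruptible Postgres queries,
# then stop claiming new jobs so loop() can wind the workers down
os.kill(pid, signal.SIGTERM)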

def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):

Interrupt a job

This method can be called, e.g. via the API, to interrupt a specific job's worker. The worker can be targeted either with a Job object or with a combination of job type and remote ID, since those uniquely identify a job.

Parameters
  • int interrupt_level: Retry later or cancel?
  • Job job: Job object to cancel worker for
  • str remote_id: Remote ID for worker job to cancel
  • str jobtype: Job type for worker job to cancel
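
A hedged example of targeting a worker by job type and remote ID, for instance from an API handler that holds a reference to the running manager. The interrupt-level constant shown is an assumption about where such levels are defined, as are the jobtype and remote_id values; this module only passes the level through to the worker:

# Hypothetical call site; BasicWorker.INTERRUPT_RETRY is an assumption
# about the wider codebase, as are the jobtype and remote_id values.
manager.request_interrupt(
	interrupt_level=BasicWorker.INTERRUPT_RETRY,  # retry later rather than cancel outright
	jobtype="twitter-search",
	remote_id="dataset-1234abcd"
)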