Edit on GitHub

backend.lib.manager

The heart of the app - manages jobs and workers

  1"""
  2The heart of the app - manages jobs and workers
  3"""
  4import threading
  5import signal
  6import time
  7
  8from collections.abc import Generator
  9
 10from backend.lib.proxied_requests import DelegatedRequestHandler
 11from backend.lib.worker import BasicWorker
 12from common.lib.exceptions import JobClaimedException
 13
 14# for now, this is hardcoded - could be dynamic or depending on the queue ID in
 15# the future
 16MAX_JOBS_PER_QUEUE = 1
 17
 18class WorkerManager:
 19	"""
 20	Manages the job queue and worker pool
 21	"""
 22	queue = None
 23	db = None
 24	log = None
 25	modules = None
 26	proxy_delegator = None
 27
 28	worker_pool = {}
 29	job_mapping = {}
 30	pool = []
 31	looping = True
 32	unknown_jobs = set()
 33
 34	def __init__(self, queue, database, logger, modules, as_daemon=True):
 35		"""
 36		Initialize manager
 37
 38		:param queue:  Job queue
 39		:param database:  Database handler
 40		:param logger:  Logger object
 41		:param modules:  Modules cache via ModuleLoader()
 42		:param bool as_daemon:  Whether the manager is being run as a daemon
 43		"""
 44		self.queue = queue
 45		self.db = database
 46		self.log = logger
 47		self.modules = modules
 48		self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)
 49
 50		if as_daemon:
 51			signal.signal(signal.SIGTERM, self.abort)
 52
 53		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
 54		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
 55		self.validate_datasources()
 56
 57		# queue jobs for workers that always need one
 58		for worker_name, worker in self.modules.workers.items():
 59			if hasattr(worker, "ensure_job"):
 60				# ensure_job is a class method that returns a dict with job parameters if job should be added
 61				# pass config for some workers (e.g., web studies extensions)
 62				try:
 63					self.log.debug(f"Ensuring job exists for worker {worker_name}")
 64					job_params = worker.ensure_job(config=self.modules.config)
 65				except Exception as e:
 66					self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
 67					job_params = None
 68
 69				if job_params:
 70					self.queue.add_job(worker_or_type=worker, **job_params)
 71
 72		self.ident = threading.get_ident()
 73		self.log.info("4CAT Started")
 74
 75		# flush module collector log buffer
 76		# the logger is not available when this initialises
 77		# but it is now!
 78		if self.modules.log_buffer:
 79			self.log.warning(self.modules.log_buffer)
 80			self.modules.log_buffer = ""
 81
 82		# it's time
 83		self.loop()
 84
 85	def delegate(self):
 86		"""
 87		Delegate work
 88
 89		Checks for open jobs, and then passes those to dedicated workers, if
 90		slots are available for those workers.
 91		"""
 92		jobs = self.queue.get_all_jobs()
 93
 94		num_active = len(list(self.iterate_active_workers()))
 95		self.log.debug2(f"Running {num_active} active workers")
 96
 97		# clean up workers that have finished processing
 98		# not using iterate_active_workers() here because we're going to change
 99		# the dictionary while iterating through it
100		for queue_id in self.worker_pool:
101			all_workers = self.worker_pool[queue_id]
102			for worker in all_workers:
103				if not worker.is_alive():
104					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
105					worker.join()
106					self.worker_pool[queue_id].remove(worker)
107
108			del all_workers
109
110		# check if workers are available for unclaimed jobs
111		for job in jobs:
112			queue_id = job.data["queue_id"]
113			jobtype = job.data["jobtype"]
114
115			if jobtype in self.modules.workers:
116				worker_class = self.modules.workers[jobtype]
117				if queue_id not in self.worker_pool:
118					self.worker_pool[queue_id] = []
119
120				# if a job is of a known type, and that job type has open
121				# worker slots, start a new worker to run it
122				if len(self.worker_pool[queue_id]) < MAX_JOBS_PER_QUEUE:
123					try:
124						job.claim()
125						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
126						worker.start()
127						log_level = self.log.levels["DEBUG"] if job.data["interval"] else self.log.levels["INFO"]
128						self.log.log(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}", log_level)
129						self.worker_pool[queue_id].append(worker)
130					except JobClaimedException:
131						# it's fine
132						pass
133			else:
134				if jobtype not in self.unknown_jobs:
135					self.log.error(f"Unknown job type: {jobtype}")
136					self.unknown_jobs.add(jobtype)
137
138		time.sleep(1)
139
140	def loop(self):
141		"""
142		Main loop
143
144		Constantly delegates work, until no longer looping, after which all
145		workers are asked to stop their work. Once that has happened, the loop
146		properly ends.
147		"""
148		while self.looping:
149			try:
150				self.delegate()
151			except KeyboardInterrupt:
152				self.looping = False
153
154		self.log.info("Telling all workers to stop doing whatever they're doing...")
155
156		# request shutdown from all workers except the API
157		# this allows us to use the API to figure out if a certain worker is
158		# hanging during shutdown, for example
159		for queue_id, worker in self.iterate_active_workers():
160			if worker.type == "api":
161				continue
162
163			if hasattr(worker, "request_interrupt"):
164				worker.request_interrupt()
165			else:
166				worker.abort()
167
168		# wait for all workers that we just asked to quit to finish
169		self.log.info("Waiting for all workers to finish...")
170		for queue_id, worker in self.iterate_active_workers():
171			if worker.type == "api":
172				continue
173
174			self.log.info(f"Waiting for worker of type {worker.type}...")
175			worker.join()
176
177		# shut down any remaining workers (i.e. the API)
178		for queue_id, worker in self.iterate_active_workers():
179			worker.request_interrupt()
180			worker.join()
181
182		# abort
183		time.sleep(1)
184		self.log.info("Bye!")
185
186	def validate_datasources(self):
187		"""
188		Validate data sources
189
190		Logs warnings if not all information is present for the configured data
191		sources.
192		"""
193		for datasource in self.modules.datasources:
194			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
195				self.log.error(f"No search worker defined for data source {datasource} or its modules are missing. "
196				               f"Datasets cannot be created for it.")
197
198			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
199
200	def abort(self, signal=None, stack=None):
201		"""
202		Stop looping the delegator, clean up, and prepare for shutdown
203		"""
204		self.log.info("Received SIGTERM")
205
206		# cancel all interruptible postgres queries
207		# this needs to be done before we stop looping since after that no new
208		# jobs will be claimed, and needs to be done here because the worker's
209		# own database connection is busy executing the query that it should
210		# cancel! so we can't use it to update the job and make it get claimed
211		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
212			# this will make all these jobs immediately claimable, i.e. queries
213			# will get cancelled asap
214			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
215			job.claim()
216			job.release(delay=0, claim_after=0)
217
218		# wait until all cancel jobs are completed
219		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
220			time.sleep(0.25)
221
222		# now stop looping (i.e. accepting new jobs)
223		self.looping = False
224
225	def request_interrupt(self, interrupt_level, job):
226		"""
227		Interrupt a specific job
228
229		This method can be called via e.g. the API, to interrupt a specific
230		job's worker. The worker for the given Job object is searched for and
231		if it exists, its `request_interrupt()` method is called.
232
233		:param int interrupt_level:  Retry later or cancel?
234		:param Job job:  Job object to cancel worker for
235		"""
236		for queue_id, worker in self.iterate_active_workers():
237			if worker.job.data["id"] == job.data["id"]:
238				# first cancel any interruptable postgres queries for this job's worker
239				while True:
240					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
241					if not active_queries:
242						# all cancellation jobs have been run
243						break
244
245					for cancel_job in active_queries:
246						if cancel_job.is_claimed:
247							continue
248
249						# this will cause the job be run asap
250						cancel_job.claim()
251						cancel_job.release(delay=0, claim_after=0)
252
253					# give the cancel job a moment to run
254					time.sleep(0.25)
255
256				# now all queries are interrupted, formally request an abort
257				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
258				worker.request_interrupt(interrupt_level)
259				return
260
261	def iterate_active_workers(self) -> Generator[tuple[str, BasicWorker]]:
262		"""
263		Return all active workers
264
265		Convenience function to avoid having to always use two nested for
266		loops.
267
268		:return:  Generator that yields tuples of (queue_id, worker)
269		"""
270		for queue_id in self.worker_pool:
271			for worker in self.worker_pool[queue_id]:
272				yield queue_id, worker
MAX_JOBS_PER_QUEUE = 1
class WorkerManager:
 19class WorkerManager:
 20	"""
 21	Manages the job queue and worker pool
 22	"""
 23	queue = None
 24	db = None
 25	log = None
 26	modules = None
 27	proxy_delegator = None
 28
 29	worker_pool = {}
 30	job_mapping = {}
 31	pool = []
 32	looping = True
 33	unknown_jobs = set()
 34
 35	def __init__(self, queue, database, logger, modules, as_daemon=True):
 36		"""
 37		Initialize manager
 38
 39		:param queue:  Job queue
 40		:param database:  Database handler
 41		:param logger:  Logger object
 42		:param modules:  Modules cache via ModuleLoader()
 43		:param bool as_daemon:  Whether the manager is being run as a daemon
 44		"""
 45		self.queue = queue
 46		self.db = database
 47		self.log = logger
 48		self.modules = modules
 49		self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)
 50
 51		if as_daemon:
 52			signal.signal(signal.SIGTERM, self.abort)
 53
 54		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
 55		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
 56		self.validate_datasources()
 57
 58		# queue jobs for workers that always need one
 59		for worker_name, worker in self.modules.workers.items():
 60			if hasattr(worker, "ensure_job"):
 61				# ensure_job is a class method that returns a dict with job parameters if job should be added
 62				# pass config for some workers (e.g., web studies extensions)
 63				try:
 64					self.log.debug(f"Ensuring job exists for worker {worker_name}")
 65					job_params = worker.ensure_job(config=self.modules.config)
 66				except Exception as e:
 67					self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
 68					job_params = None
 69
 70				if job_params:
 71					self.queue.add_job(worker_or_type=worker, **job_params)
 72
 73		self.ident = threading.get_ident()
 74		self.log.info("4CAT Started")
 75
 76		# flush module collector log buffer
 77		# the logger is not available when this initialises
 78		# but it is now!
 79		if self.modules.log_buffer:
 80			self.log.warning(self.modules.log_buffer)
 81			self.modules.log_buffer = ""
 82
 83		# it's time
 84		self.loop()
 85
 86	def delegate(self):
 87		"""
 88		Delegate work
 89
 90		Checks for open jobs, and then passes those to dedicated workers, if
 91		slots are available for those workers.
 92		"""
 93		jobs = self.queue.get_all_jobs()
 94
 95		num_active = len(list(self.iterate_active_workers()))
 96		self.log.debug2(f"Running {num_active} active workers")
 97
 98		# clean up workers that have finished processing
 99		# not using iterate_active_workers() here because we're going to change
100		# the dictionary while iterating through it
101		for queue_id in self.worker_pool:
102			all_workers = self.worker_pool[queue_id]
103			for worker in all_workers:
104				if not worker.is_alive():
105					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
106					worker.join()
107					self.worker_pool[queue_id].remove(worker)
108
109			del all_workers
110
111		# check if workers are available for unclaimed jobs
112		for job in jobs:
113			queue_id = job.data["queue_id"]
114			jobtype = job.data["jobtype"]
115
116			if jobtype in self.modules.workers:
117				worker_class = self.modules.workers[jobtype]
118				if queue_id not in self.worker_pool:
119					self.worker_pool[queue_id] = []
120
121				# if a job is of a known type, and that job type has open
122				# worker slots, start a new worker to run it
123				if len(self.worker_pool[queue_id]) < MAX_JOBS_PER_QUEUE:
124					try:
125						job.claim()
126						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
127						worker.start()
128						log_level = self.log.levels["DEBUG"] if job.data["interval"] else self.log.levels["INFO"]
129						self.log.log(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}", log_level)
130						self.worker_pool[queue_id].append(worker)
131					except JobClaimedException:
132						# it's fine
133						pass
134			else:
135				if jobtype not in self.unknown_jobs:
136					self.log.error(f"Unknown job type: {jobtype}")
137					self.unknown_jobs.add(jobtype)
138
139		time.sleep(1)
140
141	def loop(self):
142		"""
143		Main loop
144
145		Constantly delegates work, until no longer looping, after which all
146		workers are asked to stop their work. Once that has happened, the loop
147		properly ends.
148		"""
149		while self.looping:
150			try:
151				self.delegate()
152			except KeyboardInterrupt:
153				self.looping = False
154
155		self.log.info("Telling all workers to stop doing whatever they're doing...")
156
157		# request shutdown from all workers except the API
158		# this allows us to use the API to figure out if a certain worker is
159		# hanging during shutdown, for example
160		for queue_id, worker in self.iterate_active_workers():
161			if worker.type == "api":
162				continue
163
164			if hasattr(worker, "request_interrupt"):
165				worker.request_interrupt()
166			else:
167				worker.abort()
168
169		# wait for all workers that we just asked to quit to finish
170		self.log.info("Waiting for all workers to finish...")
171		for queue_id, worker in self.iterate_active_workers():
172			if worker.type == "api":
173				continue
174
175			self.log.info(f"Waiting for worker of type {worker.type}...")
176			worker.join()
177
178		# shut down any remaining workers (i.e. the API)
179		for queue_id, worker in self.iterate_active_workers():
180			worker.request_interrupt()
181			worker.join()
182
183		# abort
184		time.sleep(1)
185		self.log.info("Bye!")
186
187	def validate_datasources(self):
188		"""
189		Validate data sources
190
191		Logs warnings if not all information is present for the configured data
192		sources.
193		"""
194		for datasource in self.modules.datasources:
195			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
196				self.log.error(f"No search worker defined for data source {datasource} or its modules are missing. "
197				               f"Datasets cannot be created for it.")
198
199			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
200
201	def abort(self, signal=None, stack=None):
202		"""
203		Stop looping the delegator, clean up, and prepare for shutdown
204		"""
205		self.log.info("Received SIGTERM")
206
207		# cancel all interruptible postgres queries
208		# this needs to be done before we stop looping since after that no new
209		# jobs will be claimed, and needs to be done here because the worker's
210		# own database connection is busy executing the query that it should
211		# cancel! so we can't use it to update the job and make it get claimed
212		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
213			# this will make all these jobs immediately claimable, i.e. queries
214			# will get cancelled asap
215			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
216			job.claim()
217			job.release(delay=0, claim_after=0)
218
219		# wait until all cancel jobs are completed
220		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
221			time.sleep(0.25)
222
223		# now stop looping (i.e. accepting new jobs)
224		self.looping = False
225
226	def request_interrupt(self, interrupt_level, job):
227		"""
228		Interrupt a specific job
229
230		This method can be called via e.g. the API, to interrupt a specific
231		job's worker. The worker for the given Job object is searched for and
232		if it exists, its `request_interrupt()` method is called.
233
234		:param int interrupt_level:  Retry later or cancel?
235		:param Job job:  Job object to cancel worker for
236		"""
237		for queue_id, worker in self.iterate_active_workers():
238			if worker.job.data["id"] == job.data["id"]:
239				# first cancel any interruptable postgres queries for this job's worker
240				while True:
241					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
242					if not active_queries:
243						# all cancellation jobs have been run
244						break
245
246					for cancel_job in active_queries:
247						if cancel_job.is_claimed:
248							continue
249
250						# this will cause the job be run asap
251						cancel_job.claim()
252						cancel_job.release(delay=0, claim_after=0)
253
254					# give the cancel job a moment to run
255					time.sleep(0.25)
256
257				# now all queries are interrupted, formally request an abort
258				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
259				worker.request_interrupt(interrupt_level)
260				return
261
262	def iterate_active_workers(self) -> Generator[tuple[str, BasicWorker]]:
263		"""
264		Return all active workers
265
266		Convenience function to avoid having to always use two nested for
267		loops.
268
269		:return:  Generator that yields tuples of (queue_id, worker)
270		"""
271		for queue_id in self.worker_pool:
272			for worker in self.worker_pool[queue_id]:
273				yield queue_id, worker

Manages the job queue and worker pool

WorkerManager(queue, database, logger, modules, as_daemon=True)
35	def __init__(self, queue, database, logger, modules, as_daemon=True):
36		"""
37		Initialize manager
38
39		:param queue:  Job queue
40		:param database:  Database handler
41		:param logger:  Logger object
42		:param modules:  Modules cache via ModuleLoader()
43		:param bool as_daemon:  Whether the manager is being run as a daemon
44		"""
45		self.queue = queue
46		self.db = database
47		self.log = logger
48		self.modules = modules
49		self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)
50
51		if as_daemon:
52			signal.signal(signal.SIGTERM, self.abort)
53
54		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
55		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
56		self.validate_datasources()
57
58		# queue jobs for workers that always need one
59		for worker_name, worker in self.modules.workers.items():
60			if hasattr(worker, "ensure_job"):
61				# ensure_job is a class method that returns a dict with job parameters if job should be added
62				# pass config for some workers (e.g., web studies extensions)
63				try:
64					self.log.debug(f"Ensuring job exists for worker {worker_name}")
65					job_params = worker.ensure_job(config=self.modules.config)
66				except Exception as e:
67					self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
68					job_params = None
69
70				if job_params:
71					self.queue.add_job(worker_or_type=worker, **job_params)
72
73		self.ident = threading.get_ident()
74		self.log.info("4CAT Started")
75
76		# flush module collector log buffer
77		# the logger is not available when this initialises
78		# but it is now!
79		if self.modules.log_buffer:
80			self.log.warning(self.modules.log_buffer)
81			self.modules.log_buffer = ""
82
83		# it's time
84		self.loop()

Initialize manager

Parameters
  • queue: Job queue
  • database: Database handler
  • logger: Logger object
  • modules: Modules cache via ModuleLoader()
  • bool as_daemon: Whether the manager is being run as a daemon
queue = None
db = None
log = None
modules = None
proxy_delegator = None
worker_pool = {}
job_mapping = {}
pool = []
looping = True
unknown_jobs = set()
ident
def delegate(self):
	def delegate(self):
		"""
		Delegate work

		Fetches the jobs from the queue, removes workers that have finished
		from the pool, and then starts a worker for each job whose queue still
		has a free slot (at most `MAX_JOBS_PER_QUEUE` workers per queue ID).
		Sleeps for one second before returning.
		"""
		jobs = self.queue.get_all_jobs()

		num_active = len(list(self.iterate_active_workers()))
		self.log.debug2(f"Running {num_active} active workers")

		# clean up workers that have finished processing
		# iterate over a copy of each list: removing items from the list that
		# is being iterated would skip the worker following each removed one
		for queue_id, workers in self.worker_pool.items():
			for worker in list(workers):
				if worker.is_alive():
					continue

				self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
				worker.join()
				workers.remove(worker)

		# check if workers are available for unclaimed jobs
		for job in jobs:
			queue_id = job.data["queue_id"]
			jobtype = job.data["jobtype"]

			if jobtype not in self.modules.workers:
				# report each unknown job type only once
				if jobtype not in self.unknown_jobs:
					self.log.error(f"Unknown job type: {jobtype}")
					self.unknown_jobs.add(jobtype)
				continue

			worker_class = self.modules.workers[jobtype]
			queue_workers = self.worker_pool.setdefault(queue_id, [])

			# if a job is of a known type, and that job's queue has open
			# worker slots, start a new worker to run it
			if len(queue_workers) >= MAX_JOBS_PER_QUEUE:
				continue

			try:
				job.claim()
				worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
				worker.start()
				# jobs with an interval are only logged at debug level
				log_level = self.log.levels["DEBUG"] if job.data["interval"] else self.log.levels["INFO"]
				self.log.log(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}", log_level)
				queue_workers.append(worker)
			except JobClaimedException:
				# it's fine
				pass

		time.sleep(1)

Delegate work

Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.

def loop(self):
141	def loop(self):
142		"""
143		Main loop
144
145		Constantly delegates work, until no longer looping, after which all
146		workers are asked to stop their work. Once that has happened, the loop
147		properly ends.
148		"""
149		while self.looping:
150			try:
151				self.delegate()
152			except KeyboardInterrupt:
153				self.looping = False
154
155		self.log.info("Telling all workers to stop doing whatever they're doing...")
156
157		# request shutdown from all workers except the API
158		# this allows us to use the API to figure out if a certain worker is
159		# hanging during shutdown, for example
160		for queue_id, worker in self.iterate_active_workers():
161			if worker.type == "api":
162				continue
163
164			if hasattr(worker, "request_interrupt"):
165				worker.request_interrupt()
166			else:
167				worker.abort()
168
169		# wait for all workers that we just asked to quit to finish
170		self.log.info("Waiting for all workers to finish...")
171		for queue_id, worker in self.iterate_active_workers():
172			if worker.type == "api":
173				continue
174
175			self.log.info(f"Waiting for worker of type {worker.type}...")
176			worker.join()
177
178		# shut down any remaining workers (i.e. the API)
179		for queue_id, worker in self.iterate_active_workers():
180			worker.request_interrupt()
181			worker.join()
182
183		# abort
184		time.sleep(1)
185		self.log.info("Bye!")

Main loop

Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.

def validate_datasources(self):
187	def validate_datasources(self):
188		"""
189		Validate data sources
190
191		Logs warnings if not all information is present for the configured data
192		sources.
193		"""
194		for datasource in self.modules.datasources:
195			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
196				self.log.error(f"No search worker defined for data source {datasource} or its modules are missing. "
197				               f"Datasets cannot be created for it.")
198
199			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)

Validate data sources

Logs an error for each configured data source that has neither a search nor an import worker, then calls the init function of every data source.

def abort(self, signal=None, stack=None):
201	def abort(self, signal=None, stack=None):
202		"""
203		Stop looping the delegator, clean up, and prepare for shutdown
204		"""
205		self.log.info("Received SIGTERM")
206
207		# cancel all interruptible postgres queries
208		# this needs to be done before we stop looping since after that no new
209		# jobs will be claimed, and needs to be done here because the worker's
210		# own database connection is busy executing the query that it should
211		# cancel! so we can't use it to update the job and make it get claimed
212		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
213			# this will make all these jobs immediately claimable, i.e. queries
214			# will get cancelled asap
215			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
216			job.claim()
217			job.release(delay=0, claim_after=0)
218
219		# wait until all cancel jobs are completed
220		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
221			time.sleep(0.25)
222
223		# now stop looping (i.e. accepting new jobs)
224		self.looping = False

Stop looping the delegator, clean up, and prepare for shutdown

def request_interrupt(self, interrupt_level, job):
226	def request_interrupt(self, interrupt_level, job):
227		"""
228		Interrupt a specific job
229
230		This method can be called via e.g. the API, to interrupt a specific
231		job's worker. The worker for the given Job object is searched for and
232		if it exists, its `request_interrupt()` method is called.
233
234		:param int interrupt_level:  Retry later or cancel?
235		:param Job job:  Job object to cancel worker for
236		"""
237		for queue_id, worker in self.iterate_active_workers():
238			if worker.job.data["id"] == job.data["id"]:
239				# first cancel any interruptable postgres queries for this job's worker
240				while True:
241					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
242					if not active_queries:
243						# all cancellation jobs have been run
244						break
245
246					for cancel_job in active_queries:
247						if cancel_job.is_claimed:
248							continue
249
250						# this will cause the job be run asap
251						cancel_job.claim()
252						cancel_job.release(delay=0, claim_after=0)
253
254					# give the cancel job a moment to run
255					time.sleep(0.25)
256
257				# now all queries are interrupted, formally request an abort
258				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
259				worker.request_interrupt(interrupt_level)
260				return

Interrupt a specific job

This method can be called via e.g. the API, to interrupt a specific job's worker. The worker for the given Job object is searched for and if it exists, its request_interrupt() method is called.

Parameters
  • int interrupt_level: Retry later or cancel?
  • Job job: Job object to cancel worker for
def iterate_active_workers(self) -> Generator[tuple[str, backend.lib.worker.BasicWorker]]:
262	def iterate_active_workers(self) -> Generator[tuple[str, BasicWorker]]:
263		"""
264		Return all active workers
265
266		Convenience function to avoid having to always use two nested for
267		loops.
268
269		:return:  Generator that yields tuples of (queue_id, worker)
270		"""
271		for queue_id in self.worker_pool:
272			for worker in self.worker_pool[queue_id]:
273				yield queue_id, worker

Return all active workers

Convenience function to avoid having to always use two nested for loops.

Returns

Generator that yields tuples of (queue_id, worker)