
backend.lib.manager

The heart of the app - manages jobs and workers

  1"""
  2The heart of the app - manages jobs and workers
  3"""
  4import signal
  5import time
  6
  7from backend.lib.proxied_requests import DelegatedRequestHandler
  8from common.lib.exceptions import JobClaimedException
  9
 10
 11class WorkerManager:
 12	"""
 13	Manages the job queue and worker pool
 14	"""
 15	queue = None
 16	db = None
 17	log = None
 18	modules = None
 19	proxy_delegator = None
 20
 21	worker_pool = {}
 22	job_mapping = {}
 23	pool = []
 24	looping = True
 25	unknown_jobs = set()
 26
 27	def __init__(self, queue, database, logger, modules, as_daemon=True):
 28		"""
 29		Initialize manager
 30
 31		:param queue:  Job queue
 32		:param database:  Database handler
 33		:param logger:  Logger object
 34		:param modules:  Modules cache via ModuleLoader()
 35		:param bool as_daemon:  Whether the manager is being run as a daemon
 36		"""
 37		self.queue = queue
 38		self.db = database
 39		self.log = logger
 40		self.modules = modules
 41		self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)
 42
 43		if as_daemon:
 44			signal.signal(signal.SIGTERM, self.abort)
 45			signal.signal(signal.SIGINT, self.request_interrupt)
 46
 47		# datasources are initialized here; the init_datasource functions found in their respective __init__.py files
 48		# are called which, in the case of scrapers, also adds the scrape jobs to the queue.
 49		self.validate_datasources()
 50
 51		# queue jobs for workers that always need one
 52		for worker_name, worker in self.modules.workers.items():
 53			if hasattr(worker, "ensure_job"):
 54				# ensure_job is a class method that returns a dict with job parameters if job should be added
 55				# pass config for some workers (e.g., web studies extensions)
 56				try:
 57					job_params = worker.ensure_job(config=self.modules.config)
 58				except Exception as e:
 59					self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
 60					job_params = None
 61				
 62				if job_params:
 63					self.queue.add_job(jobtype=worker_name, **job_params)
 64
 65
 66		self.log.info("4CAT Started")
 67
 68		# flush module collector log buffer
 69		# the logger is not available when this initialises
 70		# but it is now!
 71		if self.modules.log_buffer:
 72			self.log.warning(self.modules.log_buffer)
 73			self.modules.log_buffer = ""
 74
 75		# it's time
 76		self.loop()
 77
 78	def delegate(self):
 79		"""
 80		Delegate work
 81
 82		Checks for open jobs, and then passes those to dedicated workers, if
 83		slots are available for those workers.
 84		"""
 85		jobs = self.queue.get_all_jobs()
 86
 87		num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool])
 88		self.log.debug("Running workers: %i" % num_active)
 89
 90		# clean up workers that have finished processing
 91		for jobtype in self.worker_pool:
 92			all_workers = self.worker_pool[jobtype]
 93			for worker in all_workers:
 94				if not worker.is_alive():
 95					self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
 96					worker.join()
 97					self.worker_pool[jobtype].remove(worker)
 98
 99			del all_workers
100
101		# check if workers are available for unclaimed jobs
102		for job in jobs:
103			jobtype = job.data["jobtype"]
104
105			if jobtype in self.modules.workers:
106				worker_class = self.modules.workers[jobtype]
107				if jobtype not in self.worker_pool:
108					self.worker_pool[jobtype] = []
109
110				# if a job is of a known type, and that job type has open
111				# worker slots, start a new worker to run it
112				if len(self.worker_pool[jobtype]) < worker_class.max_workers:
113					try:
114						job.claim()
115						worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
116						worker.start()
117						self.log.info(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}")
118						self.worker_pool[jobtype].append(worker)
119					except JobClaimedException:
120						# it's fine
121						pass
122			else:
123				if jobtype not in self.unknown_jobs:
124					self.log.error("Unknown job type: %s" % jobtype)
125					self.unknown_jobs.add(jobtype)
126
127		time.sleep(1)
128
129	def loop(self):
130		"""
131		Main loop
132
133		Constantly delegates work, until no longer looping, after which all
134		workers are asked to stop their work. Once that has happened, the loop
135		properly ends.
136		"""
137		while self.looping:
138			try:
139				self.delegate()
140			except KeyboardInterrupt:
141				self.looping = False
142
143		self.log.info("Telling all workers to stop doing whatever they're doing...")
144
145		# request shutdown from all workers except the API
146		# this allows us to use the API to figure out if a certain worker is
147		# hanging during shutdown, for example
148		for jobtype in self.worker_pool:
149			if jobtype == "api":
150				continue
151
152			for worker in self.worker_pool[jobtype]:
153				if hasattr(worker, "request_interrupt"):
154					worker.request_interrupt()
155				else:
156					worker.abort()
157
158		# wait for all workers that we just asked to quit to finish
159		self.log.info("Waiting for all workers to finish...")
160		for jobtype in self.worker_pool:
161			if jobtype == "api":
162				continue
163			for worker in self.worker_pool[jobtype]:
164				self.log.info("Waiting for worker %s..." % jobtype)
165				worker.join()
166
167		# shut down API last
168		for worker in self.worker_pool.get("api", []):
169			worker.request_interrupt()
170			worker.join()
171
172		# abort
173		time.sleep(1)
174		self.log.info("Bye!")
175
176	def validate_datasources(self):
177		"""
178		Validate data sources
179
180		Logs warnings if not all information is precent for the configured data
181		sources.
182		"""
183
184		for datasource in self.modules.datasources:
185			if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
186				self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)
187
188			self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
189
190	def abort(self, signal=None, stack=None):
191		"""
192		Stop looping the delegator, clean up, and prepare for shutdown
193		"""
194		self.log.info("Received SIGTERM")
195
196		# cancel all interruptible postgres queries
197		# this needs to be done before we stop looping since after that no new
198		# jobs will be claimed, and needs to be done here because the worker's
199		# own database connection is busy executing the query that it should
200		# cancel! so we can't use it to update the job and make it get claimed
201		for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
202			# this will make all these jobs immediately claimable, i.e. queries
203			# will get cancelled asap
204			self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
205			job.claim()
206			job.release(delay=0, claim_after=0)
207
208		# wait until all cancel jobs are completed
209		while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
210			time.sleep(0.25)
211
212		# now stop looping (i.e. accepting new jobs)
213		self.looping = False
214
215	def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
216		"""
217		Interrupt a job
218
219		This method can be called via e.g. the API, to interrupt a specific
220		job's worker. The worker can be targeted either with a Job object or
221		with a combination of job type and remote ID, since those uniquely
222		identify a job.
223
224		:param int interrupt_level:  Retry later or cancel?
225		:param Job job:  Job object to cancel worker for
226		:param str remote_id:  Remote ID for worker job to cancel
227		:param str jobtype:  Job type for worker job to cancel
228		"""
229
230		# find worker for given job
231		if job:
232			jobtype = job.data["jobtype"]
233
234		if jobtype not in self.worker_pool:
235			# no jobs of this type currently known
236			return
237
238		for worker in self.worker_pool[jobtype]:
239			if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
240				# first cancel any interruptable queries for this job's worker
241				while True:
242					active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
243					if not active_queries:
244						# all cancellation jobs have been run
245						break
246
247					for cancel_job in active_queries:
248						if cancel_job.is_claimed:
249							continue
250
251						# this will make the job be run asap
252						cancel_job.claim()
253						cancel_job.release(delay=0, claim_after=0)
254
255					# give the cancel job a moment to run
256					time.sleep(0.25)
257
258				# now all queries are interrupted, formally request an abort
259				self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
260				worker.request_interrupt(interrupt_level)
261				return
class WorkerManager:

Manages the job queue and worker pool

WorkerManager(queue, database, logger, modules, as_daemon=True)

Initialize manager

Parameters
  • queue: Job queue
  • database: Database handler
  • logger: Logger object
  • modules: Modules cache via ModuleLoader()
  • bool as_daemon: Whether the manager is being run as a daemon
queue = None
db = None
log = None
modules = None
proxy_delegator = None
worker_pool = {}
job_mapping = {}
pool = []
looping = True
unknown_jobs = set()
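
For context, the manager is not normally constructed by hand: the backend bootstrap code builds the job queue, database handler, logger and module cache and hands them to this constructor, which does not return until shutdown because it ends by calling loop(). The sketch below illustrates that wiring. The Database, Logger and JobQueue import paths and constructor arguments are assumptions about the surrounding 4CAT codebase, not a documented API; only ModuleLoader is named in the constructor docstring above.

# minimal sketch, assuming these helper classes and import paths exist in the codebase
from common.lib.database import Database        # assumed import path
from common.lib.logger import Logger            # assumed import path
from common.lib.queue import JobQueue           # assumed import path
from common.lib.module_loader import ModuleLoader  # assumed import path; name taken from the docstring
from backend.lib.manager import WorkerManager

log = Logger()                              # hypothetical: the real constructor may need more arguments
db = Database(logger=log)                   # hypothetical constructor signature
queue = JobQueue(logger=log, database=db)   # hypothetical constructor signature
modules = ModuleLoader()                    # module cache, as named in the constructor docstring

# note: this call blocks until shutdown, since __init__ ends with self.loop()
WorkerManager(queue, db, log, modules, as_daemon=True)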
def delegate(self):

Delegate work

Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.
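
The key scheduling rule is the per-jobtype slot check: a new worker is started only while fewer than max_workers workers of that type are alive, and the job is claimed first so it cannot be picked up twice. The self-contained sketch below mirrors that logic with toy stand-ins for the real queue, job and worker classes; it is an illustration, not the 4CAT API.

# self-contained sketch of the slot check in delegate(); ToyWorker stands in for real worker classes
class ToyWorker:
	max_workers = 2  # each worker class caps how many of its workers may run concurrently

	def __init__(self, job):
		self.job = job

	def start(self):
		pass  # the real workers are threads; actually running one is irrelevant for this sketch


def start_worker_if_slot_free(job, worker_pool, known_workers):
	jobtype = job["jobtype"]
	worker_class = known_workers.get(jobtype)
	if worker_class is None:
		return None  # unknown job type: the manager logs this once and otherwise ignores it

	pool = worker_pool.setdefault(jobtype, [])
	if len(pool) >= worker_class.max_workers:
		return None  # no free slot for this job type right now; try again on the next pass

	worker = worker_class(job=job)  # in the real code, job.claim() happens first and may raise JobClaimedException
	worker.start()
	pool.append(worker)
	return worker


pool, workers = {}, {"toy-job": ToyWorker}
for i in range(3):
	started = start_worker_if_slot_free({"jobtype": "toy-job", "remote_id": str(i)}, pool, workers)
	print("slot granted" if started else "no free slot")  # the third attempt prints "no free slot"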

def loop(self):

Main loop

Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.
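
The shutdown half of this method is an ordered fan-out: every non-API worker is asked to stop and joined first, and the API worker is only shut down afterwards, so it stays reachable while other workers are still winding down. Below is a self-contained sketch of that ordering, with plain threading.Thread objects standing in for the real workers.

# self-contained sketch of the shutdown ordering in loop(); plain threads stand in for real workers
import threading
import time

def toy_worker(stop_flag):
	while not stop_flag.is_set():
		time.sleep(0.05)

def make_worker():
	flag = threading.Event()
	thread = threading.Thread(target=toy_worker, args=(flag,))
	thread.stop_flag = flag  # attach the stop flag so shut_down() can signal it
	thread.start()
	return thread

def shut_down(worker_pool):
	# ask every non-API worker to stop, then wait for each of them to finish
	for jobtype, workers in worker_pool.items():
		if jobtype == "api":
			continue  # keep the API running so it can still be queried during shutdown
		for worker in workers:
			worker.stop_flag.set()
	for jobtype, workers in worker_pool.items():
		if jobtype != "api":
			for worker in workers:
				worker.join()
	# only now stop and join the API worker(s)
	for worker in worker_pool.get("api", []):
		worker.stop_flag.set()
		worker.join()

pool = {"search": [make_worker()], "api": [make_worker()]}
shut_down(pool)  # the search worker stops first, the API worker last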

def validate_datasources(self):

Validate data sources

Logs warnings if not all information is present for the configured data sources.
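
The check is purely name-based: a data source foo counts as searchable if a worker with job type foo-search or foo-import is registered. A minimal self-contained sketch of that convention follows; the worker and data source names are made up for illustration.

# minimal sketch of the name-based check in validate_datasources()
def has_search_worker(datasource, workers):
	# a data source needs a matching "-search" or "-import" worker to execute queries
	return f"{datasource}-search" in workers or f"{datasource}-import" in workers

registered_workers = {"fourchan-search", "telegram-search", "upload-import"}  # hypothetical worker names
for datasource in ("fourchan", "telegram", "upload", "tumblr"):
	if not has_search_worker(datasource, registered_workers):
		print(f"No search worker defined for datasource {datasource}")  # only "tumblr" triggers this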

def abort(self, signal=None, stack=None):

Stop looping the delegator, clean up, and prepare for shutdown

def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):

Interrupt a job

This method can be called via e.g. the API, to interrupt a specific job's worker. The worker can be targeted either with a Job object or with a combination of job type and remote ID, since those uniquely identify a job.

Parameters
  • int interrupt_level: Retry later or cancel?
  • Job job: Job object to cancel worker for
  • str remote_id: Remote ID for worker job to cancel
  • str jobtype: Job type for worker job to cancel
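
Because the combination of job type and remote ID uniquely identifies a job, both targeting styles resolve to the same worker. The self-contained sketch below mirrors the matching rule used to find that worker, with plain dictionaries standing in for Job objects; the job values are hypothetical.

# sketch of the matching rule in request_interrupt(); dicts stand in for Job objects and workers
def matches(worker_job, target_job=None, jobtype=None, remote_id=None):
	if target_job is not None:
		return worker_job["id"] == target_job["id"]
	return worker_job["jobtype"] == jobtype and worker_job["remote_id"] == remote_id

running = {"id": 42, "jobtype": "fourchan-search", "remote_id": "dataset-12345"}  # hypothetical values
print(matches(running, target_job={"id": 42}))                                  # True: targeted via Job object
print(matches(running, jobtype="fourchan-search", remote_id="dataset-12345"))   # True: targeted via type + remote ID
print(matches(running, jobtype="fourchan-search", remote_id="other-dataset"))   # False: different remote ID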