backend.lib.manager
The heart of the app - manages jobs and workers
"""
The heart of the app - manages jobs and workers
"""
import threading
import signal
import time

from collections.abc import Generator

from backend.lib.proxied_requests import DelegatedRequestHandler
from backend.lib.worker import BasicWorker
from common.lib.exceptions import JobClaimedException

# for now, this is hardcoded - could be dynamic or depending on the queue ID in
# the future
MAX_JOBS_PER_QUEUE = 1


class WorkerManager:
    """
    Manages the job queue and worker pool

    Instantiating the manager initialises the data sources, queues the jobs
    that workers request through `ensure_job()` and then runs the main loop,
    so the constructor only returns after the loop has ended.
    """
    # handles supplied by the caller; bound in __init__
    queue = None
    db = None
    log = None
    modules = None
    proxy_delegator = None

    # class-level defaults; __init__ rebinds the mutable containers on the
    # instance so that state is never shared between manager objects
    worker_pool = {}  # queue ID -> list of workers started for that queue
    job_mapping = {}
    pool = []
    looping = True  # the main loop keeps running while this is True
    unknown_jobs = set()  # job types already reported as unknown

    def __init__(self, queue, database, logger, modules, as_daemon=True):
        """
        Initialize manager

        Stores the given handles, initialises data sources, queues jobs for
        workers that ask for one and then runs the main loop. This call
        blocks until the loop has ended.

        :param queue: Job queue
        :param database: Database handler
        :param logger: Logger object
        :param modules: Modules cache via ModuleLoader()
        :param bool as_daemon: Whether the manager is being run as a daemon;
          if so, SIGTERM is routed to `abort()`
        """
        self.queue = queue
        self.db = database
        self.log = logger
        self.modules = modules

        # give every manager its own containers instead of mutating the
        # class-level defaults, which are shared by all instances
        self.worker_pool = {}
        self.job_mapping = {}
        self.pool = []
        self.unknown_jobs = set()

        self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)

        if as_daemon:
            signal.signal(signal.SIGTERM, self.abort)

        # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
        # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
        self.validate_datasources()

        # queue jobs for workers that always need one
        for worker_name, worker in self.modules.workers.items():
            if hasattr(worker, "ensure_job"):
                # ensure_job is a class method that returns a dict with job parameters if job should be added
                # pass config for some workers (e.g., web studies extensions)
                try:
                    self.log.debug(f"Ensuring job exists for worker {worker_name}")
                    job_params = worker.ensure_job(config=self.modules.config)
                except Exception as e:
                    # best effort: one misbehaving worker must not keep the
                    # manager from starting
                    self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
                    job_params = None

                if job_params:
                    self.queue.add_job(worker_or_type=worker, **job_params)

        self.ident = threading.get_ident()
        self.log.info("4CAT Started")

        # flush module collector log buffer
        # the logger is not available when this initialises
        # but it is now!
        if self.modules.log_buffer:
            self.log.warning(self.modules.log_buffer)
            self.modules.log_buffer = ""

        # it's time
        self.loop()

    def delegate(self):
        """
        Delegate work

        Asks the queue for jobs, reaps workers whose thread has ended and
        starts a new worker for each job of a known type whose queue still
        has a free slot. Sleeps for one second before returning.
        """
        jobs = self.queue.get_all_jobs()

        num_active = sum(1 for _ in self.iterate_active_workers())
        self.log.debug2(f"Running {num_active} active workers")

        # clean up workers that have finished processing
        for workers in self.worker_pool.values():
            still_running = []
            for worker in workers:
                if worker.is_alive():
                    still_running.append(worker)
                    continue

                self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                worker.join()

            # swap the contents in one go; removing items from a list while
            # iterating over it skips the element after each removal
            workers[:] = still_running

        # check if workers are available for unclaimed jobs
        for job in jobs:
            queue_id = job.data["queue_id"]
            jobtype = job.data["jobtype"]

            if jobtype not in self.modules.workers:
                # only complain once per unknown job type
                if jobtype not in self.unknown_jobs:
                    self.log.error(f"Unknown job type: {jobtype}")
                    self.unknown_jobs.add(jobtype)
                continue

            worker_class = self.modules.workers[jobtype]
            queue_workers = self.worker_pool.setdefault(queue_id, [])

            # if a job is of a known type, and that job type has open
            # worker slots, start a new worker to run it
            if len(queue_workers) >= MAX_JOBS_PER_QUEUE:
                continue

            try:
                job.claim()
                worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                worker.start()
                log_level = self.log.levels["DEBUG"] if job.data["interval"] else self.log.levels["INFO"]
                self.log.log(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}", log_level)
                queue_workers.append(worker)
            except JobClaimedException:
                # it's fine
                pass

        time.sleep(1)

    def loop(self):
        """
        Main loop

        Constantly delegates work, until no longer looping, after which all
        workers are asked to stop their work. Once that has happened, the loop
        properly ends.
        """
        while self.looping:
            try:
                self.delegate()
            except KeyboardInterrupt:
                self.looping = False

        self.log.info("Telling all workers to stop doing whatever they're doing...")

        # request shutdown from all workers except the API
        # this allows us to use the API to figure out if a certain worker is
        # hanging during shutdown, for example
        for _queue_id, worker in self.iterate_active_workers():
            if worker.type == "api":
                continue

            if hasattr(worker, "request_interrupt"):
                worker.request_interrupt()
            else:
                worker.abort()

        # wait for all workers that we just asked to quit to finish
        self.log.info("Waiting for all workers to finish...")
        for _queue_id, worker in self.iterate_active_workers():
            if worker.type == "api":
                continue

            self.log.info(f"Waiting for worker of type {worker.type}...")
            worker.join()

        # shut down any remaining workers (i.e. the API)
        # workers joined above are still in the pool, so skip everything that
        # has already stopped and use the same fallback as the first pass for
        # workers without request_interrupt()
        for _queue_id, worker in self.iterate_active_workers():
            if not worker.is_alive():
                continue

            if hasattr(worker, "request_interrupt"):
                worker.request_interrupt()
            else:
                worker.abort()
            worker.join()

        # abort
        time.sleep(1)
        self.log.info("Bye!")

    def validate_datasources(self):
        """
        Validate and initialise data sources

        For every configured data source, logs an error when neither a
        `<datasource>-search` nor a `<datasource>-import` worker is loaded,
        and then calls the data source's `init` callable.
        """
        for datasource in self.modules.datasources:
            if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
                self.log.error(f"No search worker defined for data source {datasource} or its modules are missing. "
                               f"Datasets cannot be created for it.")

            self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)

    def abort(self, signal=None, stack=None):
        """
        Stop looping the delegator, clean up, and prepare for shutdown

        Registered as the SIGTERM handler when the manager runs as a daemon.

        NOTE(review): as a signal handler this presumably runs on the same
        thread as `loop()`, so `delegate()` does not run while this method
        waits for the cancel jobs to disappear - confirm that something else
        can complete those jobs.

        :param signal: Signal number, as passed by the signal machinery; unused
        :param stack: Stack frame, as passed by the signal machinery; unused
        """
        self.log.info("Received SIGTERM")

        # cancel all interruptible postgres queries
        # this needs to be done before we stop looping since after that no new
        # jobs will be claimed, and needs to be done here because the worker's
        # own database connection is busy executing the query that it should
        # cancel! so we can't use it to update the job and make it get claimed
        for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            # this will make all these jobs immediately claimable, i.e. queries
            # will get cancelled asap
            self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
            job.claim()
            job.release(delay=0, claim_after=0)

        # wait until all cancel jobs are completed
        while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            time.sleep(0.25)

        # now stop looping (i.e. accepting new jobs)
        self.looping = False

    def request_interrupt(self, interrupt_level, job):
        """
        Interrupt a specific job

        This method can be called via e.g. the API, to interrupt a specific
        job's worker. The worker for the given Job object is searched for and
        if it exists, its `request_interrupt()` method is called. Nothing
        happens if no worker is running the job.

        :param int interrupt_level: Retry later or cancel?
        :param Job job: Job object to cancel worker for
        """
        for _queue_id, worker in self.iterate_active_workers():
            if worker.job.data["id"] != job.data["id"]:
                continue

            # first cancel any interruptable postgres queries for this job's worker
            while True:
                active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                if not active_queries:
                    # all cancellation jobs have been run
                    break

                for cancel_job in active_queries:
                    if cancel_job.is_claimed:
                        continue

                    # this will cause the job be run asap
                    cancel_job.claim()
                    cancel_job.release(delay=0, claim_after=0)

                # give the cancel job a moment to run
                time.sleep(0.25)

            # now all queries are interrupted, formally request an abort
            self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
            worker.request_interrupt(interrupt_level)
            return

    def iterate_active_workers(self) -> Generator[tuple[str, BasicWorker]]:
        """
        Return all active workers

        Convenience function to avoid having to always use two nested for
        loops. Yields every worker that is currently in the pool.

        :return: Generator that yields tuples of (queue_id, worker)
        """
        for queue_id, workers in self.worker_pool.items():
            for worker in workers:
                yield queue_id, worker
class WorkerManager:
    """
    Manages the job queue and worker pool

    Instantiating the manager initialises the data sources, queues the jobs
    that workers request through `ensure_job()` and then runs the main loop,
    so the constructor only returns after the loop has ended.
    """
    # handles supplied by the caller; bound in __init__
    queue = None
    db = None
    log = None
    modules = None
    proxy_delegator = None

    # class-level defaults; __init__ rebinds the mutable containers on the
    # instance so that state is never shared between manager objects
    worker_pool = {}  # queue ID -> list of workers started for that queue
    job_mapping = {}
    pool = []
    looping = True  # the main loop keeps running while this is True
    unknown_jobs = set()  # job types already reported as unknown

    def __init__(self, queue, database, logger, modules, as_daemon=True):
        """
        Initialize manager

        Stores the given handles, initialises data sources, queues jobs for
        workers that ask for one and then runs the main loop. This call
        blocks until the loop has ended.

        :param queue: Job queue
        :param database: Database handler
        :param logger: Logger object
        :param modules: Modules cache via ModuleLoader()
        :param bool as_daemon: Whether the manager is being run as a daemon;
          if so, SIGTERM is routed to `abort()`
        """
        self.queue = queue
        self.db = database
        self.log = logger
        self.modules = modules

        # give every manager its own containers instead of mutating the
        # class-level defaults, which are shared by all instances
        self.worker_pool = {}
        self.job_mapping = {}
        self.pool = []
        self.unknown_jobs = set()

        self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)

        if as_daemon:
            signal.signal(signal.SIGTERM, self.abort)

        # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
        # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
        self.validate_datasources()

        # queue jobs for workers that always need one
        for worker_name, worker in self.modules.workers.items():
            if hasattr(worker, "ensure_job"):
                # ensure_job is a class method that returns a dict with job parameters if job should be added
                # pass config for some workers (e.g., web studies extensions)
                try:
                    self.log.debug(f"Ensuring job exists for worker {worker_name}")
                    job_params = worker.ensure_job(config=self.modules.config)
                except Exception as e:
                    # best effort: one misbehaving worker must not keep the
                    # manager from starting
                    self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
                    job_params = None

                if job_params:
                    self.queue.add_job(worker_or_type=worker, **job_params)

        self.ident = threading.get_ident()
        self.log.info("4CAT Started")

        # flush module collector log buffer
        # the logger is not available when this initialises
        # but it is now!
        if self.modules.log_buffer:
            self.log.warning(self.modules.log_buffer)
            self.modules.log_buffer = ""

        # it's time
        self.loop()

    def delegate(self):
        """
        Delegate work

        Asks the queue for jobs, reaps workers whose thread has ended and
        starts a new worker for each job of a known type whose queue still
        has a free slot. Sleeps for one second before returning.
        """
        jobs = self.queue.get_all_jobs()

        num_active = sum(1 for _ in self.iterate_active_workers())
        self.log.debug2(f"Running {num_active} active workers")

        # clean up workers that have finished processing
        for workers in self.worker_pool.values():
            still_running = []
            for worker in workers:
                if worker.is_alive():
                    still_running.append(worker)
                    continue

                self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                worker.join()

            # swap the contents in one go; removing items from a list while
            # iterating over it skips the element after each removal
            workers[:] = still_running

        # check if workers are available for unclaimed jobs
        for job in jobs:
            queue_id = job.data["queue_id"]
            jobtype = job.data["jobtype"]

            if jobtype not in self.modules.workers:
                # only complain once per unknown job type
                if jobtype not in self.unknown_jobs:
                    self.log.error(f"Unknown job type: {jobtype}")
                    self.unknown_jobs.add(jobtype)
                continue

            worker_class = self.modules.workers[jobtype]
            queue_workers = self.worker_pool.setdefault(queue_id, [])

            # if a job is of a known type, and that job type has open
            # worker slots, start a new worker to run it
            if len(queue_workers) >= MAX_JOBS_PER_QUEUE:
                continue

            try:
                job.claim()
                worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                worker.start()
                log_level = self.log.levels["DEBUG"] if job.data["interval"] else self.log.levels["INFO"]
                self.log.log(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}", log_level)
                queue_workers.append(worker)
            except JobClaimedException:
                # it's fine
                pass

        time.sleep(1)

    def loop(self):
        """
        Main loop

        Constantly delegates work, until no longer looping, after which all
        workers are asked to stop their work. Once that has happened, the loop
        properly ends.
        """
        while self.looping:
            try:
                self.delegate()
            except KeyboardInterrupt:
                self.looping = False

        self.log.info("Telling all workers to stop doing whatever they're doing...")

        # request shutdown from all workers except the API
        # this allows us to use the API to figure out if a certain worker is
        # hanging during shutdown, for example
        for _queue_id, worker in self.iterate_active_workers():
            if worker.type == "api":
                continue

            if hasattr(worker, "request_interrupt"):
                worker.request_interrupt()
            else:
                worker.abort()

        # wait for all workers that we just asked to quit to finish
        self.log.info("Waiting for all workers to finish...")
        for _queue_id, worker in self.iterate_active_workers():
            if worker.type == "api":
                continue

            self.log.info(f"Waiting for worker of type {worker.type}...")
            worker.join()

        # shut down any remaining workers (i.e. the API)
        # workers joined above are still in the pool, so skip everything that
        # has already stopped and use the same fallback as the first pass for
        # workers without request_interrupt()
        for _queue_id, worker in self.iterate_active_workers():
            if not worker.is_alive():
                continue

            if hasattr(worker, "request_interrupt"):
                worker.request_interrupt()
            else:
                worker.abort()
            worker.join()

        # abort
        time.sleep(1)
        self.log.info("Bye!")

    def validate_datasources(self):
        """
        Validate and initialise data sources

        For every configured data source, logs an error when neither a
        `<datasource>-search` nor a `<datasource>-import` worker is loaded,
        and then calls the data source's `init` callable.
        """
        for datasource in self.modules.datasources:
            if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
                self.log.error(f"No search worker defined for data source {datasource} or its modules are missing. "
                               f"Datasets cannot be created for it.")

            self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)

    def abort(self, signal=None, stack=None):
        """
        Stop looping the delegator, clean up, and prepare for shutdown

        Registered as the SIGTERM handler when the manager runs as a daemon.

        NOTE(review): as a signal handler this presumably runs on the same
        thread as `loop()`, so `delegate()` does not run while this method
        waits for the cancel jobs to disappear - confirm that something else
        can complete those jobs.

        :param signal: Signal number, as passed by the signal machinery; unused
        :param stack: Stack frame, as passed by the signal machinery; unused
        """
        self.log.info("Received SIGTERM")

        # cancel all interruptible postgres queries
        # this needs to be done before we stop looping since after that no new
        # jobs will be claimed, and needs to be done here because the worker's
        # own database connection is busy executing the query that it should
        # cancel! so we can't use it to update the job and make it get claimed
        for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            # this will make all these jobs immediately claimable, i.e. queries
            # will get cancelled asap
            self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
            job.claim()
            job.release(delay=0, claim_after=0)

        # wait until all cancel jobs are completed
        while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            time.sleep(0.25)

        # now stop looping (i.e. accepting new jobs)
        self.looping = False

    def request_interrupt(self, interrupt_level, job):
        """
        Interrupt a specific job

        This method can be called via e.g. the API, to interrupt a specific
        job's worker. The worker for the given Job object is searched for and
        if it exists, its `request_interrupt()` method is called. Nothing
        happens if no worker is running the job.

        :param int interrupt_level: Retry later or cancel?
        :param Job job: Job object to cancel worker for
        """
        for _queue_id, worker in self.iterate_active_workers():
            if worker.job.data["id"] != job.data["id"]:
                continue

            # first cancel any interruptable postgres queries for this job's worker
            while True:
                active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                if not active_queries:
                    # all cancellation jobs have been run
                    break

                for cancel_job in active_queries:
                    if cancel_job.is_claimed:
                        continue

                    # this will cause the job be run asap
                    cancel_job.claim()
                    cancel_job.release(delay=0, claim_after=0)

                # give the cancel job a moment to run
                time.sleep(0.25)

            # now all queries are interrupted, formally request an abort
            self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
            worker.request_interrupt(interrupt_level)
            return

    def iterate_active_workers(self) -> Generator[tuple[str, BasicWorker]]:
        """
        Return all active workers

        Convenience function to avoid having to always use two nested for
        loops. Yields every worker that is currently in the pool.

        :return: Generator that yields tuples of (queue_id, worker)
        """
        for queue_id, workers in self.worker_pool.items():
            for worker in workers:
                yield queue_id, worker
Manages the job queue and worker pool
def __init__(self, queue, database, logger, modules, as_daemon=True):
    """
    Initialize manager

    Stores the given handles, initialises data sources, queues jobs for
    workers that ask for one and then runs the main loop. This call blocks
    until the loop has ended.

    :param queue: Job queue
    :param database: Database handler
    :param logger: Logger object
    :param modules: Modules cache via ModuleLoader()
    :param bool as_daemon: Whether the manager is being run as a daemon;
      if so, SIGTERM is routed to `abort()`
    """
    self.queue = queue
    self.db = database
    self.log = logger
    self.modules = modules

    # give every manager its own containers; the class-level defaults are
    # mutable objects that would otherwise be shared by all instances
    self.worker_pool = {}
    self.job_mapping = {}
    self.pool = []
    self.unknown_jobs = set()

    self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)

    if as_daemon:
        signal.signal(signal.SIGTERM, self.abort)

    # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
    # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
    self.validate_datasources()

    # queue jobs for workers that always need one
    for worker_name, worker in self.modules.workers.items():
        if not hasattr(worker, "ensure_job"):
            continue

        # ensure_job is a class method that returns a dict with job parameters if job should be added
        # pass config for some workers (e.g., web studies extensions)
        try:
            self.log.debug(f"Ensuring job exists for worker {worker_name}")
            job_params = worker.ensure_job(config=self.modules.config)
        except Exception as e:
            # best effort: one misbehaving worker must not keep the manager
            # from starting
            self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
            job_params = None

        if job_params:
            self.queue.add_job(worker_or_type=worker, **job_params)

    self.ident = threading.get_ident()
    self.log.info("4CAT Started")

    # flush module collector log buffer
    # the logger is not available when this initialises
    # but it is now!
    if self.modules.log_buffer:
        self.log.warning(self.modules.log_buffer)
        self.modules.log_buffer = ""

    # it's time
    self.loop()
Initialize manager
Parameters
- queue: Job queue
- database: Database handler
- logger: Logger object
- modules: Modules cache via ModuleLoader()
- bool as_daemon: Whether the manager is being run as a daemon
86 def delegate(self): 87 """ 88 Delegate work 89 90 Checks for open jobs, and then passes those to dedicated workers, if 91 slots are available for those workers. 92 """ 93 jobs = self.queue.get_all_jobs() 94 95 num_active = len(list(self.iterate_active_workers())) 96 self.log.debug2(f"Running {num_active} active workers") 97 98 # clean up workers that have finished processing 99 # not using iterate_active_workers() here because we're going to change 100 # the dictionary while iterating through it 101 for queue_id in self.worker_pool: 102 all_workers = self.worker_pool[queue_id] 103 for worker in all_workers: 104 if not worker.is_alive(): 105 self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}") 106 worker.join() 107 self.worker_pool[queue_id].remove(worker) 108 109 del all_workers 110 111 # check if workers are available for unclaimed jobs 112 for job in jobs: 113 queue_id = job.data["queue_id"] 114 jobtype = job.data["jobtype"] 115 116 if jobtype in self.modules.workers: 117 worker_class = self.modules.workers[jobtype] 118 if queue_id not in self.worker_pool: 119 self.worker_pool[queue_id] = [] 120 121 # if a job is of a known type, and that job type has open 122 # worker slots, start a new worker to run it 123 if len(self.worker_pool[queue_id]) < MAX_JOBS_PER_QUEUE: 124 try: 125 job.claim() 126 worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules) 127 worker.start() 128 log_level = self.log.levels["DEBUG"] if job.data["interval"] else self.log.levels["INFO"] 129 self.log.log(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}", log_level) 130 self.worker_pool[queue_id].append(worker) 131 except JobClaimedException: 132 # it's fine 133 pass 134 else: 135 if jobtype not in self.unknown_jobs: 136 self.log.error(f"Unknown job type: {jobtype}") 137 self.unknown_jobs.add(jobtype) 138 139 time.sleep(1)
Delegate work
Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.
141 def loop(self): 142 """ 143 Main loop 144 145 Constantly delegates work, until no longer looping, after which all 146 workers are asked to stop their work. Once that has happened, the loop 147 properly ends. 148 """ 149 while self.looping: 150 try: 151 self.delegate() 152 except KeyboardInterrupt: 153 self.looping = False 154 155 self.log.info("Telling all workers to stop doing whatever they're doing...") 156 157 # request shutdown from all workers except the API 158 # this allows us to use the API to figure out if a certain worker is 159 # hanging during shutdown, for example 160 for queue_id, worker in self.iterate_active_workers(): 161 if worker.type == "api": 162 continue 163 164 if hasattr(worker, "request_interrupt"): 165 worker.request_interrupt() 166 else: 167 worker.abort() 168 169 # wait for all workers that we just asked to quit to finish 170 self.log.info("Waiting for all workers to finish...") 171 for queue_id, worker in self.iterate_active_workers(): 172 if worker.type == "api": 173 continue 174 175 self.log.info(f"Waiting for worker of type {worker.type}...") 176 worker.join() 177 178 # shut down any remaining workers (i.e. the API) 179 for queue_id, worker in self.iterate_active_workers(): 180 worker.request_interrupt() 181 worker.join() 182 183 # abort 184 time.sleep(1) 185 self.log.info("Bye!")
Main loop
Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.
187 def validate_datasources(self): 188 """ 189 Validate data sources 190 191 Logs warnings if not all information is present for the configured data 192 sources. 193 """ 194 for datasource in self.modules.datasources: 195 if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers: 196 self.log.error(f"No search worker defined for data source {datasource} or its modules are missing. " 197 f"Datasets cannot be created for it.") 198 199 self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
Validate data sources
Logs an error for each configured data source that has neither a search nor an import worker, then calls each data source's init function.
201 def abort(self, signal=None, stack=None): 202 """ 203 Stop looping the delegator, clean up, and prepare for shutdown 204 """ 205 self.log.info("Received SIGTERM") 206 207 # cancel all interruptible postgres queries 208 # this needs to be done before we stop looping since after that no new 209 # jobs will be claimed, and needs to be done here because the worker's 210 # own database connection is busy executing the query that it should 211 # cancel! so we can't use it to update the job and make it get claimed 212 for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 213 # this will make all these jobs immediately claimable, i.e. queries 214 # will get cancelled asap 215 self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"]) 216 job.claim() 217 job.release(delay=0, claim_after=0) 218 219 # wait until all cancel jobs are completed 220 while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 221 time.sleep(0.25) 222 223 # now stop looping (i.e. accepting new jobs) 224 self.looping = False
Stop looping the delegator, clean up, and prepare for shutdown
226 def request_interrupt(self, interrupt_level, job): 227 """ 228 Interrupt a specific job 229 230 This method can be called via e.g. the API, to interrupt a specific 231 job's worker. The worker for the given Job object is searched for and 232 if it exists, its `request_interrupt()` method is called. 233 234 :param int interrupt_level: Retry later or cancel? 235 :param Job job: Job object to cancel worker for 236 """ 237 for queue_id, worker in self.iterate_active_workers(): 238 if worker.job.data["id"] == job.data["id"]: 239 # first cancel any interruptable postgres queries for this job's worker 240 while True: 241 active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False) 242 if not active_queries: 243 # all cancellation jobs have been run 244 break 245 246 for cancel_job in active_queries: 247 if cancel_job.is_claimed: 248 continue 249 250 # this will cause the job be run asap 251 cancel_job.claim() 252 cancel_job.release(delay=0, claim_after=0) 253 254 # give the cancel job a moment to run 255 time.sleep(0.25) 256 257 # now all queries are interrupted, formally request an abort 258 self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})") 259 worker.request_interrupt(interrupt_level) 260 return
Interrupt a specific job
This method can be called via e.g. the API, to interrupt a specific
job's worker. The worker for the given Job object is searched for and
if it exists, its request_interrupt() method is called.
Parameters
- int interrupt_level: Retry later or cancel?
- Job job: Job object to cancel worker for
262 def iterate_active_workers(self) -> Generator[tuple[str, BasicWorker]]: 263 """ 264 Return all active workers 265 266 Convenience function to avoid having to always use two nested for 267 loops. 268 269 :return: Generator that yields tuples of (queue_id, worker) 270 """ 271 for queue_id in self.worker_pool: 272 for worker in self.worker_pool[queue_id]: 273 yield queue_id, worker
Return all active workers
Convenience function to avoid having to always use two nested for loops.
Returns
Generator that yields tuples of (queue_id, worker)