backend.lib.manager
The heart of the app - manages jobs and workers
"""
The heart of the app - manages jobs and workers
"""
import signal
import time

from backend.lib.proxied_requests import DelegatedRequestHandler
from common.lib.exceptions import JobClaimedException


class WorkerManager:
    """
    Manages the job queue and worker pool

    Continuously polls the job queue and, for each unclaimed job of a known
    type with an open worker slot, claims the job and spawns a dedicated
    worker for it. Also handles orderly shutdown of all running workers.
    """
    # class-level defaults; fresh per-instance values are bound in __init__
    # so no mutable state is shared between WorkerManager instances
    queue = None  # job queue handler
    db = None  # database handler
    log = None  # logger object
    modules = None  # module cache (workers, datasources, config)
    proxy_delegator = None  # shared handler for proxied HTTP requests

    worker_pool = {}  # jobtype -> list of currently running workers
    job_mapping = {}
    pool = []
    looping = True  # set to False to stop the main loop
    unknown_jobs = set()  # job types already reported as unknown (log once)

    def __init__(self, queue, database, logger, modules, as_daemon=True):
        """
        Initialize manager

        Sets up signal handlers (when running as a daemon), initialises all
        configured data sources, queues jobs for workers that must always
        have one, and finally enters the main delegation loop. This
        constructor does not return until the main loop ends.

        :param queue:  Job queue
        :param database:  Database handler
        :param logger:  Logger object
        :param modules:  Modules cache via ModuleLoader()
        :param bool as_daemon:  Whether the manager is being run as a daemon
        """
        self.queue = queue
        self.db = database
        self.log = logger
        self.modules = modules
        self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)

        # fix: bind fresh containers per instance; the class-level
        # attributes above are mutable and would otherwise be shared
        # between all WorkerManager instances
        self.worker_pool = {}
        self.job_mapping = {}
        self.pool = []
        self.unknown_jobs = set()
        self.looping = True

        if as_daemon:
            signal.signal(signal.SIGTERM, self.abort)
            # NOTE(review): signal handlers are invoked as handler(signum,
            # frame), so request_interrupt would receive the signal number as
            # interrupt_level and the frame object as `job` — confirm this
            # mapping is intended (`job.data` would fail on a frame)
            signal.signal(signal.SIGINT, self.request_interrupt)

        # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
        # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
        self.validate_datasources()

        # queue jobs for workers that always need one
        for worker_name, worker in self.modules.workers.items():
            if hasattr(worker, "ensure_job"):
                # ensure_job is a class method that returns a dict with job parameters if job should be added
                # pass config for some workers (e.g., web studies extensions)
                try:
                    job_params = worker.ensure_job(config=self.modules.config)
                except Exception as e:
                    # a misbehaving worker should not prevent startup
                    self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
                    job_params = None

                if job_params:
                    self.queue.add_job(jobtype=worker_name, **job_params)

        self.log.info("4CAT Started")

        # flush module collector log buffer
        # the logger is not available when this initialises
        # but it is now!
        if self.modules.log_buffer:
            self.log.warning(self.modules.log_buffer)
            self.modules.log_buffer = ""

        # it's time
        self.loop()

    def delegate(self):
        """
        Delegate work

        Checks for open jobs, and then passes those to dedicated workers, if
        slots are available for those workers. Finished workers are joined
        and removed from the pool first.
        """
        jobs = self.queue.get_all_jobs()

        num_active = sum(len(workers) for workers in self.worker_pool.values())
        self.log.debug("Running workers: %i" % num_active)

        # clean up workers that have finished processing
        for jobtype in self.worker_pool:
            # fix: iterate over a snapshot of the list; removing from the
            # list while iterating it skips the element after each removed
            # worker, leaving dead workers in the pool
            for worker in list(self.worker_pool[jobtype]):
                if not worker.is_alive():
                    self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                    worker.join()
                    self.worker_pool[jobtype].remove(worker)

        # check if workers are available for unclaimed jobs
        for job in jobs:
            jobtype = job.data["jobtype"]

            if jobtype in self.modules.workers:
                worker_class = self.modules.workers[jobtype]
                if jobtype not in self.worker_pool:
                    self.worker_pool[jobtype] = []

                # if a job is of a known type, and that job type has open
                # worker slots, start a new worker to run it
                if len(self.worker_pool[jobtype]) < worker_class.max_workers:
                    try:
                        job.claim()
                        worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                        worker.start()
                        self.log.info(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}")
                        self.worker_pool[jobtype].append(worker)
                    except JobClaimedException:
                        # it's fine - someone else claimed it first
                        pass
            else:
                if jobtype not in self.unknown_jobs:
                    # report each unknown job type only once to avoid spam
                    self.log.error("Unknown job type: %s" % jobtype)
                    self.unknown_jobs.add(jobtype)

        time.sleep(1)

    def loop(self):
        """
        Main loop

        Constantly delegates work, until no longer looping, after which all
        workers are asked to stop their work. Once that has happened, the loop
        properly ends.
        """
        while self.looping:
            try:
                self.delegate()
            except KeyboardInterrupt:
                self.looping = False

        self.log.info("Telling all workers to stop doing whatever they're doing...")

        # request shutdown from all workers except the API
        # this allows us to use the API to figure out if a certain worker is
        # hanging during shutdown, for example
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue

            for worker in self.worker_pool[jobtype]:
                if hasattr(worker, "request_interrupt"):
                    worker.request_interrupt()
                else:
                    worker.abort()

        # wait for all workers that we just asked to quit to finish
        self.log.info("Waiting for all workers to finish...")
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue
            for worker in self.worker_pool[jobtype]:
                self.log.info("Waiting for worker %s..." % jobtype)
                worker.join()

        # shut down API last
        for worker in self.worker_pool.get("api", []):
            worker.request_interrupt()
            worker.join()

        # abort
        time.sleep(1)
        self.log.info("Bye!")

    def validate_datasources(self):
        """
        Validate data sources

        Logs an error if not all information is present for the configured
        data sources, then runs each data source's init routine.
        """
        for datasource in self.modules.datasources:
            if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
                self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)

            self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)

    def abort(self, signal=None, stack=None):
        """
        Stop looping the delegator, clean up, and prepare for shutdown

        Registered as the SIGTERM handler; `signal` and `stack` are supplied
        by the signal machinery and are unused.
        """
        self.log.info("Received SIGTERM")

        # cancel all interruptible postgres queries
        # this needs to be done before we stop looping since after that no new
        # jobs will be claimed, and needs to be done here because the worker's
        # own database connection is busy executing the query that it should
        # cancel! so we can't use it to update the job and make it get claimed
        for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            # this will make all these jobs immediately claimable, i.e. queries
            # will get cancelled asap
            self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
            job.claim()
            job.release(delay=0, claim_after=0)

        # wait until all cancel jobs are completed
        while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            time.sleep(0.25)

        # now stop looping (i.e. accepting new jobs)
        self.looping = False

    def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
        """
        Interrupt a job

        This method can be called via e.g. the API, to interrupt a specific
        job's worker. The worker can be targeted either with a Job object or
        with a combination of job type and remote ID, since those uniquely
        identify a job.

        :param int interrupt_level:  Retry later or cancel?
        :param Job job:  Job object to cancel worker for
        :param str remote_id:  Remote ID for worker job to cancel
        :param str jobtype:  Job type for worker job to cancel
        """
        # find worker for given job
        if job:
            jobtype = job.data["jobtype"]

        if jobtype not in self.worker_pool:
            # no jobs of this type currently known
            return

        for worker in self.worker_pool[jobtype]:
            if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
                # first cancel any interruptable queries for this job's worker
                while True:
                    active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                    if not active_queries:
                        # all cancellation jobs have been run
                        break

                    for cancel_job in active_queries:
                        if cancel_job.is_claimed:
                            continue

                        # this will make the job be run asap
                        cancel_job.claim()
                        cancel_job.release(delay=0, claim_after=0)

                    # give the cancel job a moment to run
                    time.sleep(0.25)

                # now all queries are interrupted, formally request an abort
                self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
                worker.request_interrupt(interrupt_level)
                return
class WorkerManager:
    """
    Manages the job queue and worker pool

    Polls the job queue and spawns a dedicated worker for each unclaimed job
    of a known type that has an open worker slot; also coordinates orderly
    shutdown of all running workers.
    """
    # NOTE(review): these container attributes are class-level and mutable,
    # so they are shared by all instances; fine for a singleton manager but
    # a pitfall if a second instance is ever created
    queue = None  # job queue handler
    db = None  # database handler
    log = None  # logger object
    modules = None  # module cache (workers, datasources, config)
    proxy_delegator = None  # shared handler for proxied HTTP requests

    worker_pool = {}  # jobtype -> list of currently running workers
    job_mapping = {}
    pool = []
    looping = True  # set to False to stop the main loop
    unknown_jobs = set()  # job types already reported as unknown (log once)

    def __init__(self, queue, database, logger, modules, as_daemon=True):
        """
        Initialize manager

        Sets up signal handlers (when running as a daemon), initialises the
        configured data sources, queues jobs for workers that must always
        have one, then enters the main delegation loop; this constructor
        does not return until the loop ends.

        :param queue:  Job queue
        :param database:  Database handler
        :param logger:  Logger object
        :param modules:  Modules cache via ModuleLoader()
        :param bool as_daemon:  Whether the manager is being run as a daemon
        """
        self.queue = queue
        self.db = database
        self.log = logger
        self.modules = modules
        self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config)

        if as_daemon:
            signal.signal(signal.SIGTERM, self.abort)
            # NOTE(review): signal handlers are invoked as handler(signum,
            # frame); request_interrupt expects (interrupt_level, job=...) —
            # confirm this mapping is intended
            signal.signal(signal.SIGINT, self.request_interrupt)

        # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
        # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
        self.validate_datasources()

        # queue jobs for workers that always need one
        for worker_name, worker in self.modules.workers.items():
            if hasattr(worker, "ensure_job"):
                # ensure_job is a class method that returns a dict with job parameters if job should be added
                # pass config for some workers (e.g., web studies extensions)
                try:
                    job_params = worker.ensure_job(config=self.modules.config)
                except Exception as e:
                    # a misbehaving worker should not prevent startup
                    self.log.error(f"Error while ensuring job for worker {worker_name}: {e}")
                    job_params = None

                if job_params:
                    self.queue.add_job(jobtype=worker_name, **job_params)

        self.log.info("4CAT Started")

        # flush module collector log buffer
        # the logger is not available when this initialises
        # but it is now!
        if self.modules.log_buffer:
            self.log.warning(self.modules.log_buffer)
            self.modules.log_buffer = ""

        # it's time
        self.loop()

    def delegate(self):
        """
        Delegate work

        Checks for open jobs, and then passes those to dedicated workers, if
        slots are available for those workers.
        """
        jobs = self.queue.get_all_jobs()

        num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool])
        self.log.debug("Running workers: %i" % num_active)

        # clean up workers that have finished processing
        for jobtype in self.worker_pool:
            # NOTE(review): all_workers aliases the pool list, and remove()
            # below mutates it mid-iteration — this skips the element after
            # each removed worker, so some finished workers linger until a
            # later delegate() pass
            all_workers = self.worker_pool[jobtype]
            for worker in all_workers:
                if not worker.is_alive():
                    self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                    worker.join()
                    self.worker_pool[jobtype].remove(worker)

            del all_workers

        # check if workers are available for unclaimed jobs
        for job in jobs:
            jobtype = job.data["jobtype"]

            if jobtype in self.modules.workers:
                worker_class = self.modules.workers[jobtype]
                if jobtype not in self.worker_pool:
                    self.worker_pool[jobtype] = []

                # if a job is of a known type, and that job type has open
                # worker slots, start a new worker to run it
                if len(self.worker_pool[jobtype]) < worker_class.max_workers:
                    try:
                        job.claim()
                        worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                        worker.start()
                        self.log.info(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}")
                        self.worker_pool[jobtype].append(worker)
                    except JobClaimedException:
                        # it's fine - someone else claimed it first
                        pass
            else:
                if jobtype not in self.unknown_jobs:
                    # report each unknown job type only once to avoid spam
                    self.log.error("Unknown job type: %s" % jobtype)
                    self.unknown_jobs.add(jobtype)

        time.sleep(1)

    def loop(self):
        """
        Main loop

        Constantly delegates work, until no longer looping, after which all
        workers are asked to stop their work. Once that has happened, the loop
        properly ends.
        """
        while self.looping:
            try:
                self.delegate()
            except KeyboardInterrupt:
                self.looping = False

        self.log.info("Telling all workers to stop doing whatever they're doing...")

        # request shutdown from all workers except the API
        # this allows us to use the API to figure out if a certain worker is
        # hanging during shutdown, for example
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue

            for worker in self.worker_pool[jobtype]:
                if hasattr(worker, "request_interrupt"):
                    worker.request_interrupt()
                else:
                    worker.abort()

        # wait for all workers that we just asked to quit to finish
        self.log.info("Waiting for all workers to finish...")
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue
            for worker in self.worker_pool[jobtype]:
                self.log.info("Waiting for worker %s..." % jobtype)
                worker.join()

        # shut down API last
        for worker in self.worker_pool.get("api", []):
            worker.request_interrupt()
            worker.join()

        # abort
        time.sleep(1)
        self.log.info("Bye!")

    def validate_datasources(self):
        """
        Validate data sources

        Logs an error if not all information is present for the configured
        data sources, then runs each data source's init routine.
        """

        for datasource in self.modules.datasources:
            if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
                self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)

            self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)

    def abort(self, signal=None, stack=None):
        """
        Stop looping the delegator, clean up, and prepare for shutdown

        Registered as the SIGTERM handler; `signal` and `stack` are supplied
        by the signal machinery and are unused.
        """
        self.log.info("Received SIGTERM")

        # cancel all interruptible postgres queries
        # this needs to be done before we stop looping since after that no new
        # jobs will be claimed, and needs to be done here because the worker's
        # own database connection is busy executing the query that it should
        # cancel! so we can't use it to update the job and make it get claimed
        for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            # this will make all these jobs immediately claimable, i.e. queries
            # will get cancelled asap
            self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
            job.claim()
            job.release(delay=0, claim_after=0)

        # wait until all cancel jobs are completed
        while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            time.sleep(0.25)

        # now stop looping (i.e. accepting new jobs)
        self.looping = False

    def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
        """
        Interrupt a job

        This method can be called via e.g. the API, to interrupt a specific
        job's worker. The worker can be targeted either with a Job object or
        with a combination of job type and remote ID, since those uniquely
        identify a job.

        :param int interrupt_level:  Retry later or cancel?
        :param Job job:  Job object to cancel worker for
        :param str remote_id:  Remote ID for worker job to cancel
        :param str jobtype:  Job type for worker job to cancel
        """

        # find worker for given job
        if job:
            jobtype = job.data["jobtype"]

        if jobtype not in self.worker_pool:
            # no jobs of this type currently known
            return

        for worker in self.worker_pool[jobtype]:
            if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
                # first cancel any interruptable queries for this job's worker
                while True:
                    active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                    if not active_queries:
                        # all cancellation jobs have been run
                        break

                    for cancel_job in active_queries:
                        if cancel_job.is_claimed:
                            continue

                        # this will make the job be run asap
                        cancel_job.claim()
                        cancel_job.release(delay=0, claim_after=0)

                    # give the cancel job a moment to run
                    time.sleep(0.25)

                # now all queries are interrupted, formally request an abort
                self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
                worker.request_interrupt(interrupt_level)
                return
Manages the job queue and worker pool
28 def __init__(self, queue, database, logger, modules, as_daemon=True): 29 """ 30 Initialize manager 31 32 :param queue: Job queue 33 :param database: Database handler 34 :param logger: Logger object 35 :param modules: Modules cache via ModuleLoader() 36 :param bool as_daemon: Whether the manager is being run as a daemon 37 """ 38 self.queue = queue 39 self.db = database 40 self.log = logger 41 self.modules = modules 42 self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config) 43 44 if as_daemon: 45 signal.signal(signal.SIGTERM, self.abort) 46 signal.signal(signal.SIGINT, self.request_interrupt) 47 48 # datasources are initialized here; the init_datasource functions found in their respective __init__.py files 49 # are called which, in the case of scrapers, also adds the scrape jobs to the queue. 50 self.validate_datasources() 51 52 # queue jobs for workers that always need one 53 for worker_name, worker in self.modules.workers.items(): 54 if hasattr(worker, "ensure_job"): 55 # ensure_job is a class method that returns a dict with job parameters if job should be added 56 # pass config for some workers (e.g., web studies extensions) 57 try: 58 job_params = worker.ensure_job(config=self.modules.config) 59 except Exception as e: 60 self.log.error(f"Error while ensuring job for worker {worker_name}: {e}") 61 job_params = None 62 63 if job_params: 64 self.queue.add_job(jobtype=worker_name, **job_params) 65 66 67 self.log.info("4CAT Started") 68 69 # flush module collector log buffer 70 # the logger is not available when this initialises 71 # but it is now! 72 if self.modules.log_buffer: 73 self.log.warning(self.modules.log_buffer) 74 self.modules.log_buffer = "" 75 76 # it's time 77 self.loop()
Initialize manager
Parameters
- queue: Job queue
- database: Database handler
- logger: Logger object
- modules: Modules cache via ModuleLoader()
- bool as_daemon: Whether the manager is being run as a daemon
79 def delegate(self): 80 """ 81 Delegate work 82 83 Checks for open jobs, and then passes those to dedicated workers, if 84 slots are available for those workers. 85 """ 86 jobs = self.queue.get_all_jobs() 87 88 num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool]) 89 self.log.debug("Running workers: %i" % num_active) 90 91 # clean up workers that have finished processing 92 for jobtype in self.worker_pool: 93 all_workers = self.worker_pool[jobtype] 94 for worker in all_workers: 95 if not worker.is_alive(): 96 self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}") 97 worker.join() 98 self.worker_pool[jobtype].remove(worker) 99 100 del all_workers 101 102 # check if workers are available for unclaimed jobs 103 for job in jobs: 104 jobtype = job.data["jobtype"] 105 106 if jobtype in self.modules.workers: 107 worker_class = self.modules.workers[jobtype] 108 if jobtype not in self.worker_pool: 109 self.worker_pool[jobtype] = [] 110 111 # if a job is of a known type, and that job type has open 112 # worker slots, start a new worker to run it 113 if len(self.worker_pool[jobtype]) < worker_class.max_workers: 114 try: 115 job.claim() 116 worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules) 117 worker.start() 118 self.log.info(f"Starting new worker for job {job.data['jobtype']}/{job.data['remote_id']}") 119 self.worker_pool[jobtype].append(worker) 120 except JobClaimedException: 121 # it's fine 122 pass 123 else: 124 if jobtype not in self.unknown_jobs: 125 self.log.error("Unknown job type: %s" % jobtype) 126 self.unknown_jobs.add(jobtype) 127 128 time.sleep(1)
Delegate work
Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.
130 def loop(self): 131 """ 132 Main loop 133 134 Constantly delegates work, until no longer looping, after which all 135 workers are asked to stop their work. Once that has happened, the loop 136 properly ends. 137 """ 138 while self.looping: 139 try: 140 self.delegate() 141 except KeyboardInterrupt: 142 self.looping = False 143 144 self.log.info("Telling all workers to stop doing whatever they're doing...") 145 146 # request shutdown from all workers except the API 147 # this allows us to use the API to figure out if a certain worker is 148 # hanging during shutdown, for example 149 for jobtype in self.worker_pool: 150 if jobtype == "api": 151 continue 152 153 for worker in self.worker_pool[jobtype]: 154 if hasattr(worker, "request_interrupt"): 155 worker.request_interrupt() 156 else: 157 worker.abort() 158 159 # wait for all workers that we just asked to quit to finish 160 self.log.info("Waiting for all workers to finish...") 161 for jobtype in self.worker_pool: 162 if jobtype == "api": 163 continue 164 for worker in self.worker_pool[jobtype]: 165 self.log.info("Waiting for worker %s..." % jobtype) 166 worker.join() 167 168 # shut down API last 169 for worker in self.worker_pool.get("api", []): 170 worker.request_interrupt() 171 worker.join() 172 173 # abort 174 time.sleep(1) 175 self.log.info("Bye!")
Main loop
Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.
177 def validate_datasources(self): 178 """ 179 Validate data sources 180 181 Logs warnings if not all information is precent for the configured data 182 sources. 183 """ 184 185 for datasource in self.modules.datasources: 186 if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers: 187 self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource) 188 189 self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
Validate data sources
Logs warnings if not all information is present for the configured data sources.
191 def abort(self, signal=None, stack=None): 192 """ 193 Stop looping the delegator, clean up, and prepare for shutdown 194 """ 195 self.log.info("Received SIGTERM") 196 197 # cancel all interruptible postgres queries 198 # this needs to be done before we stop looping since after that no new 199 # jobs will be claimed, and needs to be done here because the worker's 200 # own database connection is busy executing the query that it should 201 # cancel! so we can't use it to update the job and make it get claimed 202 for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 203 # this will make all these jobs immediately claimable, i.e. queries 204 # will get cancelled asap 205 self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"]) 206 job.claim() 207 job.release(delay=0, claim_after=0) 208 209 # wait until all cancel jobs are completed 210 while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 211 time.sleep(0.25) 212 213 # now stop looping (i.e. accepting new jobs) 214 self.looping = False
Stop looping the delegator, clean up, and prepare for shutdown
216 def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None): 217 """ 218 Interrupt a job 219 220 This method can be called via e.g. the API, to interrupt a specific 221 job's worker. The worker can be targeted either with a Job object or 222 with a combination of job type and remote ID, since those uniquely 223 identify a job. 224 225 :param int interrupt_level: Retry later or cancel? 226 :param Job job: Job object to cancel worker for 227 :param str remote_id: Remote ID for worker job to cancel 228 :param str jobtype: Job type for worker job to cancel 229 """ 230 231 # find worker for given job 232 if job: 233 jobtype = job.data["jobtype"] 234 235 if jobtype not in self.worker_pool: 236 # no jobs of this type currently known 237 return 238 239 for worker in self.worker_pool[jobtype]: 240 if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id): 241 # first cancel any interruptable queries for this job's worker 242 while True: 243 active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False) 244 if not active_queries: 245 # all cancellation jobs have been run 246 break 247 248 for cancel_job in active_queries: 249 if cancel_job.is_claimed: 250 continue 251 252 # this will make the job be run asap 253 cancel_job.claim() 254 cancel_job.release(delay=0, claim_after=0) 255 256 # give the cancel job a moment to run 257 time.sleep(0.25) 258 259 # now all queries are interrupted, formally request an abort 260 self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})") 261 worker.request_interrupt(interrupt_level) 262 return
Interrupt a job
This method can be called via e.g. the API, to interrupt a specific job's worker. The worker can be targeted either with a Job object or with a combination of job type and remote ID, since those uniquely identify a job.
Parameters
- int interrupt_level: Retry later or cancel?
- Job job: Job object to cancel worker for
- str remote_id: Remote ID for worker job to cancel
- str jobtype: Job type for worker job to cancel