backend.lib.manager
The heart of the app - manages jobs and workers
1""" 2The heart of the app - manages jobs and workers 3""" 4import signal 5import time 6 7from backend.lib.proxied_requests import DelegatedRequestHandler 8from common.lib.exceptions import JobClaimedException 9 10 11class WorkerManager: 12 """ 13 Manages the job queue and worker pool 14 """ 15 queue = None 16 db = None 17 log = None 18 modules = None 19 proxy_delegator = None 20 21 worker_pool = {} 22 job_mapping = {} 23 pool = [] 24 looping = True 25 unknown_jobs = set() 26 27 def __init__(self, queue, database, logger, modules, as_daemon=True): 28 """ 29 Initialize manager 30 31 :param queue: Job queue 32 :param database: Database handler 33 :param logger: Logger object 34 :param modules: Modules cache via ModuleLoader() 35 :param bool as_daemon: Whether the manager is being run as a daemon 36 """ 37 self.queue = queue 38 self.db = database 39 self.log = logger 40 self.modules = modules 41 self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config) 42 43 if as_daemon: 44 signal.signal(signal.SIGTERM, self.abort) 45 signal.signal(signal.SIGINT, self.request_interrupt) 46 47 # datasources are initialized here; the init_datasource functions found in their respective __init__.py files 48 # are called which, in the case of scrapers, also adds the scrape jobs to the queue. 49 self.validate_datasources() 50 51 # queue jobs for workers that always need one 52 for worker_name, worker in self.modules.workers.items(): 53 if hasattr(worker, "ensure_job"): 54 self.queue.add_job(jobtype=worker_name, **worker.ensure_job) 55 56 self.log.info("4CAT Started") 57 58 # flush module collector log buffer 59 # the logger is not available when this initialises 60 # but it is now! 61 if self.modules.log_buffer: 62 self.log.warning(self.modules.log_buffer) 63 self.modules.log_buffer = "" 64 65 # it's time 66 self.loop() 67 68 def delegate(self): 69 """ 70 Delegate work 71 72 Checks for open jobs, and then passes those to dedicated workers, if 73 slots are available for those workers. 74 """ 75 jobs = self.queue.get_all_jobs() 76 77 num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool]) 78 self.log.debug("Running workers: %i" % num_active) 79 80 # clean up workers that have finished processing 81 for jobtype in self.worker_pool: 82 all_workers = self.worker_pool[jobtype] 83 for worker in all_workers: 84 if not worker.is_alive(): 85 self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}") 86 worker.join() 87 self.worker_pool[jobtype].remove(worker) 88 89 del all_workers 90 91 # check if workers are available for unclaimed jobs 92 for job in jobs: 93 jobtype = job.data["jobtype"] 94 95 if jobtype in self.modules.workers: 96 worker_class = self.modules.workers[jobtype] 97 if jobtype not in self.worker_pool: 98 self.worker_pool[jobtype] = [] 99 100 # if a job is of a known type, and that job type has open 101 # worker slots, start a new worker to run it 102 if len(self.worker_pool[jobtype]) < worker_class.max_workers: 103 try: 104 job.claim() 105 worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules) 106 worker.start() 107 self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}") 108 self.worker_pool[jobtype].append(worker) 109 except JobClaimedException: 110 # it's fine 111 pass 112 else: 113 if jobtype not in self.unknown_jobs: 114 self.log.error("Unknown job type: %s" % jobtype) 115 self.unknown_jobs.add(jobtype) 116 117 time.sleep(1) 118 119 def loop(self): 120 """ 121 Main loop 122 123 Constantly delegates work, until no longer looping, after which all 124 workers are asked to stop their work. Once that has happened, the loop 125 properly ends. 126 """ 127 while self.looping: 128 try: 129 self.delegate() 130 except KeyboardInterrupt: 131 self.looping = False 132 133 self.log.info("Telling all workers to stop doing whatever they're doing...") 134 135 # request shutdown from all workers except the API 136 # this allows us to use the API to figure out if a certain worker is 137 # hanging during shutdown, for example 138 for jobtype in self.worker_pool: 139 if jobtype == "api": 140 continue 141 142 for worker in self.worker_pool[jobtype]: 143 if hasattr(worker, "request_interrupt"): 144 worker.request_interrupt() 145 else: 146 worker.abort() 147 148 # wait for all workers that we just asked to quit to finish 149 self.log.info("Waiting for all workers to finish...") 150 for jobtype in self.worker_pool: 151 if jobtype == "api": 152 continue 153 for worker in self.worker_pool[jobtype]: 154 self.log.info("Waiting for worker %s..." % jobtype) 155 worker.join() 156 157 # shut down API last 158 for worker in self.worker_pool.get("api", []): 159 worker.request_interrupt() 160 worker.join() 161 162 # abort 163 time.sleep(1) 164 self.log.info("Bye!") 165 166 def validate_datasources(self): 167 """ 168 Validate data sources 169 170 Logs warnings if not all information is precent for the configured data 171 sources. 172 """ 173 174 for datasource in self.modules.datasources: 175 if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers: 176 self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource) 177 178 self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config) 179 180 def abort(self, signal=None, stack=None): 181 """ 182 Stop looping the delegator, clean up, and prepare for shutdown 183 """ 184 self.log.info("Received SIGTERM") 185 186 # cancel all interruptible postgres queries 187 # this needs to be done before we stop looping since after that no new 188 # jobs will be claimed, and needs to be done here because the worker's 189 # own database connection is busy executing the query that it should 190 # cancel! so we can't use it to update the job and make it get claimed 191 for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 192 # this will make all these jobs immediately claimable, i.e. queries 193 # will get cancelled asap 194 self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"]) 195 job.claim() 196 job.release(delay=0, claim_after=0) 197 198 # wait until all cancel jobs are completed 199 while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 200 time.sleep(0.25) 201 202 # now stop looping (i.e. accepting new jobs) 203 self.looping = False 204 205 def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None): 206 """ 207 Interrupt a job 208 209 This method can be called via e.g. the API, to interrupt a specific 210 job's worker. The worker can be targeted either with a Job object or 211 with a combination of job type and remote ID, since those uniquely 212 identify a job. 213 214 :param int interrupt_level: Retry later or cancel? 215 :param Job job: Job object to cancel worker for 216 :param str remote_id: Remote ID for worker job to cancel 217 :param str jobtype: Job type for worker job to cancel 218 """ 219 220 # find worker for given job 221 if job: 222 jobtype = job.data["jobtype"] 223 224 if jobtype not in self.worker_pool: 225 # no jobs of this type currently known 226 return 227 228 for worker in self.worker_pool[jobtype]: 229 if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id): 230 # first cancel any interruptable queries for this job's worker 231 while True: 232 active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False) 233 if not active_queries: 234 # all cancellation jobs have been run 235 break 236 237 for cancel_job in active_queries: 238 if cancel_job.is_claimed: 239 continue 240 241 # this will make the job be run asap 242 cancel_job.claim() 243 cancel_job.release(delay=0, claim_after=0) 244 245 # give the cancel job a moment to run 246 time.sleep(0.25) 247 248 # now all queries are interrupted, formally request an abort 249 self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})") 250 worker.request_interrupt(interrupt_level) 251 return
12class WorkerManager: 13 """ 14 Manages the job queue and worker pool 15 """ 16 queue = None 17 db = None 18 log = None 19 modules = None 20 proxy_delegator = None 21 22 worker_pool = {} 23 job_mapping = {} 24 pool = [] 25 looping = True 26 unknown_jobs = set() 27 28 def __init__(self, queue, database, logger, modules, as_daemon=True): 29 """ 30 Initialize manager 31 32 :param queue: Job queue 33 :param database: Database handler 34 :param logger: Logger object 35 :param modules: Modules cache via ModuleLoader() 36 :param bool as_daemon: Whether the manager is being run as a daemon 37 """ 38 self.queue = queue 39 self.db = database 40 self.log = logger 41 self.modules = modules 42 self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config) 43 44 if as_daemon: 45 signal.signal(signal.SIGTERM, self.abort) 46 signal.signal(signal.SIGINT, self.request_interrupt) 47 48 # datasources are initialized here; the init_datasource functions found in their respective __init__.py files 49 # are called which, in the case of scrapers, also adds the scrape jobs to the queue. 50 self.validate_datasources() 51 52 # queue jobs for workers that always need one 53 for worker_name, worker in self.modules.workers.items(): 54 if hasattr(worker, "ensure_job"): 55 self.queue.add_job(jobtype=worker_name, **worker.ensure_job) 56 57 self.log.info("4CAT Started") 58 59 # flush module collector log buffer 60 # the logger is not available when this initialises 61 # but it is now! 62 if self.modules.log_buffer: 63 self.log.warning(self.modules.log_buffer) 64 self.modules.log_buffer = "" 65 66 # it's time 67 self.loop() 68 69 def delegate(self): 70 """ 71 Delegate work 72 73 Checks for open jobs, and then passes those to dedicated workers, if 74 slots are available for those workers. 75 """ 76 jobs = self.queue.get_all_jobs() 77 78 num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool]) 79 self.log.debug("Running workers: %i" % num_active) 80 81 # clean up workers that have finished processing 82 for jobtype in self.worker_pool: 83 all_workers = self.worker_pool[jobtype] 84 for worker in all_workers: 85 if not worker.is_alive(): 86 self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}") 87 worker.join() 88 self.worker_pool[jobtype].remove(worker) 89 90 del all_workers 91 92 # check if workers are available for unclaimed jobs 93 for job in jobs: 94 jobtype = job.data["jobtype"] 95 96 if jobtype in self.modules.workers: 97 worker_class = self.modules.workers[jobtype] 98 if jobtype not in self.worker_pool: 99 self.worker_pool[jobtype] = [] 100 101 # if a job is of a known type, and that job type has open 102 # worker slots, start a new worker to run it 103 if len(self.worker_pool[jobtype]) < worker_class.max_workers: 104 try: 105 job.claim() 106 worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules) 107 worker.start() 108 self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}") 109 self.worker_pool[jobtype].append(worker) 110 except JobClaimedException: 111 # it's fine 112 pass 113 else: 114 if jobtype not in self.unknown_jobs: 115 self.log.error("Unknown job type: %s" % jobtype) 116 self.unknown_jobs.add(jobtype) 117 118 time.sleep(1) 119 120 def loop(self): 121 """ 122 Main loop 123 124 Constantly delegates work, until no longer looping, after which all 125 workers are asked to stop their work. Once that has happened, the loop 126 properly ends. 127 """ 128 while self.looping: 129 try: 130 self.delegate() 131 except KeyboardInterrupt: 132 self.looping = False 133 134 self.log.info("Telling all workers to stop doing whatever they're doing...") 135 136 # request shutdown from all workers except the API 137 # this allows us to use the API to figure out if a certain worker is 138 # hanging during shutdown, for example 139 for jobtype in self.worker_pool: 140 if jobtype == "api": 141 continue 142 143 for worker in self.worker_pool[jobtype]: 144 if hasattr(worker, "request_interrupt"): 145 worker.request_interrupt() 146 else: 147 worker.abort() 148 149 # wait for all workers that we just asked to quit to finish 150 self.log.info("Waiting for all workers to finish...") 151 for jobtype in self.worker_pool: 152 if jobtype == "api": 153 continue 154 for worker in self.worker_pool[jobtype]: 155 self.log.info("Waiting for worker %s..." % jobtype) 156 worker.join() 157 158 # shut down API last 159 for worker in self.worker_pool.get("api", []): 160 worker.request_interrupt() 161 worker.join() 162 163 # abort 164 time.sleep(1) 165 self.log.info("Bye!") 166 167 def validate_datasources(self): 168 """ 169 Validate data sources 170 171 Logs warnings if not all information is precent for the configured data 172 sources. 173 """ 174 175 for datasource in self.modules.datasources: 176 if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers: 177 self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource) 178 179 self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config) 180 181 def abort(self, signal=None, stack=None): 182 """ 183 Stop looping the delegator, clean up, and prepare for shutdown 184 """ 185 self.log.info("Received SIGTERM") 186 187 # cancel all interruptible postgres queries 188 # this needs to be done before we stop looping since after that no new 189 # jobs will be claimed, and needs to be done here because the worker's 190 # own database connection is busy executing the query that it should 191 # cancel! so we can't use it to update the job and make it get claimed 192 for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 193 # this will make all these jobs immediately claimable, i.e. queries 194 # will get cancelled asap 195 self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"]) 196 job.claim() 197 job.release(delay=0, claim_after=0) 198 199 # wait until all cancel jobs are completed 200 while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 201 time.sleep(0.25) 202 203 # now stop looping (i.e. accepting new jobs) 204 self.looping = False 205 206 def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None): 207 """ 208 Interrupt a job 209 210 This method can be called via e.g. the API, to interrupt a specific 211 job's worker. The worker can be targeted either with a Job object or 212 with a combination of job type and remote ID, since those uniquely 213 identify a job. 214 215 :param int interrupt_level: Retry later or cancel? 216 :param Job job: Job object to cancel worker for 217 :param str remote_id: Remote ID for worker job to cancel 218 :param str jobtype: Job type for worker job to cancel 219 """ 220 221 # find worker for given job 222 if job: 223 jobtype = job.data["jobtype"] 224 225 if jobtype not in self.worker_pool: 226 # no jobs of this type currently known 227 return 228 229 for worker in self.worker_pool[jobtype]: 230 if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id): 231 # first cancel any interruptable queries for this job's worker 232 while True: 233 active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False) 234 if not active_queries: 235 # all cancellation jobs have been run 236 break 237 238 for cancel_job in active_queries: 239 if cancel_job.is_claimed: 240 continue 241 242 # this will make the job be run asap 243 cancel_job.claim() 244 cancel_job.release(delay=0, claim_after=0) 245 246 # give the cancel job a moment to run 247 time.sleep(0.25) 248 249 # now all queries are interrupted, formally request an abort 250 self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})") 251 worker.request_interrupt(interrupt_level) 252 return
Manages the job queue and worker pool
28 def __init__(self, queue, database, logger, modules, as_daemon=True): 29 """ 30 Initialize manager 31 32 :param queue: Job queue 33 :param database: Database handler 34 :param logger: Logger object 35 :param modules: Modules cache via ModuleLoader() 36 :param bool as_daemon: Whether the manager is being run as a daemon 37 """ 38 self.queue = queue 39 self.db = database 40 self.log = logger 41 self.modules = modules 42 self.proxy_delegator = DelegatedRequestHandler(self.log, self.modules.config) 43 44 if as_daemon: 45 signal.signal(signal.SIGTERM, self.abort) 46 signal.signal(signal.SIGINT, self.request_interrupt) 47 48 # datasources are initialized here; the init_datasource functions found in their respective __init__.py files 49 # are called which, in the case of scrapers, also adds the scrape jobs to the queue. 50 self.validate_datasources() 51 52 # queue jobs for workers that always need one 53 for worker_name, worker in self.modules.workers.items(): 54 if hasattr(worker, "ensure_job"): 55 self.queue.add_job(jobtype=worker_name, **worker.ensure_job) 56 57 self.log.info("4CAT Started") 58 59 # flush module collector log buffer 60 # the logger is not available when this initialises 61 # but it is now! 62 if self.modules.log_buffer: 63 self.log.warning(self.modules.log_buffer) 64 self.modules.log_buffer = "" 65 66 # it's time 67 self.loop()
Initialize manager
Parameters
- queue: Job queue
- database: Database handler
- logger: Logger object
- modules: Modules cache via ModuleLoader()
- bool as_daemon: Whether the manager is being run as a daemon
69 def delegate(self): 70 """ 71 Delegate work 72 73 Checks for open jobs, and then passes those to dedicated workers, if 74 slots are available for those workers. 75 """ 76 jobs = self.queue.get_all_jobs() 77 78 num_active = sum([len(self.worker_pool[jobtype]) for jobtype in self.worker_pool]) 79 self.log.debug("Running workers: %i" % num_active) 80 81 # clean up workers that have finished processing 82 for jobtype in self.worker_pool: 83 all_workers = self.worker_pool[jobtype] 84 for worker in all_workers: 85 if not worker.is_alive(): 86 self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}") 87 worker.join() 88 self.worker_pool[jobtype].remove(worker) 89 90 del all_workers 91 92 # check if workers are available for unclaimed jobs 93 for job in jobs: 94 jobtype = job.data["jobtype"] 95 96 if jobtype in self.modules.workers: 97 worker_class = self.modules.workers[jobtype] 98 if jobtype not in self.worker_pool: 99 self.worker_pool[jobtype] = [] 100 101 # if a job is of a known type, and that job type has open 102 # worker slots, start a new worker to run it 103 if len(self.worker_pool[jobtype]) < worker_class.max_workers: 104 try: 105 job.claim() 106 worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules) 107 worker.start() 108 self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}") 109 self.worker_pool[jobtype].append(worker) 110 except JobClaimedException: 111 # it's fine 112 pass 113 else: 114 if jobtype not in self.unknown_jobs: 115 self.log.error("Unknown job type: %s" % jobtype) 116 self.unknown_jobs.add(jobtype) 117 118 time.sleep(1)
Delegate work
Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.
120 def loop(self): 121 """ 122 Main loop 123 124 Constantly delegates work, until no longer looping, after which all 125 workers are asked to stop their work. Once that has happened, the loop 126 properly ends. 127 """ 128 while self.looping: 129 try: 130 self.delegate() 131 except KeyboardInterrupt: 132 self.looping = False 133 134 self.log.info("Telling all workers to stop doing whatever they're doing...") 135 136 # request shutdown from all workers except the API 137 # this allows us to use the API to figure out if a certain worker is 138 # hanging during shutdown, for example 139 for jobtype in self.worker_pool: 140 if jobtype == "api": 141 continue 142 143 for worker in self.worker_pool[jobtype]: 144 if hasattr(worker, "request_interrupt"): 145 worker.request_interrupt() 146 else: 147 worker.abort() 148 149 # wait for all workers that we just asked to quit to finish 150 self.log.info("Waiting for all workers to finish...") 151 for jobtype in self.worker_pool: 152 if jobtype == "api": 153 continue 154 for worker in self.worker_pool[jobtype]: 155 self.log.info("Waiting for worker %s..." % jobtype) 156 worker.join() 157 158 # shut down API last 159 for worker in self.worker_pool.get("api", []): 160 worker.request_interrupt() 161 worker.join() 162 163 # abort 164 time.sleep(1) 165 self.log.info("Bye!")
Main loop
Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.
167 def validate_datasources(self): 168 """ 169 Validate data sources 170 171 Logs warnings if not all information is precent for the configured data 172 sources. 173 """ 174 175 for datasource in self.modules.datasources: 176 if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers: 177 self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource) 178 179 self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource, self.modules.config)
Validate data sources
Logs warnings if not all information is precent for the configured data sources.
181 def abort(self, signal=None, stack=None): 182 """ 183 Stop looping the delegator, clean up, and prepare for shutdown 184 """ 185 self.log.info("Received SIGTERM") 186 187 # cancel all interruptible postgres queries 188 # this needs to be done before we stop looping since after that no new 189 # jobs will be claimed, and needs to be done here because the worker's 190 # own database connection is busy executing the query that it should 191 # cancel! so we can't use it to update the job and make it get claimed 192 for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 193 # this will make all these jobs immediately claimable, i.e. queries 194 # will get cancelled asap 195 self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"]) 196 job.claim() 197 job.release(delay=0, claim_after=0) 198 199 # wait until all cancel jobs are completed 200 while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False): 201 time.sleep(0.25) 202 203 # now stop looping (i.e. accepting new jobs) 204 self.looping = False
Stop looping the delegator, clean up, and prepare for shutdown
206 def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None): 207 """ 208 Interrupt a job 209 210 This method can be called via e.g. the API, to interrupt a specific 211 job's worker. The worker can be targeted either with a Job object or 212 with a combination of job type and remote ID, since those uniquely 213 identify a job. 214 215 :param int interrupt_level: Retry later or cancel? 216 :param Job job: Job object to cancel worker for 217 :param str remote_id: Remote ID for worker job to cancel 218 :param str jobtype: Job type for worker job to cancel 219 """ 220 221 # find worker for given job 222 if job: 223 jobtype = job.data["jobtype"] 224 225 if jobtype not in self.worker_pool: 226 # no jobs of this type currently known 227 return 228 229 for worker in self.worker_pool[jobtype]: 230 if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id): 231 # first cancel any interruptable queries for this job's worker 232 while True: 233 active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False) 234 if not active_queries: 235 # all cancellation jobs have been run 236 break 237 238 for cancel_job in active_queries: 239 if cancel_job.is_claimed: 240 continue 241 242 # this will make the job be run asap 243 cancel_job.claim() 244 cancel_job.release(delay=0, claim_after=0) 245 246 # give the cancel job a moment to run 247 time.sleep(0.25) 248 249 # now all queries are interrupted, formally request an abort 250 self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})") 251 worker.request_interrupt(interrupt_level) 252 return
Interrupt a job
This method can be called via e.g. the API, to interrupt a specific job's worker. The worker can be targeted either with a Job object or with a combination of job type and remote ID, since those uniquely identify a job.
Parameters
- int interrupt_level: Retry later or cancel?
- Job job: Job object to cancel worker for
- str remote_id: Remote ID for worker job to cancel
- str jobtype: Job type for worker job to cancel