backend.lib.manager
The heart of the app - manages jobs and workers
"""
The heart of the app - manages jobs and workers
"""
import signal
import time

from common.lib.module_loader import ModuleCollector
from common.lib.exceptions import JobClaimedException


class WorkerManager:
    """
    Manages the job queue and worker pool

    Jobs are read from the queue and handed to worker objects, which are
    tracked per job type in `worker_pool` until they are no longer alive.
    """
    queue = None
    db = None
    log = None
    modules = None

    # class-level defaults; __init__ rebinds the mutable ones per instance
    worker_pool = {}
    job_mapping = {}
    pool = []
    looping = True

    def __init__(self, queue, database, logger, as_daemon=True):
        """
        Initialize manager

        Registers signal handlers (when run as a daemon), initialises the
        data sources, queues jobs for workers that always need one and then
        enters the main loop. The call blocks until that loop ends.

        :param queue:  Job queue
        :param database:  Database handler
        :param logger:  Logger object
        :param bool as_daemon:  Whether the manager is being run as a daemon
        """
        self.queue = queue
        self.db = database
        self.log = logger
        self.modules = ModuleCollector(write_config=True)

        # give every manager its own containers; the class-level dict/list
        # defaults would otherwise be shared between all instances
        self.worker_pool = {}
        self.job_mapping = {}
        self.pool = []

        if as_daemon:
            signal.signal(signal.SIGTERM, self.abort)

            # signal handlers are called as handler(signum, frame), while
            # request_interrupt() expects (interrupt_level, job, ...) and
            # would treat the frame as a job. SIGINT therefore gets its own
            # handler, which ends the main loop like KeyboardInterrupt does.
            def stop_on_sigint(signum, frame):
                self.log.info("Received SIGINT")
                self.looping = False

            signal.signal(signal.SIGINT, stop_on_sigint)

        # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
        # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
        self.validate_datasources()

        # queue jobs for workers that always need one
        for worker_name, worker in self.modules.workers.items():
            if hasattr(worker, "ensure_job"):
                self.queue.add_job(jobtype=worker_name, **worker.ensure_job)

        self.log.info("4CAT Started")

        # flush module collector log buffer
        # the logger is not available when this initialises
        # but it is now!
        if self.modules.log_buffer:
            self.log.warning(self.modules.log_buffer)
            self.modules.log_buffer = ""

        # it's time
        self.loop()

    def delegate(self):
        """
        Delegate work

        Checks for open jobs, and then passes those to dedicated workers, if
        slots are available for those workers. Workers that are no longer
        alive are joined and dropped from the pool first. Sleeps for one
        second at the end, which paces the main loop.
        """
        jobs = self.queue.get_all_jobs()

        num_active = sum(len(workers) for workers in self.worker_pool.values())
        self.log.debug("Running workers: %i" % num_active)

        # clean up workers that have finished processing
        # iterate over a copy of each list: removing items from the list that
        # is being iterated makes the loop skip the worker after each removal
        for jobtype in self.worker_pool:
            for worker in list(self.worker_pool[jobtype]):
                if not worker.is_alive():
                    self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                    worker.join()
                    self.worker_pool[jobtype].remove(worker)

        # check if workers are available for unclaimed jobs
        for job in jobs:
            jobtype = job.data["jobtype"]

            if jobtype not in self.modules.workers:
                self.log.error("Unknown job type: %s" % jobtype)
                continue

            worker_class = self.modules.workers[jobtype]
            if jobtype not in self.worker_pool:
                self.worker_pool[jobtype] = []

            # if a job is of a known type, and that job type has open
            # worker slots, start a new worker to run it
            if len(self.worker_pool[jobtype]) < worker_class.max_workers:
                try:
                    job.claim()
                    worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                    worker.start()
                    self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
                    self.worker_pool[jobtype].append(worker)
                except JobClaimedException:
                    # the job was claimed in the meantime; it's fine
                    pass

        time.sleep(1)

    def loop(self):
        """
        Main loop

        Constantly delegates work, until no longer looping, after which all
        workers are asked to stop their work. Once that has happened, the loop
        properly ends. Workers of type "api" are interrupted and joined last.
        """
        while self.looping:
            try:
                self.delegate()
            except KeyboardInterrupt:
                self.looping = False

        self.log.info("Telling all workers to stop doing whatever they're doing...")
        # request shutdown from all workers except the API
        # this allows us to use the API to figure out if a certain worker is
        # hanging during shutdown, for example
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue

            for worker in self.worker_pool[jobtype]:
                if hasattr(worker, "request_interrupt"):
                    worker.request_interrupt()
                else:
                    worker.abort()

        # wait for all workers that we just asked to quit to finish
        self.log.info("Waiting for all workers to finish...")
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue
            for worker in self.worker_pool[jobtype]:
                self.log.info("Waiting for worker %s..." % jobtype)
                worker.join()

        # shut down API last
        for worker in self.worker_pool.get("api", []):
            worker.request_interrupt()
            worker.join()

        # abort
        time.sleep(1)
        self.log.info("Bye!")

    def validate_datasources(self):
        """
        Validate data sources

        Logs an error for each configured data source that has neither a
        "<datasource>-search" nor a "<datasource>-import" worker, and calls
        the "init" callable of every data source.
        """
        for datasource in self.modules.datasources:
            if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
                self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)

            self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)

    def abort(self, signal=None, stack=None):
        """
        Stop looping the delegator, clean up, and prepare for shutdown

        Registered as the SIGTERM handler in `__init__`, which is why it
        takes the two signal handler arguments; neither is used.

        :param signal:  Signal number passed by the signal machinery (unused)
        :param stack:  Stack frame passed by the signal machinery (unused)
        """
        self.log.info("Received SIGTERM")

        # cancel all interruptible postgres queries
        # this needs to be done before we stop looping since after that no new
        # jobs will be claimed, and needs to be done here because the worker's
        # own database connection is busy executing the query that it should
        # cancel! so we can't use it to update the job and make it get claimed
        for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            if job.is_claimed:
                # already claimed, so claim() would fail; request_interrupt()
                # skips such jobs in the same way
                continue

            # this will make all these jobs immediately claimable, i.e. queries
            # will get cancelled asap
            self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
            try:
                job.claim()
            except JobClaimedException:
                # claimed between the check and the claim; nothing to release
                continue
            job.release(delay=0, claim_after=0)

        # wait until all cancel jobs are completed
        # NOTE(review): as a signal handler this runs in the main thread, which
        # presumably is also the thread calling delegate(); confirm that the
        # released cancel jobs can still be picked up while this loop waits
        while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            time.sleep(0.25)

        # now stop looping (i.e. accepting new jobs)
        self.looping = False

    def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
        """
        Interrupt a job

        This method can be called via e.g. the API, to interrupt a specific
        job's worker. The worker can be targeted either with a Job object or
        with a combination of job type and remote ID, since those uniquely
        identify a job. Nothing happens if no matching worker is found.

        :param int interrupt_level:  Retry later or cancel?
        :param Job job:  Job object to cancel worker for
        :param str remote_id:  Remote ID for worker job to cancel
        :param str jobtype:  Job type for worker job to cancel
        """
        # find worker for given job
        if job:
            jobtype = job.data["jobtype"]

        if jobtype not in self.worker_pool:
            # no jobs of this type currently known
            return

        for worker in self.worker_pool[jobtype]:
            if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
                # first cancel any interruptable queries for this job's worker
                while True:
                    active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                    if not active_queries:
                        # all cancellation jobs have been run
                        break

                    for cancel_job in active_queries:
                        if cancel_job.is_claimed:
                            continue

                        # this will make the job be run asap
                        try:
                            cancel_job.claim()
                        except JobClaimedException:
                            # claimed by someone else after the is_claimed
                            # check above; leave it to that claimant
                            continue
                        cancel_job.release(delay=0, claim_after=0)

                    # give the cancel job a moment to run
                    time.sleep(0.25)

                # now all queries are interrupted, formally request an abort
                self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
                worker.request_interrupt(interrupt_level)
                return
class WorkerManager:
    """
    Manages the job queue and worker pool

    Jobs are read from the queue and handed to worker objects, which are
    tracked per job type in `worker_pool` until they are no longer alive.
    """
    queue = None
    db = None
    log = None
    modules = None

    # class-level defaults; __init__ rebinds the mutable ones per instance
    worker_pool = {}
    job_mapping = {}
    pool = []
    looping = True

    def __init__(self, queue, database, logger, as_daemon=True):
        """
        Initialize manager

        Registers signal handlers (when run as a daemon), initialises the
        data sources, queues jobs for workers that always need one and then
        enters the main loop. The call blocks until that loop ends.

        :param queue:  Job queue
        :param database:  Database handler
        :param logger:  Logger object
        :param bool as_daemon:  Whether the manager is being run as a daemon
        """
        self.queue = queue
        self.db = database
        self.log = logger
        self.modules = ModuleCollector(write_config=True)

        # give every manager its own containers; the class-level dict/list
        # defaults would otherwise be shared between all instances
        self.worker_pool = {}
        self.job_mapping = {}
        self.pool = []

        if as_daemon:
            signal.signal(signal.SIGTERM, self.abort)

            # signal handlers are called as handler(signum, frame), while
            # request_interrupt() expects (interrupt_level, job, ...) and
            # would treat the frame as a job. SIGINT therefore gets its own
            # handler, which ends the main loop like KeyboardInterrupt does.
            def stop_on_sigint(signum, frame):
                self.log.info("Received SIGINT")
                self.looping = False

            signal.signal(signal.SIGINT, stop_on_sigint)

        # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
        # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
        self.validate_datasources()

        # queue jobs for workers that always need one
        for worker_name, worker in self.modules.workers.items():
            if hasattr(worker, "ensure_job"):
                self.queue.add_job(jobtype=worker_name, **worker.ensure_job)

        self.log.info("4CAT Started")

        # flush module collector log buffer
        # the logger is not available when this initialises
        # but it is now!
        if self.modules.log_buffer:
            self.log.warning(self.modules.log_buffer)
            self.modules.log_buffer = ""

        # it's time
        self.loop()

    def delegate(self):
        """
        Delegate work

        Checks for open jobs, and then passes those to dedicated workers, if
        slots are available for those workers. Workers that are no longer
        alive are joined and dropped from the pool first. Sleeps for one
        second at the end, which paces the main loop.
        """
        jobs = self.queue.get_all_jobs()

        num_active = sum(len(workers) for workers in self.worker_pool.values())
        self.log.debug("Running workers: %i" % num_active)

        # clean up workers that have finished processing
        # iterate over a copy of each list: removing items from the list that
        # is being iterated makes the loop skip the worker after each removal
        for jobtype in self.worker_pool:
            for worker in list(self.worker_pool[jobtype]):
                if not worker.is_alive():
                    self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                    worker.join()
                    self.worker_pool[jobtype].remove(worker)

        # check if workers are available for unclaimed jobs
        for job in jobs:
            jobtype = job.data["jobtype"]

            if jobtype not in self.modules.workers:
                self.log.error("Unknown job type: %s" % jobtype)
                continue

            worker_class = self.modules.workers[jobtype]
            if jobtype not in self.worker_pool:
                self.worker_pool[jobtype] = []

            # if a job is of a known type, and that job type has open
            # worker slots, start a new worker to run it
            if len(self.worker_pool[jobtype]) < worker_class.max_workers:
                try:
                    job.claim()
                    worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                    worker.start()
                    self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
                    self.worker_pool[jobtype].append(worker)
                except JobClaimedException:
                    # the job was claimed in the meantime; it's fine
                    pass

        time.sleep(1)

    def loop(self):
        """
        Main loop

        Constantly delegates work, until no longer looping, after which all
        workers are asked to stop their work. Once that has happened, the loop
        properly ends. Workers of type "api" are interrupted and joined last.
        """
        while self.looping:
            try:
                self.delegate()
            except KeyboardInterrupt:
                self.looping = False

        self.log.info("Telling all workers to stop doing whatever they're doing...")
        # request shutdown from all workers except the API
        # this allows us to use the API to figure out if a certain worker is
        # hanging during shutdown, for example
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue

            for worker in self.worker_pool[jobtype]:
                if hasattr(worker, "request_interrupt"):
                    worker.request_interrupt()
                else:
                    worker.abort()

        # wait for all workers that we just asked to quit to finish
        self.log.info("Waiting for all workers to finish...")
        for jobtype in self.worker_pool:
            if jobtype == "api":
                continue
            for worker in self.worker_pool[jobtype]:
                self.log.info("Waiting for worker %s..." % jobtype)
                worker.join()

        # shut down API last
        for worker in self.worker_pool.get("api", []):
            worker.request_interrupt()
            worker.join()

        # abort
        time.sleep(1)
        self.log.info("Bye!")

    def validate_datasources(self):
        """
        Validate data sources

        Logs an error for each configured data source that has neither a
        "<datasource>-search" nor a "<datasource>-import" worker, and calls
        the "init" callable of every data source.
        """
        for datasource in self.modules.datasources:
            if datasource + "-search" not in self.modules.workers and datasource + "-import" not in self.modules.workers:
                self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % datasource)

            self.modules.datasources[datasource]["init"](self.db, self.log, self.queue, datasource)

    def abort(self, signal=None, stack=None):
        """
        Stop looping the delegator, clean up, and prepare for shutdown

        Registered as the SIGTERM handler in `__init__`, which is why it
        takes the two signal handler arguments; neither is used.

        :param signal:  Signal number passed by the signal machinery (unused)
        :param stack:  Stack frame passed by the signal machinery (unused)
        """
        self.log.info("Received SIGTERM")

        # cancel all interruptible postgres queries
        # this needs to be done before we stop looping since after that no new
        # jobs will be claimed, and needs to be done here because the worker's
        # own database connection is busy executing the query that it should
        # cancel! so we can't use it to update the job and make it get claimed
        for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            if job.is_claimed:
                # already claimed, so claim() would fail; request_interrupt()
                # skips such jobs in the same way
                continue

            # this will make all these jobs immediately claimable, i.e. queries
            # will get cancelled asap
            self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
            try:
                job.claim()
            except JobClaimedException:
                # claimed between the check and the claim; nothing to release
                continue
            job.release(delay=0, claim_after=0)

        # wait until all cancel jobs are completed
        # NOTE(review): as a signal handler this runs in the main thread, which
        # presumably is also the thread calling delegate(); confirm that the
        # released cancel jobs can still be picked up while this loop waits
        while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
            time.sleep(0.25)

        # now stop looping (i.e. accepting new jobs)
        self.looping = False

    def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
        """
        Interrupt a job

        This method can be called via e.g. the API, to interrupt a specific
        job's worker. The worker can be targeted either with a Job object or
        with a combination of job type and remote ID, since those uniquely
        identify a job. Nothing happens if no matching worker is found.

        :param int interrupt_level:  Retry later or cancel?
        :param Job job:  Job object to cancel worker for
        :param str remote_id:  Remote ID for worker job to cancel
        :param str jobtype:  Job type for worker job to cancel
        """
        # find worker for given job
        if job:
            jobtype = job.data["jobtype"]

        if jobtype not in self.worker_pool:
            # no jobs of this type currently known
            return

        for worker in self.worker_pool[jobtype]:
            if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
                # first cancel any interruptable queries for this job's worker
                while True:
                    active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                    if not active_queries:
                        # all cancellation jobs have been run
                        break

                    for cancel_job in active_queries:
                        if cancel_job.is_claimed:
                            continue

                        # this will make the job be run asap
                        try:
                            cancel_job.claim()
                        except JobClaimedException:
                            # claimed by someone else after the is_claimed
                            # check above; leave it to that claimant
                            continue
                        cancel_job.release(delay=0, claim_after=0)

                    # give the cancel job a moment to run
                    time.sleep(0.25)

                # now all queries are interrupted, formally request an abort
                self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
                worker.request_interrupt(interrupt_level)
                return
Manages the job queue and worker pool
def __init__(self, queue, database, logger, as_daemon=True):
    """
    Initialize manager

    Registers signal handlers (when run as a daemon), initialises the data
    sources, queues jobs for workers that always need one and then enters
    the main loop. The call blocks until that loop ends.

    :param queue:  Job queue
    :param database:  Database handler
    :param logger:  Logger object
    :param bool as_daemon:  Whether the manager is being run as a daemon
    """
    self.queue = queue
    self.db = database
    self.log = logger
    self.modules = ModuleCollector(write_config=True)

    # give every manager its own containers; the class-level dict/list
    # defaults would otherwise be shared between all instances
    self.worker_pool = {}
    self.job_mapping = {}
    self.pool = []

    if as_daemon:
        signal.signal(signal.SIGTERM, self.abort)

        # signal handlers are called as handler(signum, frame), while
        # request_interrupt() expects (interrupt_level, job, ...) and would
        # treat the frame as a job. SIGINT therefore gets its own handler,
        # which ends the main loop like KeyboardInterrupt does.
        def stop_on_sigint(signum, frame):
            self.log.info("Received SIGINT")
            self.looping = False

        signal.signal(signal.SIGINT, stop_on_sigint)

    # datasources are initialized here; the init_datasource functions found in their respective __init__.py files
    # are called which, in the case of scrapers, also adds the scrape jobs to the queue.
    self.validate_datasources()

    # queue jobs for workers that always need one
    for worker_name, worker in self.modules.workers.items():
        if hasattr(worker, "ensure_job"):
            self.queue.add_job(jobtype=worker_name, **worker.ensure_job)

    self.log.info("4CAT Started")

    # flush module collector log buffer
    # the logger is not available when this initialises
    # but it is now!
    if self.modules.log_buffer:
        self.log.warning(self.modules.log_buffer)
        self.modules.log_buffer = ""

    # it's time
    self.loop()
Initialize manager
Parameters
- queue: Job queue
- database: Database handler
- logger: Logger object
- bool as_daemon: Whether the manager is being run as a daemon
def delegate(self):
    """
    Delegate work

    Checks for open jobs, and then passes those to dedicated workers, if
    slots are available for those workers. Workers that are no longer alive
    are joined and dropped from the pool first. Sleeps for one second at the
    end, which paces the main loop.
    """
    jobs = self.queue.get_all_jobs()

    num_active = sum(len(workers) for workers in self.worker_pool.values())
    self.log.debug("Running workers: %i" % num_active)

    # clean up workers that have finished processing
    # iterate over a copy of each list: removing items from the list that is
    # being iterated makes the loop skip the worker after each removal
    for jobtype in self.worker_pool:
        for worker in list(self.worker_pool[jobtype]):
            if not worker.is_alive():
                self.log.debug(f"Terminating worker {worker.job.data['jobtype']}/{worker.job.data['remote_id']}")
                worker.join()
                self.worker_pool[jobtype].remove(worker)

    # check if workers are available for unclaimed jobs
    for job in jobs:
        jobtype = job.data["jobtype"]

        if jobtype not in self.modules.workers:
            self.log.error("Unknown job type: %s" % jobtype)
            continue

        worker_class = self.modules.workers[jobtype]
        if jobtype not in self.worker_pool:
            self.worker_pool[jobtype] = []

        # if a job is of a known type, and that job type has open
        # worker slots, start a new worker to run it
        if len(self.worker_pool[jobtype]) < worker_class.max_workers:
            try:
                job.claim()
                worker = worker_class(logger=self.log, manager=self, job=job, modules=self.modules)
                worker.start()
                self.log.debug(f"Starting new worker of for job {job.data['jobtype']}/{job.data['remote_id']}")
                self.worker_pool[jobtype].append(worker)
            except JobClaimedException:
                # the job was claimed in the meantime; it's fine
                pass

    time.sleep(1)
Delegate work
Checks for open jobs, and then passes those to dedicated workers, if slots are available for those workers.
def loop(self):
    """
    Main loop

    Keeps calling `delegate()` for as long as `looping` is true; a
    KeyboardInterrupt raised while delegating ends the loop as well. After
    that, shutdown happens in three steps: every worker except those of type
    "api" is asked to stop, those workers are joined, and only then are the
    "api" workers interrupted and joined.
    """
    while self.looping:
        try:
            self.delegate()
        except KeyboardInterrupt:
            self.looping = False

    self.log.info("Telling all workers to stop doing whatever they're doing...")
    # the API is shut down last, so that it can still be used to find out
    # whether a particular worker hangs while shutting down
    for pool_type, members in self.worker_pool.items():
        if pool_type != "api":
            for member in members:
                # prefer an interrupt request; use abort() for workers without one
                stop = member.request_interrupt if hasattr(member, "request_interrupt") else member.abort
                stop()

    # block until every worker that was asked to quit has finished
    self.log.info("Waiting for all workers to finish...")
    for pool_type, members in self.worker_pool.items():
        if pool_type != "api":
            for member in members:
                self.log.info("Waiting for worker %s..." % pool_type)
                member.join()

    # finally, the API workers
    for api_worker in self.worker_pool.get("api", []):
        api_worker.request_interrupt()
        api_worker.join()

    time.sleep(1)
    self.log.info("Bye!")
Main loop
Constantly delegates work, until no longer looping, after which all workers are asked to stop their work. Once that has happened, the loop properly ends.
def validate_datasources(self):
    """
    Validate data sources

    Logs an error for each configured data source that has neither a
    "<datasource>-search" nor a "<datasource>-import" worker, and calls the
    "init" callable of every data source, whether or not such a worker exists.
    """
    for source_id in self.modules.datasources:
        has_worker = (source_id + "-search" in self.modules.workers) or (source_id + "-import" in self.modules.workers)
        if not has_worker:
            self.log.error("No search worker defined for datasource %s or its modules are missing. Search queries will not be executed." % source_id)

        # every data source is initialised, regardless of the check above
        initialise = self.modules.datasources[source_id]["init"]
        initialise(self.db, self.log, self.queue, source_id)
Validate data sources
Logs warnings if not all information is present for the configured data sources.
def abort(self, signal=None, stack=None):
    """
    Stop looping the delegator, clean up, and prepare for shutdown

    Registered as the SIGTERM handler in `__init__`, which is why it takes
    the two signal handler arguments; neither is used.

    :param signal:  Signal number passed by the signal machinery (unused)
    :param stack:  Stack frame passed by the signal machinery (unused)
    """
    self.log.info("Received SIGTERM")

    # cancel all interruptible postgres queries
    # this needs to be done before we stop looping since after that no new
    # jobs will be claimed, and needs to be done here because the worker's
    # own database connection is busy executing the query that it should
    # cancel! so we can't use it to update the job and make it get claimed
    for job in self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
        if job.is_claimed:
            # already claimed, so claim() would fail; request_interrupt()
            # skips such jobs in the same way
            continue

        # this will make all these jobs immediately claimable, i.e. queries
        # will get cancelled asap
        self.log.debug("Cancelling interruptable Postgres queries for connection %s..." % job.data["remote_id"])
        try:
            job.claim()
        except JobClaimedException:
            # claimed between the check and the claim; nothing to release
            continue
        job.release(delay=0, claim_after=0)

    # wait until all cancel jobs are completed
    # NOTE(review): as a signal handler this runs in the main thread, which
    # presumably is also the thread calling delegate(); confirm that the
    # released cancel jobs can still be picked up while this loop waits
    while self.queue.get_all_jobs("cancel-pg-query", restrict_claimable=False):
        time.sleep(0.25)

    # now stop looping (i.e. accepting new jobs)
    self.looping = False
Stop looping the delegator, clean up, and prepare for shutdown
def request_interrupt(self, interrupt_level, job=None, remote_id=None, jobtype=None):
    """
    Interrupt a job

    This method can be called via e.g. the API, to interrupt a specific
    job's worker. The worker can be targeted either with a Job object or
    with a combination of job type and remote ID, since those uniquely
    identify a job. Nothing happens if no matching worker is found.

    :param int interrupt_level:  Retry later or cancel?
    :param Job job:  Job object to cancel worker for
    :param str remote_id:  Remote ID for worker job to cancel
    :param str jobtype:  Job type for worker job to cancel
    """
    # find worker for given job
    if job:
        jobtype = job.data["jobtype"]

    if jobtype not in self.worker_pool:
        # no jobs of this type currently known
        return

    for worker in self.worker_pool[jobtype]:
        if (job and worker.job.data["id"] == job.data["id"]) or (worker.job.data["jobtype"] == jobtype and worker.job.data["remote_id"] == remote_id):
            # first cancel any interruptable queries for this job's worker
            while True:
                active_queries = self.queue.get_all_jobs("cancel-pg-query", remote_id=worker.db.appname, restrict_claimable=False)
                if not active_queries:
                    # all cancellation jobs have been run
                    break

                for cancel_job in active_queries:
                    if cancel_job.is_claimed:
                        continue

                    # this will make the job be run asap
                    try:
                        cancel_job.claim()
                    except JobClaimedException:
                        # claimed by someone else after the is_claimed check
                        # above; leave it to that claimant
                        continue
                    cancel_job.release(delay=0, claim_after=0)

                # give the cancel job a moment to run
                time.sleep(0.25)

            # now all queries are interrupted, formally request an abort
            self.log.info(f"Requesting interrupt of job {worker.job.data['id']} ({worker.job.data['jobtype']}/{worker.job.data['remote_id']})")
            worker.request_interrupt(interrupt_level)
            return
Interrupt a job
This method can be called via e.g. the API, to interrupt a specific job's worker. The worker can be targeted either with a Job object or with a combination of job type and remote ID, since those uniquely identify a job.
Parameters
- int interrupt_level: Retry later or cancel?
- Job job: Job object to cancel worker for
- str remote_id: Remote ID for worker job to cancel
- str jobtype: Job type for worker job to cancel