backend.lib.worker
Worker class that all workers should implement
1""" 2Worker class that all workers should implement 3""" 4import traceback 5import threading 6import time 7import abc 8 9from common.lib.queue import JobQueue 10from common.lib.database import Database 11from common.lib.exceptions import WorkerInterruptedException, ProcessorException 12from common.config_manager import ConfigWrapper 13 14class BasicWorker(threading.Thread, metaclass=abc.ABCMeta): 15 """ 16 Abstract Worker class 17 18 This runs as a separate thread in which a worker method is executed. The 19 work method can do whatever the worker needs to do - that part is to be 20 implemented by a child class. This class provides scaffolding that makes 21 sure crashes are caught properly and the relevant data is available to the 22 worker code. 23 """ 24 #: Worker type - should match Job ID used when queuing jobs 25 type = "misc" 26 27 #: Amount of workers of this type that can run in parallel. Be careful with 28 #: this, because values higher than 1 will mean that e.g. API rate limits 29 #: are easily violated. 
30 max_workers = 1 31 32 #: Flag value to indicate worker interruption type - not interrupted 33 INTERRUPT_NONE = False 34 35 #: Flag value to indicate worker interruption type - interrupted, but can 36 #: be retried 37 INTERRUPT_RETRY = 1 38 39 #: Flag value to indicate worker interruption type - interrupted, but 40 #: should be cancelled 41 INTERRUPT_CANCEL = 2 42 43 #: Job queue that can be used to create or manipulate jobs 44 queue = None 45 46 #: Job this worker is being run for 47 job = None 48 49 #: Local configuration (used in processors) 50 config = None 51 52 #: Logger object 53 log = None 54 55 #: WorkerManager that manages this worker 56 manager = None 57 58 #: Interrupt status, one of the `INTERRUPT_` class constants 59 interrupted = False 60 61 #: Module index 62 modules = None 63 64 #: Unix timestamp at which this worker was started 65 init_time = 0 66 67 def __init__(self, logger, job, queue=None, manager=None, modules=None): 68 """ 69 Worker init 70 71 Set up object attributes, e.g. the worker queue and manager, and 72 initialize a new database connection and connected job queue. We cannot 73 share database connections between workers because they are not 74 thread-safe. 
75 76 :param Logger logger: Logging interface 77 :param Job job: Job this worker is being run on 78 :param JobQueue queue: Job queue 79 :param WorkerManager manager: Scheduler instance that started this worker 80 :param modules: Module catalog 81 """ 82 super().__init__() 83 self.name = self.type 84 self.log = logger 85 self.manager = manager 86 self.job = job 87 self.init_time = int(time.time()) 88 self.queue = queue 89 90 # ModuleCollector cannot be easily imported into a worker because it itself 91 # imports all workers, so you get a recursive import that Python (rightly) blocks 92 # so for workers, modules data is passed as a constructor argument 93 self.modules = modules 94 95 def run(self): 96 """ 97 Run the worker 98 99 This calls the `work()` method, quite simply, but adds some 100 scaffolding to take care of any exceptions that occur during the 101 execution of the worker. The exception is then logged and the worker 102 is gracefully ended, but the job is *not* released to ensure that the 103 job is not run again immediately (which would probably instantly crash 104 in the exact same way). 105 106 There is also some set-up to ensure that thread-unsafe objects such as 107 the config reader are only initialised once in a threaded context, i.e. 108 once this method is run. 109 110 You can configure the `WARN_SLACK_URL` configuration variable to make 111 reports of worker crashers be sent to a Slack channel, which is a good 112 way to monitor a running 4CAT instance! 
113 """ 114 try: 115 database_appname = "%s-%s" % (self.type, self.job.data["id"]) 116 self.config = ConfigWrapper(self.modules.config) 117 self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT) 118 self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue 119 self.work() 120 except WorkerInterruptedException: 121 self.log.info("Worker %s interrupted - cancelling." % self.type) 122 123 # interrupted - retry later or cancel job altogether? 124 if self.interrupted == self.INTERRUPT_RETRY: 125 self.job.release(delay=10) 126 elif self.interrupted == self.INTERRUPT_CANCEL: 127 self.job.finish() 128 129 self.abort() 130 except ProcessorException as e: 131 self.log.error(str(e), frame=e.frame) 132 except Exception as e: 133 stack = traceback.extract_tb(e.__traceback__) 134 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack] 135 location = "->".join(frames) 136 self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack) 137 138 # Clean up after work successfully completed or terminates 139 self.clean_up() 140 141 # explicitly close database connection as soon as it's possible 142 self.db.close() 143 144 def clean_up(self): 145 """ 146 Clean up after a processor runs successfully or results in error. 147 Workers should override this method to implement any procedures 148 to run to clean up a worker; by default this does nothing. 149 """ 150 pass 151 152 def abort(self): 153 """ 154 Called when the application shuts down 155 156 Can be used to stop loops, for looping workers. Workers should override 157 this method to implement any procedures to run to clean up a worker 158 when it is interrupted; by default this does nothing. 
159 """ 160 pass 161 162 def request_interrupt(self, level=1): 163 """ 164 Set the 'abort requested' flag 165 166 Child workers should quit at their earliest convenience when this is 167 set. This can be done simply by checking the value of 168 `self.interrupted`. 169 170 :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or 171 `self.INTERRUPT_CANCEL`. 172 """ 173 self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"])) 174 self.interrupted = level 175 176 @abc.abstractmethod 177 def work(self): 178 """ 179 This is where the actual work happens 180 181 Whatever the worker is supposed to do, it should happen (or be 182 initiated from) this method. By default it does nothing, descending 183 classes should implement this method. 184 """ 185 pass 186 187 @staticmethod 188 def is_4cat_class(): 189 """ 190 Is this a 4CAT class? 191 192 This is used to determine whether a class is a 4CAT worker or a 193 processor. This method should always return True for workers. 194 195 :return: True 196 """ 197 return True 198 199 @staticmethod 200 def is_4cat_processor(): 201 """ 202 Is this a 4CAT processor? 203 204 This is used to determine whether a class is a 4CAT 205 processor. 206 207 :return: False 208 """ 209 return False
class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
    """
    Abstract Worker class

    This runs as a separate thread in which a worker method is executed. The
    work method can do whatever the worker needs to do - that part is to be
    implemented by a child class. This class provides scaffolding that makes
    sure crashes are caught properly and the relevant data is available to the
    worker code.
    """
    #: Worker type - should match Job ID used when queuing jobs
    type = "misc"

    #: Amount of workers of this type that can run in parallel. Be careful with
    #: this, because values higher than 1 will mean that e.g. API rate limits
    #: are easily violated.
    max_workers = 1

    #: Flag value to indicate worker interruption type - not interrupted
    INTERRUPT_NONE = False

    #: Flag value to indicate worker interruption type - interrupted, but can
    #: be retried
    INTERRUPT_RETRY = 1

    #: Flag value to indicate worker interruption type - interrupted, but
    #: should be cancelled
    INTERRUPT_CANCEL = 2

    #: Job queue that can be used to create or manipulate jobs
    queue = None

    #: Job this worker is being run for
    job = None

    #: Local configuration (used in processors)
    config = None

    #: Logger object
    log = None

    #: WorkerManager that manages this worker
    manager = None

    #: Interrupt status, one of the `INTERRUPT_` class constants
    interrupted = False

    #: Module index
    modules = None

    #: Unix timestamp at which this worker was started
    init_time = 0

    def __init__(self, logger, job, queue=None, manager=None, modules=None):
        """
        Worker init

        Set up object attributes, e.g. the worker queue and manager, and
        initialize a new database connection and connected job queue. We cannot
        share database connections between workers because they are not
        thread-safe.

        :param Logger logger: Logging interface
        :param Job job: Job this worker is being run on
        :param JobQueue queue: Job queue
        :param WorkerManager manager: Scheduler instance that started this worker
        :param modules: Module catalog
        """
        super().__init__()
        # Thread name (inherited from threading.Thread) mirrors the worker type
        self.name = self.type
        self.log = logger
        self.manager = manager
        self.job = job
        self.init_time = int(time.time())
        self.queue = queue

        # ModuleCollector cannot be easily imported into a worker because it itself
        # imports all workers, so you get a recursive import that Python (rightly) blocks
        # so for workers, modules data is passed as a constructor argument
        self.modules = modules

    def run(self):
        """
        Run the worker

        This calls the `work()` method, quite simply, but adds some
        scaffolding to take care of any exceptions that occur during the
        execution of the worker. The exception is then logged and the worker
        is gracefully ended, but the job is *not* released to ensure that the
        job is not run again immediately (which would probably instantly crash
        in the exact same way).

        There is also some set-up to ensure that thread-unsafe objects such as
        the config reader are only initialised once in a threaded context, i.e.
        once this method is run.

        You can configure the `WARN_SLACK_URL` configuration variable to make
        reports of worker crashes be sent to a Slack channel, which is a good
        way to monitor a running 4CAT instance!
        """
        try:
            # per-worker appname so DB connections are identifiable per job
            database_appname = "%s-%s" % (self.type, self.job.data["id"])
            self.config = ConfigWrapper(self.modules.config)
            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
            self.work()
        except WorkerInterruptedException:
            self.log.info("Worker %s interrupted - cancelling." % self.type)

            # interrupted - retry later or cancel job altogether?
            if self.interrupted == self.INTERRUPT_RETRY:
                self.job.release(delay=10)
            elif self.interrupted == self.INTERRUPT_CANCEL:
                self.job.finish()

            self.abort()
        except ProcessorException as e:
            self.log.error(str(e), frame=e.frame)
        except Exception as e:
            # build a compact "file:line->file:line" trail for the log message
            stack = traceback.extract_tb(e.__traceback__)
            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
            location = "->".join(frames)
            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)

        # Clean up after work successfully completed or terminates
        self.clean_up()

        # explicitly close database connection as soon as it's possible
        # NOTE(review): if Database() above raised before self.db was assigned,
        # this attribute access would itself raise — confirm that path is handled
        self.db.close()

    def clean_up(self):
        """
        Clean up after a processor runs successfully or results in error.
        Workers should override this method to implement any procedures
        to run to clean up a worker; by default this does nothing.
        """
        pass

    def abort(self):
        """
        Called when the application shuts down

        Can be used to stop loops, for looping workers. Workers should override
        this method to implement any procedures to run to clean up a worker
        when it is interrupted; by default this does nothing.
        """
        pass

    def request_interrupt(self, level=1):
        """
        Set the 'abort requested' flag

        Child workers should quit at their earliest convenience when this is
        set. This can be done simply by checking the value of
        `self.interrupted`.

        :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or
        `self.INTERRUPT_CANCEL`.
        """
        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
        self.interrupted = level

    @abc.abstractmethod
    def work(self):
        """
        This is where the actual work happens

        Whatever the worker is supposed to do, it should happen (or be
        initiated from) this method. By default it does nothing, descending
        classes should implement this method.
        """
        pass

    @staticmethod
    def is_4cat_class():
        """
        Is this a 4CAT class?

        This is used to determine whether a class is a 4CAT worker or a
        processor. This method should always return True for workers.

        :return: True
        """
        return True

    @staticmethod
    def is_4cat_processor():
        """
        Is this a 4CAT processor?

        This is used to determine whether a class is a 4CAT
        processor.

        :return: False
        """
        return False
Abstract Worker class
This runs as a separate thread in which a worker method is executed. The work method can do whatever the worker needs to do - that part is to be implemented by a child class. This class provides scaffolding that makes sure crashes are caught properly and the relevant data is available to the worker code.
def __init__(self, logger, job, queue=None, manager=None, modules=None):
    """
    Worker init

    Set up object attributes, e.g. the worker queue and manager, and
    initialize a new database connection and connected job queue. We cannot
    share database connections between workers because they are not
    thread-safe.

    :param Logger logger: Logging interface
    :param Job job: Job this worker is being run on
    :param JobQueue queue: Job queue
    :param WorkerManager manager: Scheduler instance that started this worker
    :param modules: Module catalog
    """
    super().__init__()
    # Thread name (inherited from threading.Thread) mirrors the worker type
    self.name = self.type
    self.log = logger
    self.manager = manager
    self.job = job
    # record start time as a Unix timestamp
    self.init_time = int(time.time())
    self.queue = queue

    # ModuleCollector cannot be easily imported into a worker because it itself
    # imports all workers, so you get a recursive import that Python (rightly) blocks
    # so for workers, modules data is passed as a constructor argument
    self.modules = modules
Worker init
Set up object attributes, e.g. the worker queue and manager, and initialize a new database connection and connected job queue. We cannot share database connections between workers because they are not thread-safe.
Parameters
- Logger logger: Logging interface
- Job job: Job this worker is being run on
- JobQueue queue: Job queue
- WorkerManager manager: Scheduler instance that started this worker
- modules: Module catalog
@property
def name(self):
    """A string used for identification purposes only.

    It has no semantics. Multiple threads may be given the same name. The
    initial name is set by the constructor.

    """
    # guard against use of a Thread whose __init__ was never run
    assert self._initialized, "Thread.__init__() not called"
    return self._name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
def run(self):
    """
    Run the worker

    This calls the `work()` method, quite simply, but adds some
    scaffolding to take care of any exceptions that occur during the
    execution of the worker. The exception is then logged and the worker
    is gracefully ended, but the job is *not* released to ensure that the
    job is not run again immediately (which would probably instantly crash
    in the exact same way).

    There is also some set-up to ensure that thread-unsafe objects such as
    the config reader are only initialised once in a threaded context, i.e.
    once this method is run.

    You can configure the `WARN_SLACK_URL` configuration variable to make
    reports of worker crashes be sent to a Slack channel, which is a good
    way to monitor a running 4CAT instance!
    """
    try:
        database_appname = "%s-%s" % (self.type, self.job.data["id"])
        self.config = ConfigWrapper(self.modules.config)
        self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
        self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
        self.work()
    except WorkerInterruptedException:
        self.log.info("Worker %s interrupted - cancelling." % self.type)

        # interrupted - retry later or cancel job altogether?
        if self.interrupted == self.INTERRUPT_RETRY:
            self.job.release(delay=10)
        elif self.interrupted == self.INTERRUPT_CANCEL:
            self.job.finish()

        self.abort()
    except ProcessorException as e:
        self.log.error(str(e), frame=e.frame)
    except Exception as e:
        stack = traceback.extract_tb(e.__traceback__)
        frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
        location = "->".join(frames)
        self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
    finally:
        # Clean up whether work() completed, was interrupted, or crashed
        self.clean_up()

        # explicitly close database connection as soon as it's possible;
        # getattr-guard because Database() may have raised before self.db
        # was ever assigned, which previously caused an AttributeError here
        db = getattr(self, "db", None)
        if db:
            db.close()
Run the worker
This calls the `work()` method, quite simply, but adds some scaffolding to take care of any exceptions that occur during the execution of the worker. The exception is then logged and the worker is gracefully ended, but the job is *not* released, to ensure that the job is not run again immediately (which would probably instantly crash in the exact same way).
There is also some set-up to ensure that thread-unsafe objects such as the config reader are only initialised once in a threaded context, i.e. once this method is run.
You can configure the `WARN_SLACK_URL` configuration variable to make reports of worker crashes be sent to a Slack channel, which is a good way to monitor a running 4CAT instance!
def clean_up(self):
    """
    Hook for post-run cleanup

    Invoked after a worker run ends, whether it finished normally or hit an
    error. The base implementation does nothing; subclasses override this
    when they hold resources that need releasing.
    """
    pass
Clean up after a processor runs successfully or results in error. Workers should override this method to implement any procedures to run to clean up a worker; by default this does nothing.
def abort(self):
    """
    Shutdown hook

    Runs when the application shuts this worker down, e.g. to break out of
    loops in long-running workers. The base implementation is a no-op;
    subclasses override it to perform interrupt-time cleanup.
    """
    pass
Called when the application shuts down
Can be used to stop loops, for looping workers. Workers should override this method to implement any procedures to run to clean up a worker when it is interrupted; by default this does nothing.
def request_interrupt(self, level=1):
    """
    Set the 'abort requested' flag

    Child workers should quit at their earliest convenience when this is
    set, which they can detect simply by checking the value of
    `self.interrupted`.

    :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or
    `self.INTERRUPT_CANCEL`.
    """
    job_data = self.job.data
    message = "Interrupt requested for worker %s/%s" % (job_data["jobtype"], job_data["remote_id"])
    self.log.debug(message)
    self.interrupted = level
Set the 'abort requested' flag
Child workers should quit at their earliest convenience when this is set. This can be done simply by checking the value of `self.interrupted`.
Parameters
- int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or `self.INTERRUPT_CANCEL`.
@abc.abstractmethod
def work(self):
    """
    The worker's actual task

    Whatever the worker is supposed to do should happen in (or be initiated
    from) this method. It is abstract: every concrete worker class must
    provide its own implementation.
    """
    pass
This is where the actual work happens
Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.
188 @staticmethod 189 def is_4cat_class(): 190 """ 191 Is this a 4CAT class? 192 193 This is used to determine whether a class is a 4CAT worker or a 194 processor. This method should always return True for workers. 195 196 :return: True 197 """ 198 return True
Is this a 4CAT class?
This is used to determine whether a class is a 4CAT worker or a processor. This method should always return True for workers.
Returns
True
200 @staticmethod 201 def is_4cat_processor(): 202 """ 203 Is this a 4CAT processor? 204 205 This is used to determine whether a class is a 4CAT 206 processor. 207 208 :return: False 209 """ 210 return False
Is this a 4CAT processor?
This is used to determine whether a class is a 4CAT processor.
Returns
False