backend.lib.worker
Worker class that all workers should implement
1""" 2Worker class that all workers should implement 3""" 4import traceback 5import threading 6import time 7import abc 8 9from common.lib.queue import JobQueue 10from common.lib.database import Database 11from common.lib.exceptions import WorkerInterruptedException, ProcessorException 12from common.config_manager import ConfigWrapper 13 14class BasicWorker(threading.Thread, metaclass=abc.ABCMeta): 15 """ 16 Abstract Worker class 17 18 This runs as a separate thread in which a worker method is executed. The 19 work method can do whatever the worker needs to do - that part is to be 20 implemented by a child class. This class provides scaffolding that makes 21 sure crashes are caught properly and the relevant data is available to the 22 worker code. 23 """ 24 #: Worker type - should match Job ID used when queuing jobs 25 type = "misc" 26 27 #: Amount of workers of this type that can run in parallel. Be careful with 28 #: this, because values higher than 1 will mean that e.g. API rate limits 29 #: are easily violated. 
30 max_workers = 1 31 32 #: Flag value to indicate worker interruption type - not interrupted 33 INTERRUPT_NONE = False 34 35 #: Flag value to indicate worker interruption type - interrupted, but can 36 #: be retried 37 INTERRUPT_RETRY = 1 38 39 #: Flag value to indicate worker interruption type - interrupted, but 40 #: should be cancelled 41 INTERRUPT_CANCEL = 2 42 43 #: Job queue that can be used to create or manipulate jobs 44 queue = None 45 46 #: Job this worker is being run for 47 job = None 48 49 #: Local configuration (used in processors) 50 config = None 51 52 #: Logger object 53 log = None 54 55 #: WorkerManager that manages this worker 56 manager = None 57 58 #: Interrupt status, one of the `INTERRUPT_` class constants 59 interrupted = False 60 61 #: Module index 62 modules = None 63 64 #: Unix timestamp at which this worker was started 65 init_time = 0 66 67 def __init__(self, logger, job, queue=None, manager=None, modules=None): 68 """ 69 Worker init 70 71 Set up object attributes, e.g. the worker queue and manager, and 72 initialize a new database connection and connected job queue. We cannot 73 share database connections between workers because they are not 74 thread-safe. 
75 76 :param Logger logger: Logging interface 77 :param Job job: Job this worker is being run on 78 :param JobQueue queue: Job queue 79 :param WorkerManager manager: Scheduler instance that started this worker 80 :param modules: Module catalog 81 """ 82 super().__init__() 83 self.name = self.type 84 self.log = logger 85 self.manager = manager 86 self.job = job 87 self.init_time = int(time.time()) 88 self.queue = queue 89 90 # ModuleCollector cannot be easily imported into a worker because it itself 91 # imports all workers, so you get a recursive import that Python (rightly) blocks 92 # so for workers, modules data is passed as a constructor argument 93 self.modules = modules 94 95 def run(self): 96 """ 97 Run the worker 98 99 This calls the `work()` method, quite simply, but adds some 100 scaffolding to take care of any exceptions that occur during the 101 execution of the worker. The exception is then logged and the worker 102 is gracefully ended, but the job is *not* released to ensure that the 103 job is not run again immediately (which would probably instantly crash 104 in the exact same way). 105 106 There is also some set-up to ensure that thread-unsafe objects such as 107 the config reader are only initialised once in a threaded context, i.e. 108 once this method is run. 109 110 You can configure the `WARN_SLACK_URL` configuration variable to make 111 reports of worker crashers be sent to a Slack channel, which is a good 112 way to monitor a running 4CAT instance! 
113 """ 114 try: 115 database_appname = "%s-%s" % (self.type, self.job.data["id"]) 116 self.config = ConfigWrapper(self.modules.config) 117 self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT) 118 self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue 119 self.work() 120 except WorkerInterruptedException: 121 self.log.info("Worker %s interrupted - cancelling." % self.type) 122 123 # interrupted - retry later or cancel job altogether? 124 if self.interrupted == self.INTERRUPT_RETRY: 125 self.job.release(delay=10) 126 elif self.interrupted == self.INTERRUPT_CANCEL: 127 self.job.finish() 128 129 self.abort() 130 except ProcessorException as e: 131 self.log.error(str(e), frame=e.frame) 132 except Exception as e: 133 stack = traceback.extract_tb(e.__traceback__) 134 frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack] 135 location = "->".join(frames) 136 self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack) 137 138 # Clean up after work successfully completed or terminates 139 self.clean_up() 140 141 def clean_up(self): 142 """ 143 Clean up after a processor runs successfully or results in error. 144 Workers should override this method to implement any procedures 145 to run to clean up a worker; by default this does nothing. 146 """ 147 pass 148 149 def abort(self): 150 """ 151 Called when the application shuts down 152 153 Can be used to stop loops, for looping workers. Workers should override 154 this method to implement any procedures to run to clean up a worker 155 when it is interrupted; by default this does nothing. 
156 """ 157 pass 158 159 def request_interrupt(self, level=1): 160 """ 161 Set the 'abort requested' flag 162 163 Child workers should quit at their earliest convenience when this is 164 set. This can be done simply by checking the value of 165 `self.interrupted`. 166 167 :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or 168 `self.INTERRUPT_CANCEL`. 169 """ 170 self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"])) 171 self.interrupted = level 172 173 @abc.abstractmethod 174 def work(self): 175 """ 176 This is where the actual work happens 177 178 Whatever the worker is supposed to do, it should happen (or be 179 initiated from) this method. By default it does nothing, descending 180 classes should implement this method. 181 """ 182 pass 183 184 @staticmethod 185 def is_4cat_class(): 186 """ 187 Is this a 4CAT class? 188 189 This is used to determine whether a class is a 4CAT worker or a 190 processor. This method should always return True for workers. 191 192 :return: True 193 """ 194 return True 195 196 @staticmethod 197 def is_4cat_processor(): 198 """ 199 Is this a 4CAT processor? 200 201 This is used to determine whether a class is a 4CAT 202 processor. 203 204 :return: False 205 """ 206 return False
class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
    """
    Abstract Worker class

    Runs as a separate thread in which a worker method is executed. What that
    method does is up to the implementing child class; this base class only
    provides the scaffolding that catches crashes properly and makes the
    relevant data available to the worker code.
    """
    #: Worker type - should match Job ID used when queuing jobs
    type = "misc"

    #: Amount of workers of this type that can run in parallel. Be careful with
    #: this, because values higher than 1 will mean that e.g. API rate limits
    #: are easily violated.
    max_workers = 1

    #: Flag value to indicate worker interruption type - not interrupted
    INTERRUPT_NONE = False

    #: Flag value to indicate worker interruption type - interrupted, but can
    #: be retried
    INTERRUPT_RETRY = 1

    #: Flag value to indicate worker interruption type - interrupted, but
    #: should be cancelled
    INTERRUPT_CANCEL = 2

    #: Job queue that can be used to create or manipulate jobs
    queue = None

    #: Job this worker is being run for
    job = None

    #: Local configuration (used in processors)
    config = None

    #: Logger object
    log = None

    #: WorkerManager that manages this worker
    manager = None

    #: Interrupt status, one of the `INTERRUPT_` class constants
    interrupted = False

    #: Module index
    modules = None

    #: Unix timestamp at which this worker was started
    init_time = 0

    def __init__(self, logger, job, queue=None, manager=None, modules=None):
        """
        Worker init

        Set up object attributes, e.g. the worker queue and manager. Database
        connections cannot be shared between workers because they are not
        thread-safe, so connecting is deferred to `run()`.

        :param Logger logger: Logging interface
        :param Job job: Job this worker is being run on
        :param JobQueue queue: Job queue
        :param WorkerManager manager: Scheduler instance that started this worker
        :param modules: Module catalog
        """
        super().__init__()
        self.name = self.type
        self.init_time = int(time.time())
        self.log = logger
        self.job = job
        self.queue = queue
        self.manager = manager
        # ModuleCollector cannot be easily imported into a worker because it
        # itself imports all workers (a recursive import Python rightly
        # blocks), so for workers the modules data is a constructor argument
        self.modules = modules

    def run(self):
        """
        Run the worker

        Calls `work()` and wraps it in exception scaffolding: exceptions are
        logged and the worker ends gracefully, but the job is *not* released,
        to ensure it is not rerun immediately (which would probably crash the
        same way again).

        Thread-unsafe objects such as the config reader are only initialised
        once this method runs, i.e. in a threaded context.

        Configure the `WARN_SLACK_URL` configuration variable to have worker
        crash reports sent to a Slack channel - a good way to monitor a
        running 4CAT instance!
        """
        try:
            appname = "%s-%s" % (self.type, self.job.data["id"])
            self.config = ConfigWrapper(self.modules.config)
            self.db = Database(logger=self.log, appname=appname, dbname=self.config.DB_NAME,
                               user=self.config.DB_USER, password=self.config.DB_PASSWORD,
                               host=self.config.DB_HOST, port=self.config.DB_PORT)
            # keep an externally supplied queue; otherwise make one on our
            # own (thread-local) database connection
            self.queue = self.queue or JobQueue(logger=self.log, database=self.db)
            self.work()
        except WorkerInterruptedException:
            self.log.info("Worker %s interrupted - cancelling." % self.type)

            # interrupted - retry later or cancel job altogether?
            if self.interrupted == self.INTERRUPT_RETRY:
                self.job.release(delay=10)
            elif self.interrupted == self.INTERRUPT_CANCEL:
                self.job.finish()

            self.abort()
        except ProcessorException as e:
            self.log.error(str(e), frame=e.frame)
        except Exception as e:
            # summarise the traceback as "file:line->file:line" for the log
            stack = traceback.extract_tb(e.__traceback__)
            frames = []
            for frame in stack:
                frames.append(frame.filename.split("/").pop() + ":" + str(frame.lineno))
            location = "->".join(frames)
            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)

        # clean up after work completed successfully or terminated
        self.clean_up()

    def clean_up(self):
        """
        Clean up after a processor runs successfully or results in error.
        Workers should override this method to implement any procedures
        to run to clean up a worker; by default this does nothing.
        """
        pass

    def abort(self):
        """
        Called when the application shuts down

        Can be used to stop loops, for looping workers; override to implement
        any clean-up needed when the worker is interrupted. Does nothing by
        default.
        """
        pass

    def request_interrupt(self, level=1):
        """
        Set the 'abort requested' flag

        Child workers should quit at their earliest convenience when this is
        set, which they can detect by checking `self.interrupted`.

        :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or
          `self.INTERRUPT_CANCEL`.
        """
        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
        self.interrupted = level

    @abc.abstractmethod
    def work(self):
        """
        This is where the actual work happens

        Whatever the worker is supposed to do, it should happen (or be
        initiated from) this method; descending classes implement it.
        """
        pass

    @staticmethod
    def is_4cat_class():
        """
        Is this a 4CAT class?

        Used to determine whether a class is a 4CAT worker or a processor;
        always returns True for workers.

        :return: True
        """
        return True

    @staticmethod
    def is_4cat_processor():
        """
        Is this a 4CAT processor?

        Used to determine whether a class is a 4CAT processor.

        :return: False
        """
        return False
Abstract Worker class
This runs as a separate thread in which a worker method is executed. The work method can do whatever the worker needs to do - that part is to be implemented by a child class. This class provides scaffolding that makes sure crashes are caught properly and the relevant data is available to the worker code.
68 def __init__(self, logger, job, queue=None, manager=None, modules=None): 69 """ 70 Worker init 71 72 Set up object attributes, e.g. the worker queue and manager, and 73 initialize a new database connection and connected job queue. We cannot 74 share database connections between workers because they are not 75 thread-safe. 76 77 :param Logger logger: Logging interface 78 :param Job job: Job this worker is being run on 79 :param JobQueue queue: Job queue 80 :param WorkerManager manager: Scheduler instance that started this worker 81 :param modules: Module catalog 82 """ 83 super().__init__() 84 self.name = self.type 85 self.log = logger 86 self.manager = manager 87 self.job = job 88 self.init_time = int(time.time()) 89 self.queue = queue 90 91 # ModuleCollector cannot be easily imported into a worker because it itself 92 # imports all workers, so you get a recursive import that Python (rightly) blocks 93 # so for workers, modules data is passed as a constructor argument 94 self.modules = modules
Worker init
Set up object attributes, e.g. the worker queue and manager, and initialize a new database connection and connected job queue. We cannot share database connections between workers because they are not thread-safe.
Parameters
- Logger logger: Logging interface
- Job job: Job this worker is being run on
- JobQueue queue: Job queue
- WorkerManager manager: Scheduler instance that started this worker
- modules: Module catalog
1146 @property 1147 def name(self): 1148 """A string used for identification purposes only. 1149 1150 It has no semantics. Multiple threads may be given the same name. The 1151 initial name is set by the constructor. 1152 1153 """ 1154 assert self._initialized, "Thread.__init__() not called" 1155 return self._name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
def run(self):
    """
    Run the worker

    Calls the `work()` method, wrapped in scaffolding that takes care of any
    exceptions raised during execution. An exception is logged and the worker
    is gracefully ended, but the job is *not* released, so that the job is
    not run again immediately (which would probably instantly crash in the
    exact same way).

    Thread-unsafe objects such as the config reader are only initialised
    here, i.e. once this method runs inside the thread.

    You can configure the `WARN_SLACK_URL` configuration variable to make
    reports of worker crashes be sent to a Slack channel, which is a good way
    to monitor a running 4CAT instance!
    """
    try:
        # per-worker connection identifier: "<type>-<job id>"
        database_appname = "%s-%s" % (self.type, self.job.data["id"])
        self.config = ConfigWrapper(self.modules.config)
        self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME,
                           user=self.config.DB_USER, password=self.config.DB_PASSWORD,
                           host=self.config.DB_HOST, port=self.config.DB_PORT)
        if not self.queue:
            self.queue = JobQueue(logger=self.log, database=self.db)
        self.work()
    except WorkerInterruptedException:
        self.log.info("Worker %s interrupted - cancelling." % self.type)

        # interrupted - retry later or cancel job altogether?
        if self.interrupted == self.INTERRUPT_RETRY:
            self.job.release(delay=10)
        elif self.interrupted == self.INTERRUPT_CANCEL:
            self.job.finish()

        self.abort()
    except ProcessorException as e:
        self.log.error(str(e), frame=e.frame)
    except Exception as e:
        # condense the traceback into "file:line->file:line" for the log
        stack = traceback.extract_tb(e.__traceback__)
        hops = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
        location = "->".join(hops)
        self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)

    # clean up after work successfully completed or terminated
    self.clean_up()
Run the worker
This calls the `work()` method, quite simply, but adds some scaffolding to take care of any exceptions that occur during the execution of the worker. The exception is then logged and the worker is gracefully ended, but the job is *not* released, to ensure that the job is not run again immediately (which would probably instantly crash in the exact same way).
There is also some set-up to ensure that thread-unsafe objects such as the config reader are only initialised once in a threaded context, i.e. once this method is run.
You can configure the `WARN_SLACK_URL` configuration variable to make reports of worker crashes be sent to a Slack channel, which is a good way to monitor a running 4CAT instance!
142 def clean_up(self): 143 """ 144 Clean up after a processor runs successfully or results in error. 145 Workers should override this method to implement any procedures 146 to run to clean up a worker; by default this does nothing. 147 """ 148 pass
Clean up after a processor runs successfully or results in error. Workers should override this method to implement any procedures to run to clean up a worker; by default this does nothing.
150 def abort(self): 151 """ 152 Called when the application shuts down 153 154 Can be used to stop loops, for looping workers. Workers should override 155 this method to implement any procedures to run to clean up a worker 156 when it is interrupted; by default this does nothing. 157 """ 158 pass
Called when the application shuts down
Can be used to stop loops, for looping workers. Workers should override this method to implement any procedures to run to clean up a worker when it is interrupted; by default this does nothing.
160 def request_interrupt(self, level=1): 161 """ 162 Set the 'abort requested' flag 163 164 Child workers should quit at their earliest convenience when this is 165 set. This can be done simply by checking the value of 166 `self.interrupted`. 167 168 :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or 169 `self.INTERRUPT_CANCEL`. 170 """ 171 self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"])) 172 self.interrupted = level
Set the 'abort requested' flag
Child workers should quit at their earliest convenience when this is set. This can be done simply by checking the value of `self.interrupted`.
Parameters
- int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or `self.INTERRUPT_CANCEL`.
174 @abc.abstractmethod 175 def work(self): 176 """ 177 This is where the actual work happens 178 179 Whatever the worker is supposed to do, it should happen (or be 180 initiated from) this method. By default it does nothing, descending 181 classes should implement this method. 182 """ 183 pass
This is where the actual work happens
Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.
185 @staticmethod 186 def is_4cat_class(): 187 """ 188 Is this a 4CAT class? 189 190 This is used to determine whether a class is a 4CAT worker or a 191 processor. This method should always return True for workers. 192 193 :return: True 194 """ 195 return True
Is this a 4CAT class?
This is used to determine whether a class is a 4CAT worker or a processor. This method should always return True for workers.
Returns
True
197 @staticmethod 198 def is_4cat_processor(): 199 """ 200 Is this a 4CAT processor? 201 202 This is used to determine whether a class is a 4CAT 203 processor. 204 205 :return: False 206 """ 207 return False
Is this a 4CAT processor?
This is used to determine whether a class is a 4CAT processor.
Returns
False