backend.lib.worker
Worker class that all workers should implement
"""
Worker class that all workers should implement
"""
import traceback
import threading
import time
import abc

from common.lib.queue import JobQueue
from common.lib.database import Database
from common.lib.exceptions import WorkerInterruptedException, ProcessorException
from common.config_manager import config, ConfigDummy


class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
    """
    Abstract base class for workers

    Runs as a separate thread in which a worker method is executed. What that
    method does is up to the child class implementing it; this class provides
    the scaffolding that catches crashes properly and makes the relevant data
    available to the worker code.
    """
    #: Worker type - should match Job ID used when queuing jobs
    type = "misc"

    #: Amount of workers of this type that can run in parallel. Be careful
    #: with values higher than 1, since those mean that e.g. API rate limits
    #: are easily violated.
    max_workers = 1

    #: Interruption flag value - not interrupted
    INTERRUPT_NONE = False

    #: Interruption flag value - interrupted, but may be retried
    INTERRUPT_RETRY = 1

    #: Interruption flag value - interrupted, and should be cancelled
    INTERRUPT_CANCEL = 2

    #: Job queue that can be used to create or manipulate jobs
    queue = None

    #: Job this worker is being run for
    job = None

    #: Local configuration (used in processors)
    config = None

    #: Logger object
    log = None

    #: WorkerManager that manages this worker
    manager = None

    #: Interrupt status, one of the `INTERRUPT_` class constants
    interrupted = False

    #: Module index
    modules = None

    #: Unix timestamp at which this worker was started
    init_time = 0

    def __init__(self, logger, job, queue=None, manager=None, modules=None):
        """
        Worker init

        Sets up object attributes such as the worker queue and manager, and
        opens a new database connection with a job queue connected to it.
        Database connections cannot be shared between workers because they
        are not thread-safe.

        :param Logger logger: Logging interface
        :param Job job: Job this worker is being run on
        :param JobQueue queue: Job queue
        :param WorkerManager manager: Scheduler instance that started this worker
        :param modules: Module catalog
        """
        super().__init__()
        self.name = self.type
        self.log = logger
        self.manager = manager
        self.job = job
        self.init_time = int(time.time())
        self.config = ConfigDummy()

        # ModuleCollector cannot be imported here directly because it itself
        # imports all workers, which Python (rightly) blocks as a recursive
        # import - so workers receive the module catalog as an argument instead
        self.modules = modules

        # one private connection per worker; see docstring on thread-safety
        app_name = "%s-%s" % (self.type, self.job.data["id"])
        self.db = Database(logger=self.log, appname=app_name,
                           dbname=config.DB_NAME, user=config.DB_USER, password=config.DB_PASSWORD,
                           host=config.DB_HOST, port=config.DB_PORT)
        if queue:
            self.queue = queue
        else:
            self.queue = JobQueue(logger=self.log, database=self.db)

    def run(self):
        """
        Run the worker

        Simply calls the `work()` method, with scaffolding around it to take
        care of any exceptions raised while the worker runs. Such an exception
        is logged and the worker ends gracefully, but the job is *not*
        released, so that it is not immediately run again (which would
        probably instantly crash in the exact same way).

        You can configure the `WARN_SLACK_URL` configuration variable to make
        reports of worker crashes be sent to a Slack channel, which is a good
        way to monitor a running 4CAT instance!
        """
        try:
            self.work()
        except WorkerInterruptedException:
            self.log.info("Worker %s interrupted - cancelling." % self.type)

            # interrupted - retry later or cancel job altogether?
            if self.interrupted == self.INTERRUPT_CANCEL:
                self.job.finish()
            elif self.interrupted == self.INTERRUPT_RETRY:
                self.job.release(delay=10)

            self.abort()
        except ProcessorException as e:
            self.log.error(str(e), frame=e.frame)
        except Exception as e:
            # condense the traceback into a compact "file:line->file:line" trail
            trail = "->".join(
                frame.filename.split("/")[-1] + ":" + str(frame.lineno)
                for frame in traceback.extract_tb(e.__traceback__)
            )
            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (
                self.type, e.__class__.__name__, str(e), trail))

        # Clean up after work successfully completed or terminates
        self.clean_up()

    def clean_up(self):
        """
        Hook run after the worker finishes, successfully or in error.

        Does nothing by default; workers needing post-run cleanup should
        override this method.
        """
        return None

    def abort(self):
        """
        Hook called when the application shuts down.

        Looping workers can use this to stop their loops. Does nothing by
        default; workers that need interruption cleanup should override it.
        """
        return None

    def request_interrupt(self, level=1):
        """
        Set the 'abort requested' flag

        Child workers should quit at their earliest convenience when this is
        set, which they can detect simply by checking the value of
        `self.interrupted`.

        :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or
        `self.INTERRUPT_CANCEL`.
        """
        jobtype, remote_id = self.job.data["jobtype"], self.job.data["remote_id"]
        self.log.debug("Interrupt requested for worker %s/%s" % (jobtype, remote_id))
        self.interrupted = level

    @abc.abstractmethod
    def work(self):
        """
        This is where the actual work happens

        Whatever the worker is supposed to do should happen in (or be
        initiated from) this method. Does nothing by default; descendant
        classes must implement it.
        """
        return None

    @staticmethod
    def is_4cat_class():
        """
        Is this a 4CAT class?

        Used to tell whether a class is a 4CAT worker or a processor; always
        returns True for workers.

        :return: True
        """
        return True

    @staticmethod
    def is_4cat_processor():
        """
        Is this a 4CAT processor?

        Used to tell whether a class is a 4CAT processor.

        :return: False
        """
        return False
class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
    """
    Abstract Worker class

    Runs as a separate thread executing a worker method. The work method can
    do whatever the worker needs to do - implementing that part is left to a
    child class. This class provides scaffolding that makes sure crashes are
    caught properly and that the relevant data is available to the worker
    code.
    """
    #: Worker type - should match Job ID used when queuing jobs
    type = "misc"

    #: Number of workers of this type that may run in parallel. Values above
    #: 1 risk violating e.g. API rate limits, so be careful.
    max_workers = 1

    #: Interruption type flag - not interrupted
    INTERRUPT_NONE = False

    #: Interruption type flag - interrupted, retry allowed
    INTERRUPT_RETRY = 1

    #: Interruption type flag - interrupted, should be cancelled
    INTERRUPT_CANCEL = 2

    #: Job queue that can be used to create or manipulate jobs
    queue = None

    #: Job this worker is being run for
    job = None

    #: Local configuration (used in processors)
    config = None

    #: Logger object
    log = None

    #: WorkerManager that manages this worker
    manager = None

    #: Interrupt status, one of the `INTERRUPT_` class constants
    interrupted = False

    #: Module index
    modules = None

    #: Unix timestamp at which this worker was started
    init_time = 0

    def __init__(self, logger, job, queue=None, manager=None, modules=None):
        """
        Worker init

        Stores the worker's collaborators (queue, manager, logger, job) and
        opens a fresh database connection plus a connected job queue.
        Database connections are not thread-safe, so they cannot be shared
        between workers.

        :param Logger logger: Logging interface
        :param Job job: Job this worker is being run on
        :param JobQueue queue: Job queue
        :param WorkerManager manager: Scheduler instance that started this worker
        :param modules: Module catalog
        """
        super().__init__()
        self.name = self.type
        self.log = logger
        self.manager = manager
        self.job = job
        self.init_time = int(time.time())
        self.config = ConfigDummy()

        # ModuleCollector itself imports all workers, so importing it here
        # would be a recursive import that Python (rightly) blocks; workers
        # therefore get the modules data as a constructor argument
        self.modules = modules

        database_appname = "%s-%s" % (self.type, self.job.data["id"])
        self.db = Database(
            logger=self.log,
            appname=database_appname,
            dbname=config.DB_NAME,
            user=config.DB_USER,
            password=config.DB_PASSWORD,
            host=config.DB_HOST,
            port=config.DB_PORT
        )
        self.queue = queue if queue else JobQueue(logger=self.log, database=self.db)

    def run(self):
        """
        Run the worker

        Calls the `work()` method, adding scaffolding to take care of any
        exceptions that occur while it executes. The exception is logged and
        the worker is gracefully ended, but the job is *not* released, to
        ensure it is not run again immediately (which would probably instantly
        crash in the exact same way).

        You can configure the `WARN_SLACK_URL` configuration variable to make
        reports of worker crashes be sent to a Slack channel, which is a good
        way to monitor a running 4CAT instance!
        """
        try:
            self.work()
        except WorkerInterruptedException:
            self.log.info("Worker %s interrupted - cancelling." % self.type)

            # interrupted - retry later or cancel job altogether?
            if self.interrupted == self.INTERRUPT_RETRY:
                self.job.release(delay=10)
            elif self.interrupted == self.INTERRUPT_CANCEL:
                self.job.finish()

            self.abort()
        except ProcessorException as e:
            self.log.error(str(e), frame=e.frame)
        except Exception as e:
            # log a short file:line breadcrumb trail of where the crash happened
            crash_path = [
                "%s:%d" % (frame.filename.split("/")[-1], frame.lineno)
                for frame in traceback.extract_tb(e.__traceback__)
            ]
            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (
                self.type, e.__class__.__name__, str(e), "->".join(crash_path)))

        # Clean up after work successfully completed or terminates
        self.clean_up()

    def clean_up(self):
        """
        Clean up after a run, whether it completed successfully or errored.

        By default this does nothing; workers should override it to implement
        any cleanup procedures they need.
        """
        return None

    def abort(self):
        """
        Called when the application shuts down

        Can be used to stop loops, for looping workers. By default this does
        nothing; workers should override it to implement any procedures to
        run when interrupted.
        """
        return None

    def request_interrupt(self, level=1):
        """
        Set the 'abort requested' flag

        Child workers should quit at their earliest convenience once this is
        set, which can be detected simply by checking `self.interrupted`.

        :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or
        `self.INTERRUPT_CANCEL`.
        """
        job_data = self.job.data
        self.log.debug("Interrupt requested for worker %s/%s" % (job_data["jobtype"], job_data["remote_id"]))
        self.interrupted = level

    @abc.abstractmethod
    def work(self):
        """
        This is where the actual work happens

        Whatever the worker is supposed to do should happen (or be initiated
        from) here. By default it does nothing; descending classes should
        implement this method.
        """
        return None

    @staticmethod
    def is_4cat_class():
        """
        Is this a 4CAT class?

        Used to determine whether a class is a 4CAT worker or a processor;
        always returns True for workers.

        :return: True
        """
        return True

    @staticmethod
    def is_4cat_processor():
        """
        Is this a 4CAT processor?

        Used to determine whether a class is a 4CAT processor.

        :return: False
        """
        return False
Abstract Worker class
This runs as a separate thread in which a worker method is executed. The work method can do whatever the worker needs to do - that part is to be implemented by a child class. This class provides scaffolding that makes sure crashes are caught properly and the relevant data is available to the worker code.
def __init__(self, logger, job, queue=None, manager=None, modules=None):
    """
    Worker init

    Set up object attributes, e.g. the worker queue and manager, and
    initialize a new database connection and connected job queue. We cannot
    share database connections between workers because they are not
    thread-safe.

    :param Logger logger: Logging interface
    :param Job job: Job this worker is being run on
    :param JobQueue queue: Job queue
    :param WorkerManager manager: Scheduler instance that started this worker
    :param modules: Module catalog
    """
    super().__init__()
    # thread name mirrors the worker type for easier identification
    self.name = self.type
    self.log = logger
    self.manager = manager
    self.job = job
    # record start time as a unix timestamp
    self.init_time = int(time.time())
    # placeholder config object; presumably replaced with a real config
    # reader elsewhere (e.g. by processors) - confirm against callers
    self.config = ConfigDummy()

    # ModuleCollector cannot be easily imported into a worker because it itself
    # imports all workers, so you get a recursive import that Python (rightly) blocks
    # so for workers, modules data is passed as a constructor argument
    self.modules = modules

    # connection is tagged with worker type and job id so it can be told
    # apart in the database's client list
    database_appname = "%s-%s" % (self.type, self.job.data["id"])
    self.db = Database(logger=self.log, appname=database_appname,
        dbname=config.DB_NAME, user=config.DB_USER, password=config.DB_PASSWORD, host=config.DB_HOST, port=config.DB_PORT)
    # reuse the caller-supplied queue if given, else attach a fresh one to
    # this worker's own database connection
    self.queue = JobQueue(logger=self.log, database=self.db) if not queue else queue
Worker init
Set up object attributes, e.g. the worker queue and manager, and initialize a new database connection and connected job queue. We cannot share database connections between workers because they are not thread-safe.
Parameters
- Logger logger: Logging interface
- Job job: Job this worker is being run on
- JobQueue queue: Job queue
- WorkerManager manager: Scheduler instance that started this worker
- modules: Module catalog
@property
def name(self):
    """A string used for identification purposes only.

    It has no semantics. Multiple threads may be given the same name. The
    initial name is set by the constructor.

    """
    # guard against reading uninitialized thread state; assert is acceptable
    # here since a missing __init__ call is a programming error
    assert self._initialized, "Thread.__init__() not called"
    return self._name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
def run(self):
    """
    Run the worker

    Calls `work()`, wrapped in scaffolding that takes care of exceptions
    raised during execution. Such an exception is logged and the worker
    ends gracefully, but the job is *not* released, ensuring the job is
    not run again immediately (which would probably instantly crash in
    the exact same way).

    You can configure the `WARN_SLACK_URL` configuration variable to make
    reports of worker crashes be sent to a Slack channel, which is a good
    way to monitor a running 4CAT instance!
    """
    try:
        self.work()
    except WorkerInterruptedException:
        self.log.info("Worker %s interrupted - cancelling." % self.type)

        # interrupted - retry later or cancel job altogether?
        if self.interrupted == self.INTERRUPT_CANCEL:
            self.job.finish()
        elif self.interrupted == self.INTERRUPT_RETRY:
            self.job.release(delay=10)

        self.abort()
    except ProcessorException as e:
        self.log.error(str(e), frame=e.frame)
    except Exception as e:
        # build a compact "file:line->file:line" trail of the crash site
        trail = "->".join(
            frame.filename.split("/")[-1] + ":" + str(frame.lineno)
            for frame in traceback.extract_tb(e.__traceback__)
        )
        self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (
            self.type, e.__class__.__name__, str(e), trail))

    # Clean up after work successfully completed or terminates
    self.clean_up()
Run the worker
This calls the `work()` method, quite simply, but adds some scaffolding to take care of any exceptions that occur during the execution of the worker. The exception is then logged and the worker is gracefully ended, but the job is *not* released, to ensure that the job is not run again immediately (which would probably instantly crash in the exact same way).
You can configure the `WARN_SLACK_URL` configuration variable to make reports of worker crashes be sent to a Slack channel, which is a good way to monitor a running 4CAT instance!
def clean_up(self):
    """
    Hook run after a worker finishes, whether successfully or in error.

    Does nothing by default; workers should override this method to
    implement any cleanup procedures they need.
    """
    return None
Clean up after a processor runs successfully or results in error. Workers should override this method to implement any procedures to run to clean up a worker; by default this does nothing.
def abort(self):
    """
    Hook called when the application shuts down.

    Looping workers can use this to stop their loops. Does nothing by
    default; workers should override it to implement any procedures that
    must run when the worker is interrupted.
    """
    return None
Called when the application shuts down
Can be used to stop loops, for looping workers. Workers should override this method to implement any procedures to run to clean up a worker when it is interrupted; by default this does nothing.
def request_interrupt(self, level=1):
    """
    Set the 'abort requested' flag

    Child workers should quit at their earliest convenience once this is
    set, which can be detected simply by checking the value of
    `self.interrupted`.

    :param int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or
    `self.INTERRUPT_CANCEL`.
    """
    jobtype, remote_id = self.job.data["jobtype"], self.job.data["remote_id"]
    self.log.debug("Interrupt requested for worker %s/%s" % (jobtype, remote_id))
    self.interrupted = level
Set the 'abort requested' flag
Child workers should quit at their earliest convenience when this is set. This can be done simply by checking the value of `self.interrupted`.
Parameters
- int level: Retry or cancel? Either `self.INTERRUPT_RETRY` or `self.INTERRUPT_CANCEL`.
@abc.abstractmethod
def work(self):
    """
    This is where the actual work happens

    Whatever the worker is supposed to do should happen in (or be
    initiated from) this method. It does nothing by default; descending
    classes must implement it.
    """
    return None
This is where the actual work happens
Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.
183 @staticmethod 184 def is_4cat_class(): 185 """ 186 Is this a 4CAT class? 187 188 This is used to determine whether a class is a 4CAT worker or a 189 processor. This method should always return True for workers. 190 191 :return: True 192 """ 193 return True
Is this a 4CAT class?
This is used to determine whether a class is a 4CAT worker or a processor. This method should always return True for workers.
Returns
True
195 @staticmethod 196 def is_4cat_processor(): 197 """ 198 Is this a 4CAT processor? 199 200 This is used to determine whether a class is a 4CAT 201 processor. 202 203 :return: False 204 """ 205 return False
Is this a 4CAT processor?
This is used to determine whether a class is a 4CAT processor.
Returns
False