Edit on GitHub

backend.lib.worker

Worker class that all workers should implement

  1"""
  2Worker class that all workers should implement
  3"""
  4import traceback
  5import threading
  6import time
  7import abc
  8
  9from common.lib.queue import JobQueue
 10from common.lib.database import Database
 11from common.lib.exceptions import WorkerInterruptedException, ProcessorException
 12from common.config_manager import ConfigWrapper
 13
 14class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
 15    """
 16    Abstract Worker class
 17
 18    This runs as a separate thread in which a worker method is executed. The
 19    work method can do whatever the worker needs to do - that part is to be
 20    implemented by a child class. This class provides scaffolding that makes
 21    sure crashes are caught properly and the relevant data is available to the
 22    worker code.
 23    """
 24    #: Worker type - should match Job ID used when queuing jobs
 25    type = "misc"
 26
 27    #: Amount of workers of this type that can run in parallel. Be careful with
 28    #: this, because values higher than 1 will mean that e.g. API rate limits
 29    #: are easily violated.
 30    max_workers = 1
 31
 32    #: Flag value to indicate worker interruption type - not interrupted
 33    INTERRUPT_NONE = False
 34
 35    #: Flag value to indicate worker interruption type - interrupted, but can
 36    #: be retried
 37    INTERRUPT_RETRY = 1
 38
 39    #: Flag value to indicate worker interruption type - interrupted, but
 40    #: should be cancelled
 41    INTERRUPT_CANCEL = 2
 42
 43    #: Job queue that can be used to create or manipulate jobs
 44    queue = None
 45
 46    #: Job this worker is being run for
 47    job = None
 48
 49    #: Local configuration (used in processors)
 50    config = None
 51
 52    #: Logger object
 53    log = None
 54
 55    #: WorkerManager that manages this worker
 56    manager = None
 57
 58    #: Interrupt status, one of the `INTERRUPT_` class constants
 59    interrupted = False
 60
 61    #: Module index
 62    modules = None
 63
 64    #: Unix timestamp at which this worker was started
 65    init_time = 0
 66
 67    def __init__(self, logger, job, queue=None, manager=None, modules=None):
 68        """
 69        Worker init
 70
 71        Set up object attributes, e.g. the worker queue and manager, and
 72        initialize a new database connection and connected job queue. We cannot
 73        share database connections between workers because they are not
 74        thread-safe.
 75
 76        :param Logger logger:  Logging interface
 77        :param Job job:  Job this worker is being run on
 78        :param JobQueue queue:  Job queue
 79        :param WorkerManager manager:  Scheduler instance that started this worker
 80        :param modules:  Module catalog
 81        """
 82        super().__init__()
 83        self.name = self.type
 84        self.log = logger
 85        self.manager = manager
 86        self.job = job
 87        self.init_time = int(time.time())
 88        self.queue = queue
 89
 90        # ModuleCollector cannot be easily imported into a worker because it itself
 91        # imports all workers, so you get a recursive import that Python (rightly) blocks
 92        # so for workers, modules data is passed as a constructor argument
 93        self.modules = modules
 94
 95    def run(self):
 96        """
 97        Run the worker
 98
 99        This calls the `work()` method, quite simply, but adds some
100        scaffolding to take care of any exceptions that occur during the
101        execution of the worker. The exception is then logged and the worker
102        is gracefully ended, but the job is *not* released to ensure that the
103        job is not run again immediately (which would probably instantly crash
104        in the exact same way).
105
106        There is also some set-up to ensure that thread-unsafe objects such as
107        the config reader are only initialised once in a threaded context, i.e.
108        once this method is run.
109
110        You can configure the `WARN_SLACK_URL` configuration variable to make
111        reports of worker crashers be sent to a Slack channel, which is a good
112        way to monitor a running 4CAT instance!
113        """
114        try:
115            database_appname = "%s-%s" % (self.type, self.job.data["id"])
116            self.config = ConfigWrapper(self.modules.config)
117            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
118            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
119            self.work()
120        except WorkerInterruptedException:
121            self.log.info("Worker %s interrupted - cancelling." % self.type)
122
123            # interrupted - retry later or cancel job altogether?
124            if self.interrupted == self.INTERRUPT_RETRY:
125                self.job.release(delay=10)
126            elif self.interrupted == self.INTERRUPT_CANCEL:
127                self.job.finish()
128
129            self.abort()
130        except ProcessorException as e:
131            self.log.error(str(e), frame=e.frame)
132        except Exception as e:
133            stack = traceback.extract_tb(e.__traceback__)
134            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
135            location = "->".join(frames)
136            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
137
138        # Clean up after work successfully completed or terminates
139        self.clean_up()
140
141    def clean_up(self):
142        """
143        Clean up after a processor runs successfully or results in error.
144        Workers should override this method to implement any procedures
145        to run to clean up a worker; by default this does nothing.
146        """
147        pass
148
149    def abort(self):
150        """
151        Called when the application shuts down
152
153        Can be used to stop loops, for looping workers. Workers should override
154        this method to implement any procedures to run to clean up a worker
155        when it is interrupted; by default this does nothing.
156        """
157        pass
158
159    def request_interrupt(self, level=1):
160        """
161        Set the 'abort requested' flag
162
163        Child workers should quit at their earliest convenience when this is
164        set. This can be done simply by checking the value of
165        `self.interrupted`.
166
167        :param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
168          `self.INTERRUPT_CANCEL`.
169        """
170        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
171        self.interrupted = level
172
173    @abc.abstractmethod
174    def work(self):
175        """
176        This is where the actual work happens
177
178        Whatever the worker is supposed to do, it should happen (or be
179        initiated from) this method. By default it does nothing, descending
180        classes should implement this method.
181        """
182        pass
183
184    @staticmethod
185    def is_4cat_class():
186        """
187        Is this a 4CAT class?
188
189        This is used to determine whether a class is a 4CAT worker or a
190        processor. This method should always return True for workers.
191
192        :return:  True
193        """
194        return True
195
196    @staticmethod
197    def is_4cat_processor():
198        """
199        Is this a 4CAT processor?
200
201        This is used to determine whether a class is a 4CAT
202        processor.
203
204        :return:  False
205        """
206        return False
class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
    """
    Abstract Worker class

    This runs as a separate thread in which a worker method is executed. The
    work method can do whatever the worker needs to do - that part is to be
    implemented by a child class. This class provides scaffolding that makes
    sure crashes are caught properly and the relevant data is available to the
    worker code.
    """
    #: Worker type - should match Job ID used when queuing jobs
    type = "misc"

    #: Amount of workers of this type that can run in parallel. Be careful with
    #: this, because values higher than 1 will mean that e.g. API rate limits
    #: are easily violated.
    max_workers = 1

    #: Flag value to indicate worker interruption type - not interrupted
    INTERRUPT_NONE = False

    #: Flag value to indicate worker interruption type - interrupted, but can
    #: be retried
    INTERRUPT_RETRY = 1

    #: Flag value to indicate worker interruption type - interrupted, but
    #: should be cancelled
    INTERRUPT_CANCEL = 2

    #: Job queue that can be used to create or manipulate jobs
    queue = None

    #: Job this worker is being run for
    job = None

    #: Local configuration (used in processors)
    config = None

    #: Logger object
    log = None

    #: WorkerManager that manages this worker
    manager = None

    #: Interrupt status, one of the `INTERRUPT_` class constants
    interrupted = False

    #: Module index
    modules = None

    #: Unix timestamp at which this worker was started
    init_time = 0

    def __init__(self, logger, job, queue=None, manager=None, modules=None):
        """
        Worker init

        Set up object attributes, e.g. the worker queue and manager, and
        initialize a new database connection and connected job queue. We cannot
        share database connections between workers because they are not
        thread-safe.

        :param Logger logger:  Logging interface
        :param Job job:  Job this worker is being run on
        :param JobQueue queue:  Job queue
        :param WorkerManager manager:  Scheduler instance that started this worker
        :param modules:  Module catalog
        """
        super().__init__()
        # the thread name mirrors the worker type for identification
        self.name = self.type
        self.log = logger
        self.manager = manager
        self.job = job
        self.init_time = int(time.time())
        self.queue = queue

        # ModuleCollector cannot be easily imported into a worker because it itself
        # imports all workers, so you get a recursive import that Python (rightly) blocks
        # so for workers, modules data is passed as a constructor argument
        self.modules = modules

    def run(self):
        """
        Run the worker

        This calls the `work()` method, quite simply, but adds some
        scaffolding to take care of any exceptions that occur during the
        execution of the worker. The exception is then logged and the worker
        is gracefully ended, but the job is *not* released to ensure that the
        job is not run again immediately (which would probably instantly crash
        in the exact same way).

        There is also some set-up to ensure that thread-unsafe objects such as
        the config reader are only initialised once in a threaded context, i.e.
        once this method is run.

        You can configure the `WARN_SLACK_URL` configuration variable to make
        reports of worker crashes be sent to a Slack channel, which is a good
        way to monitor a running 4CAT instance!
        """
        try:
            # per-thread set-up of thread-unsafe objects: config wrapper and
            # a dedicated database connection for this worker
            database_appname = "%s-%s" % (self.type, self.job.data["id"])
            self.config = ConfigWrapper(self.modules.config)
            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
            self.work()
        except WorkerInterruptedException:
            self.log.info("Worker %s interrupted - cancelling." % self.type)

            # interrupted - retry later or cancel job altogether?
            if self.interrupted == self.INTERRUPT_RETRY:
                self.job.release(delay=10)
            elif self.interrupted == self.INTERRUPT_CANCEL:
                self.job.finish()

            self.abort()
        except ProcessorException as e:
            self.log.error(str(e), frame=e.frame)
        except Exception as e:
            # catch-all boundary: summarise the traceback as file:line hops
            # so the log line alone pinpoints the crash site
            stack = traceback.extract_tb(e.__traceback__)
            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
            location = "->".join(frames)
            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
        finally:
            # run clean-up in `finally` so it also executes when a
            # BaseException (e.g. SystemExit during shutdown) escapes the
            # handlers above, not only on normal or handled termination
            self.clean_up()

    def clean_up(self):
        """
        Clean up after a processor runs successfully or results in error.
        Workers should override this method to implement any procedures
        to run to clean up a worker; by default this does nothing.
        """
        pass

    def abort(self):
        """
        Called when the application shuts down

        Can be used to stop loops, for looping workers. Workers should override
        this method to implement any procedures to run to clean up a worker
        when it is interrupted; by default this does nothing.
        """
        pass

    def request_interrupt(self, level=1):
        """
        Set the 'abort requested' flag

        Child workers should quit at their earliest convenience when this is
        set. This can be done simply by checking the value of
        `self.interrupted`.

        :param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
          `self.INTERRUPT_CANCEL`.
        """
        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
        self.interrupted = level

    @abc.abstractmethod
    def work(self):
        """
        This is where the actual work happens

        Whatever the worker is supposed to do, it should happen (or be
        initiated from) this method. By default it does nothing, descending
        classes should implement this method.
        """
        pass

    @staticmethod
    def is_4cat_class():
        """
        Is this a 4CAT class?

        This is used to determine whether a class is a 4CAT worker or a
        processor. This method should always return True for workers.

        :return:  True
        """
        return True

    @staticmethod
    def is_4cat_processor():
        """
        Is this a 4CAT processor?

        This is used to determine whether a class is a 4CAT
        processor.

        :return:  False
        """
        return False

Abstract Worker class

This runs as a separate thread in which a worker method is executed. The work method can do whatever the worker needs to do - that part is to be implemented by a child class. This class provides scaffolding that makes sure crashes are caught properly and the relevant data is available to the worker code.

BasicWorker(logger, job, queue=None, manager=None, modules=None)
68    def __init__(self, logger, job, queue=None, manager=None, modules=None):
69        """
70        Worker init
71
72        Set up object attributes, e.g. the worker queue and manager, and
73        initialize a new database connection and connected job queue. We cannot
74        share database connections between workers because they are not
75        thread-safe.
76
77        :param Logger logger:  Logging interface
78        :param Job job:  Job this worker is being run on
79        :param JobQueue queue:  Job queue
80        :param WorkerManager manager:  Scheduler instance that started this worker
81        :param modules:  Module catalog
82        """
83        super().__init__()
84        self.name = self.type
85        self.log = logger
86        self.manager = manager
87        self.job = job
88        self.init_time = int(time.time())
89        self.queue = queue
90
91        # ModuleCollector cannot be easily imported into a worker because it itself
92        # imports all workers, so you get a recursive import that Python (rightly) blocks
93        # so for workers, modules data is passed as a constructor argument
94        self.modules = modules

Worker init

Set up object attributes, e.g. the worker queue and manager, and initialize a new database connection and connected job queue. We cannot share database connections between workers because they are not thread-safe.

Parameters
  • Logger logger: Logging interface
  • Job job: Job this worker is being run on
  • JobQueue queue: Job queue
  • WorkerManager manager: Scheduler instance that started this worker
  • modules: Module catalog
type = 'misc'
max_workers = 1
INTERRUPT_NONE = False
INTERRUPT_RETRY = 1
INTERRUPT_CANCEL = 2
queue = None
job = None
config = None
log = None
manager = None
interrupted = False
modules = None
init_time = 0
name
1146    @property
1147    def name(self):
1148        """A string used for identification purposes only.
1149
1150        It has no semantics. Multiple threads may be given the same name. The
1151        initial name is set by the constructor.
1152
1153        """
1154        assert self._initialized, "Thread.__init__() not called"
1155        return self._name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

def run(self):
 96    def run(self):
 97        """
 98        Run the worker
 99
100        This calls the `work()` method, quite simply, but adds some
101        scaffolding to take care of any exceptions that occur during the
102        execution of the worker. The exception is then logged and the worker
103        is gracefully ended, but the job is *not* released to ensure that the
104        job is not run again immediately (which would probably instantly crash
105        in the exact same way).
106
107        There is also some set-up to ensure that thread-unsafe objects such as
108        the config reader are only initialised once in a threaded context, i.e.
109        once this method is run.
110
111        You can configure the `WARN_SLACK_URL` configuration variable to make
112        reports of worker crashes be sent to a Slack channel, which is a good
113        way to monitor a running 4CAT instance!
114        """
115        try:
116            database_appname = "%s-%s" % (self.type, self.job.data["id"])
117            self.config = ConfigWrapper(self.modules.config)
118            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
119            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
120            self.work()
121        except WorkerInterruptedException:
122            self.log.info("Worker %s interrupted - cancelling." % self.type)
123
124            # interrupted - retry later or cancel job altogether?
125            if self.interrupted == self.INTERRUPT_RETRY:
126                self.job.release(delay=10)
127            elif self.interrupted == self.INTERRUPT_CANCEL:
128                self.job.finish()
129
130            self.abort()
131        except ProcessorException as e:
132            self.log.error(str(e), frame=e.frame)
133        except Exception as e:
134            stack = traceback.extract_tb(e.__traceback__)
135            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
136            location = "->".join(frames)
137            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
138
139        # Clean up after work successfully completed or terminates
140        self.clean_up()

Run the worker

This calls the work() method, quite simply, but adds some scaffolding to take care of any exceptions that occur during the execution of the worker. The exception is then logged and the worker is gracefully ended, but the job is not released to ensure that the job is not run again immediately (which would probably instantly crash in the exact same way).

There is also some set-up to ensure that thread-unsafe objects such as the config reader are only initialised once in a threaded context, i.e. once this method is run.

You can configure the WARN_SLACK_URL configuration variable to have reports of worker crashes sent to a Slack channel, which is a good way to monitor a running 4CAT instance!

def clean_up(self):
142    def clean_up(self):
143        """
144        Clean up after a processor runs successfully or results in error.
145        Workers should override this method to implement any procedures
146        to run to clean up a worker; by default this does nothing.
147        """
148        pass

Clean up after a processor runs successfully or results in error. Workers should override this method to implement any procedures to run to clean up a worker; by default this does nothing.

def abort(self):
150    def abort(self):
151        """
152        Called when the application shuts down
153
154        Can be used to stop loops, for looping workers. Workers should override
155        this method to implement any procedures to run to clean up a worker
156        when it is interrupted; by default this does nothing.
157        """
158        pass

Called when the application shuts down

Can be used to stop loops, for looping workers. Workers should override this method to implement any procedures to run to clean up a worker when it is interrupted; by default this does nothing.

def request_interrupt(self, level=1):
160    def request_interrupt(self, level=1):
161        """
162        Set the 'abort requested' flag
163
164        Child workers should quit at their earliest convenience when this is
165        set. This can be done simply by checking the value of
166        `self.interrupted`.
167
168        :param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
169          `self.INTERRUPT_CANCEL`.
170        """
171        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
172        self.interrupted = level

Set the 'abort requested' flag

Child workers should quit at their earliest convenience when this is set. This can be done simply by checking the value of self.interrupted.

Parameters
  • int level: Retry or cancel? Either self.INTERRUPT_RETRY or self.INTERRUPT_CANCEL.
@abc.abstractmethod
def work(self):
174    @abc.abstractmethod
175    def work(self):
176        """
177        This is where the actual work happens
178
179        Whatever the worker is supposed to do, it should happen (or be
180        initiated from) this method. By default it does nothing, descending
181        classes should implement this method.
182        """
183        pass

This is where the actual work happens

Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.

@staticmethod
def is_4cat_class():
185    @staticmethod
186    def is_4cat_class():
187        """
188        Is this a 4CAT class?
189
190        This is used to determine whether a class is a 4CAT worker or a
191        processor. This method should always return True for workers.
192
193        :return:  True
194        """
195        return True

Is this a 4CAT class?

This is used to determine whether a class is a 4CAT worker or a processor. This method should always return True for workers.

Returns

True

@staticmethod
def is_4cat_processor():
197    @staticmethod
198    def is_4cat_processor():
199        """
200        Is this a 4CAT processor?
201
202        This is used to determine whether a class is a 4CAT
203        processor.
204
205        :return:  False
206        """
207        return False

Is this a 4CAT processor?

This is used to determine whether a class is a 4CAT processor.

Returns

False