Edit on GitHub

backend.lib.worker

Worker class that all workers should implement

  1"""
  2Worker class that all workers should implement
  3"""
  4import traceback
  5import threading
  6import time
  7import abc
  8
  9from common.lib.queue import JobQueue
 10from common.lib.database import Database
 11from common.lib.exceptions import WorkerInterruptedException, ProcessorException
 12from common.config_manager import ConfigWrapper
 13
 14class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
 15    """
 16    Abstract Worker class
 17
 18    This runs as a separate thread in which a worker method is executed. The
 19    work method can do whatever the worker needs to do - that part is to be
 20    implemented by a child class. This class provides scaffolding that makes
 21    sure crashes are caught properly and the relevant data is available to the
 22    worker code.
 23    """
 24    #: Worker type - should match Job ID used when queuing jobs
 25    type = "misc"
 26
 27    #: Amount of workers of this type that can run in parallel. Be careful with
 28    #: this, because values higher than 1 will mean that e.g. API rate limits
 29    #: are easily violated.
 30    max_workers = 1
 31
 32    #: Flag value to indicate worker interruption type - not interrupted
 33    INTERRUPT_NONE = False
 34
 35    #: Flag value to indicate worker interruption type - interrupted, but can
 36    #: be retried
 37    INTERRUPT_RETRY = 1
 38
 39    #: Flag value to indicate worker interruption type - interrupted, but
 40    #: should be cancelled
 41    INTERRUPT_CANCEL = 2
 42
 43    #: Job queue that can be used to create or manipulate jobs
 44    queue = None
 45
 46    #: Job this worker is being run for
 47    job = None
 48
 49    #: Local configuration (used in processors)
 50    config = None
 51
 52    #: Logger object
 53    log = None
 54
 55    #: WorkerManager that manages this worker
 56    manager = None
 57
 58    #: Interrupt status, one of the `INTERRUPT_` class constants
 59    interrupted = False
 60
 61    #: Module index
 62    modules = None
 63
 64    #: Unix timestamp at which this worker was started
 65    init_time = 0
 66
 67    def __init__(self, logger, job, queue=None, manager=None, modules=None):
 68        """
 69        Worker init
 70
 71        Set up object attributes, e.g. the worker queue and manager, and
 72        initialize a new database connection and connected job queue. We cannot
 73        share database connections between workers because they are not
 74        thread-safe.
 75
 76        :param Logger logger:  Logging interface
 77        :param Job job:  Job this worker is being run on
 78        :param JobQueue queue:  Job queue
 79        :param WorkerManager manager:  Scheduler instance that started this worker
 80        :param modules:  Module catalog
 81        """
 82        super().__init__()
 83        self.name = self.type
 84        self.log = logger
 85        self.manager = manager
 86        self.job = job
 87        self.init_time = int(time.time())
 88        self.queue = queue
 89
 90        # ModuleCollector cannot be easily imported into a worker because it itself
 91        # imports all workers, so you get a recursive import that Python (rightly) blocks
 92        # so for workers, modules data is passed as a constructor argument
 93        self.modules = modules
 94
 95    def run(self):
 96        """
 97        Run the worker
 98
 99        This calls the `work()` method, quite simply, but adds some
100        scaffolding to take care of any exceptions that occur during the
101        execution of the worker. The exception is then logged and the worker
102        is gracefully ended, but the job is *not* released to ensure that the
103        job is not run again immediately (which would probably instantly crash
104        in the exact same way).
105
106        There is also some set-up to ensure that thread-unsafe objects such as
107        the config reader are only initialised once in a threaded context, i.e.
108        once this method is run.
109
110        You can configure the `WARN_SLACK_URL` configuration variable to make
111        reports of worker crashers be sent to a Slack channel, which is a good
112        way to monitor a running 4CAT instance!
113        """
114        try:
115            database_appname = "%s-%s" % (self.type, self.job.data["id"])
116            self.config = ConfigWrapper(self.modules.config)
117            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
118            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
119            self.work()
120        except WorkerInterruptedException:
121            self.log.info("Worker %s interrupted - cancelling." % self.type)
122
123            # interrupted - retry later or cancel job altogether?
124            if self.interrupted == self.INTERRUPT_RETRY:
125                self.job.release(delay=10)
126            elif self.interrupted == self.INTERRUPT_CANCEL:
127                self.job.finish()
128
129            self.abort()
130        except ProcessorException as e:
131            self.log.error(str(e), frame=e.frame)
132        except Exception as e:
133            stack = traceback.extract_tb(e.__traceback__)
134            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
135            location = "->".join(frames)
136            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
137
138        # Clean up after work successfully completed or terminates
139        self.clean_up()
140
141        # explicitly close database connection as soon as it's possible
142        self.db.close()
143
144    def clean_up(self):
145        """
146        Clean up after a processor runs successfully or results in error.
147        Workers should override this method to implement any procedures
148        to run to clean up a worker; by default this does nothing.
149        """
150        pass
151
152    def abort(self):
153        """
154        Called when the application shuts down
155
156        Can be used to stop loops, for looping workers. Workers should override
157        this method to implement any procedures to run to clean up a worker
158        when it is interrupted; by default this does nothing.
159        """
160        pass
161
162    def request_interrupt(self, level=1):
163        """
164        Set the 'abort requested' flag
165
166        Child workers should quit at their earliest convenience when this is
167        set. This can be done simply by checking the value of
168        `self.interrupted`.
169
170        :param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
171          `self.INTERRUPT_CANCEL`.
172        """
173        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
174        self.interrupted = level
175
176    @abc.abstractmethod
177    def work(self):
178        """
179        This is where the actual work happens
180
181        Whatever the worker is supposed to do, it should happen (or be
182        initiated from) this method. By default it does nothing, descending
183        classes should implement this method.
184        """
185        pass
186
187    @staticmethod
188    def is_4cat_class():
189        """
190        Is this a 4CAT class?
191
192        This is used to determine whether a class is a 4CAT worker or a
193        processor. This method should always return True for workers.
194
195        :return:  True
196        """
197        return True
198
199    @staticmethod
200    def is_4cat_processor():
201        """
202        Is this a 4CAT processor?
203
204        This is used to determine whether a class is a 4CAT
205        processor.
206
207        :return:  False
208        """
209        return False
class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
 15class BasicWorker(threading.Thread, metaclass=abc.ABCMeta):
 16    """
 17    Abstract Worker class
 18
 19    This runs as a separate thread in which a worker method is executed. The
 20    work method can do whatever the worker needs to do - that part is to be
 21    implemented by a child class. This class provides scaffolding that makes
 22    sure crashes are caught properly and the relevant data is available to the
 23    worker code.
 24    """
 25    #: Worker type - should match Job ID used when queuing jobs
 26    type = "misc"
 27
 28    #: Amount of workers of this type that can run in parallel. Be careful with
 29    #: this, because values higher than 1 will mean that e.g. API rate limits
 30    #: are easily violated.
 31    max_workers = 1
 32
 33    #: Flag value to indicate worker interruption type - not interrupted
 34    INTERRUPT_NONE = False
 35
 36    #: Flag value to indicate worker interruption type - interrupted, but can
 37    #: be retried
 38    INTERRUPT_RETRY = 1
 39
 40    #: Flag value to indicate worker interruption type - interrupted, but
 41    #: should be cancelled
 42    INTERRUPT_CANCEL = 2
 43
 44    #: Job queue that can be used to create or manipulate jobs
 45    queue = None
 46
 47    #: Job this worker is being run for
 48    job = None
 49
 50    #: Local configuration (used in processors)
 51    config = None
 52
 53    #: Logger object
 54    log = None
 55
 56    #: WorkerManager that manages this worker
 57    manager = None
 58
 59    #: Interrupt status, one of the `INTERRUPT_` class constants
 60    interrupted = False
 61
 62    #: Module index
 63    modules = None
 64
 65    #: Unix timestamp at which this worker was started
 66    init_time = 0
 67
 68    def __init__(self, logger, job, queue=None, manager=None, modules=None):
 69        """
 70        Worker init
 71
 72        Set up object attributes, e.g. the worker queue and manager, and
 73        initialize a new database connection and connected job queue. We cannot
 74        share database connections between workers because they are not
 75        thread-safe.
 76
 77        :param Logger logger:  Logging interface
 78        :param Job job:  Job this worker is being run on
 79        :param JobQueue queue:  Job queue
 80        :param WorkerManager manager:  Scheduler instance that started this worker
 81        :param modules:  Module catalog
 82        """
 83        super().__init__()
 84        self.name = self.type
 85        self.log = logger
 86        self.manager = manager
 87        self.job = job
 88        self.init_time = int(time.time())
 89        self.queue = queue
 90
 91        # ModuleCollector cannot be easily imported into a worker because it itself
 92        # imports all workers, so you get a recursive import that Python (rightly) blocks
 93        # so for workers, modules data is passed as a constructor argument
 94        self.modules = modules
 95
 96    def run(self):
 97        """
 98        Run the worker
 99
100        This calls the `work()` method, quite simply, but adds some
101        scaffolding to take care of any exceptions that occur during the
102        execution of the worker. The exception is then logged and the worker
103        is gracefully ended, but the job is *not* released to ensure that the
104        job is not run again immediately (which would probably instantly crash
105        in the exact same way).
106
107        There is also some set-up to ensure that thread-unsafe objects such as
108        the config reader are only initialised once in a threaded context, i.e.
109        once this method is run.
110
111        You can configure the `WARN_SLACK_URL` configuration variable to make
 112        reports of worker crashes be sent to a Slack channel, which is a good
113        way to monitor a running 4CAT instance!
114        """
115        try:
116            database_appname = "%s-%s" % (self.type, self.job.data["id"])
117            self.config = ConfigWrapper(self.modules.config)
118            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
119            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
120            self.work()
121        except WorkerInterruptedException:
122            self.log.info("Worker %s interrupted - cancelling." % self.type)
123
124            # interrupted - retry later or cancel job altogether?
125            if self.interrupted == self.INTERRUPT_RETRY:
126                self.job.release(delay=10)
127            elif self.interrupted == self.INTERRUPT_CANCEL:
128                self.job.finish()
129
130            self.abort()
131        except ProcessorException as e:
132            self.log.error(str(e), frame=e.frame)
133        except Exception as e:
134            stack = traceback.extract_tb(e.__traceback__)
135            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
136            location = "->".join(frames)
137            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
138
139        # Clean up after work successfully completed or terminates
140        self.clean_up()
141
142        # explicitly close database connection as soon as it's possible
143        self.db.close()
144
145    def clean_up(self):
146        """
147        Clean up after a processor runs successfully or results in error.
148        Workers should override this method to implement any procedures
149        to run to clean up a worker; by default this does nothing.
150        """
151        pass
152
153    def abort(self):
154        """
155        Called when the application shuts down
156
157        Can be used to stop loops, for looping workers. Workers should override
158        this method to implement any procedures to run to clean up a worker
159        when it is interrupted; by default this does nothing.
160        """
161        pass
162
163    def request_interrupt(self, level=1):
164        """
165        Set the 'abort requested' flag
166
167        Child workers should quit at their earliest convenience when this is
168        set. This can be done simply by checking the value of
169        `self.interrupted`.
170
171        :param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
172          `self.INTERRUPT_CANCEL`.
173        """
174        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
175        self.interrupted = level
176
177    @abc.abstractmethod
178    def work(self):
179        """
180        This is where the actual work happens
181
182        Whatever the worker is supposed to do, it should happen (or be
183        initiated from) this method. By default it does nothing, descending
184        classes should implement this method.
185        """
186        pass
187
188    @staticmethod
189    def is_4cat_class():
190        """
191        Is this a 4CAT class?
192
193        This is used to determine whether a class is a 4CAT worker or a
194        processor. This method should always return True for workers.
195
196        :return:  True
197        """
198        return True
199
200    @staticmethod
201    def is_4cat_processor():
202        """
203        Is this a 4CAT processor?
204
205        This is used to determine whether a class is a 4CAT
206        processor.
207
208        :return:  False
209        """
210        return False

Abstract Worker class

This runs as a separate thread in which a worker method is executed. The work method can do whatever the worker needs to do - that part is to be implemented by a child class. This class provides scaffolding that makes sure crashes are caught properly and the relevant data is available to the worker code.

BasicWorker(logger, job, queue=None, manager=None, modules=None)
68    def __init__(self, logger, job, queue=None, manager=None, modules=None):
69        """
70        Worker init
71
72        Set up object attributes, e.g. the worker queue and manager, and
73        initialize a new database connection and connected job queue. We cannot
74        share database connections between workers because they are not
75        thread-safe.
76
77        :param Logger logger:  Logging interface
78        :param Job job:  Job this worker is being run on
79        :param JobQueue queue:  Job queue
80        :param WorkerManager manager:  Scheduler instance that started this worker
81        :param modules:  Module catalog
82        """
83        super().__init__()
84        self.name = self.type
85        self.log = logger
86        self.manager = manager
87        self.job = job
88        self.init_time = int(time.time())
89        self.queue = queue
90
91        # ModuleCollector cannot be easily imported into a worker because it itself
92        # imports all workers, so you get a recursive import that Python (rightly) blocks
93        # so for workers, modules data is passed as a constructor argument
94        self.modules = modules

Worker init

Set up object attributes, e.g. the worker queue and manager, and initialize a new database connection and connected job queue. We cannot share database connections between workers because they are not thread-safe.

Parameters
  • Logger logger: Logging interface
  • Job job: Job this worker is being run on
  • JobQueue queue: Job queue
  • WorkerManager manager: Scheduler instance that started this worker
  • modules: Module catalog
type = 'misc'
max_workers = 1
INTERRUPT_NONE = False
INTERRUPT_RETRY = 1
INTERRUPT_CANCEL = 2
queue = None
job = None
config = None
log = None
manager = None
interrupted = False
modules = None
init_time = 0
name
1146    @property
1147    def name(self):
1148        """A string used for identification purposes only.
1149
1150        It has no semantics. Multiple threads may be given the same name. The
1151        initial name is set by the constructor.
1152
1153        """
1154        assert self._initialized, "Thread.__init__() not called"
1155        return self._name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

def run(self):
 96    def run(self):
 97        """
 98        Run the worker
 99
100        This calls the `work()` method, quite simply, but adds some
101        scaffolding to take care of any exceptions that occur during the
102        execution of the worker. The exception is then logged and the worker
103        is gracefully ended, but the job is *not* released to ensure that the
104        job is not run again immediately (which would probably instantly crash
105        in the exact same way).
106
107        There is also some set-up to ensure that thread-unsafe objects such as
108        the config reader are only initialised once in a threaded context, i.e.
109        once this method is run.
110
111        You can configure the `WARN_SLACK_URL` configuration variable to make
 112        reports of worker crashes be sent to a Slack channel, which is a good
113        way to monitor a running 4CAT instance!
114        """
115        try:
116            database_appname = "%s-%s" % (self.type, self.job.data["id"])
117            self.config = ConfigWrapper(self.modules.config)
118            self.db = Database(logger=self.log, appname=database_appname, dbname=self.config.DB_NAME, user=self.config.DB_USER, password=self.config.DB_PASSWORD, host=self.config.DB_HOST, port=self.config.DB_PORT)
119            self.queue = JobQueue(logger=self.log, database=self.db) if not self.queue else self.queue
120            self.work()
121        except WorkerInterruptedException:
122            self.log.info("Worker %s interrupted - cancelling." % self.type)
123
124            # interrupted - retry later or cancel job altogether?
125            if self.interrupted == self.INTERRUPT_RETRY:
126                self.job.release(delay=10)
127            elif self.interrupted == self.INTERRUPT_CANCEL:
128                self.job.finish()
129
130            self.abort()
131        except ProcessorException as e:
132            self.log.error(str(e), frame=e.frame)
133        except Exception as e:
134            stack = traceback.extract_tb(e.__traceback__)
135            frames = [frame.filename.split("/").pop() + ":" + str(frame.lineno) for frame in stack]
136            location = "->".join(frames)
137            self.log.error("Worker %s raised exception %s and will abort: %s at %s" % (self.type, e.__class__.__name__, str(e), location), frame=stack)
138
139        # Clean up after work successfully completed or terminates
140        self.clean_up()
141
142        # explicitly close database connection as soon as it's possible
143        self.db.close()

Run the worker

This calls the work() method, quite simply, but adds some scaffolding to take care of any exceptions that occur during the execution of the worker. The exception is then logged and the worker is gracefully ended, but the job is not released to ensure that the job is not run again immediately (which would probably instantly crash in the exact same way).

There is also some set-up to ensure that thread-unsafe objects such as the config reader are only initialised once in a threaded context, i.e. once this method is run.

You can configure the WARN_SLACK_URL configuration variable to have reports of worker crashes sent to a Slack channel, which is a good way to monitor a running 4CAT instance!

def clean_up(self):
145    def clean_up(self):
146        """
147        Clean up after a processor runs successfully or results in error.
148        Workers should override this method to implement any procedures
149        to run to clean up a worker; by default this does nothing.
150        """
151        pass

Clean up after a processor runs successfully or results in error. Workers should override this method to implement any procedures to run to clean up a worker; by default this does nothing.

def abort(self):
153    def abort(self):
154        """
155        Called when the application shuts down
156
157        Can be used to stop loops, for looping workers. Workers should override
158        this method to implement any procedures to run to clean up a worker
159        when it is interrupted; by default this does nothing.
160        """
161        pass

Called when the application shuts down

Can be used to stop loops, for looping workers. Workers should override this method to implement any procedures to run to clean up a worker when it is interrupted; by default this does nothing.

def request_interrupt(self, level=1):
163    def request_interrupt(self, level=1):
164        """
165        Set the 'abort requested' flag
166
167        Child workers should quit at their earliest convenience when this is
168        set. This can be done simply by checking the value of
169        `self.interrupted`.
170
171        :param int level:  Retry or cancel? Either `self.INTERRUPT_RETRY` or
172          `self.INTERRUPT_CANCEL`.
173        """
174        self.log.debug("Interrupt requested for worker %s/%s" % (self.job.data["jobtype"], self.job.data["remote_id"]))
175        self.interrupted = level

Set the 'abort requested' flag

Child workers should quit at their earliest convenience when this is set. This can be done simply by checking the value of self.interrupted.

Parameters
  • int level: Retry or cancel? Either self.INTERRUPT_RETRY or self.INTERRUPT_CANCEL.
@abc.abstractmethod
def work(self):
177    @abc.abstractmethod
178    def work(self):
179        """
180        This is where the actual work happens
181
182        Whatever the worker is supposed to do, it should happen (or be
183        initiated from) this method. By default it does nothing, descending
184        classes should implement this method.
185        """
186        pass

This is where the actual work happens

Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.

@staticmethod
def is_4cat_class():
188    @staticmethod
189    def is_4cat_class():
190        """
191        Is this a 4CAT class?
192
193        This is used to determine whether a class is a 4CAT worker or a
194        processor. This method should always return True for workers.
195
196        :return:  True
197        """
198        return True

Is this a 4CAT class?

This is used to determine whether a class is a 4CAT worker or a processor. This method should always return True for workers.

Returns

True

@staticmethod
def is_4cat_processor():
200    @staticmethod
201    def is_4cat_processor():
202        """
203        Is this a 4CAT processor?
204
205        This is used to determine whether a class is a 4CAT
206        processor.
207
208        :return:  False
209        """
210        return False

Is this a 4CAT processor?

This is used to determine whether a class is a 4CAT processor.

Returns

False