Edit on GitHub

backend.workers.monitor_workers

Monitor active workers

 1"""
 2Monitor active workers
 3"""
 4
 5import logging
 6import sys
 7
 8from backend.lib.worker import BasicWorker
 9
10
11class WorkerMonitor(BasicWorker):
12    """
13    Monitor active threads
14
15    This writes a debug log message at an interval listing active 4CAT threads.
16    It can be used to identify workers that do not terminate correctly or to
17    know where a certain worker is stuck if it does not seem to progress. It
18    also shows the 'native ID' of the thread which can be used to find it in
19    e.g. htop or another process monitor to inspect it further.
20
21    If the logging level of 4CAT is not set to DEBUG or lower, this worker does
22    nothing.
23    """
24
25    type = "worker-monitor"
26    max_workers = 1
27
28    @classmethod
29    def ensure_job(cls, config=None):
30        """
31        Run at an interval of 15 seconds.
32        """
33        return {"remote_id": "refresh-items", "interval": 15}
34
35    def work(self):
36        """
37        Monitor active 4CAT threads
38        """
39        if self.log.logger.level > logging.DEBUG:
40            # do nothing if we're not debugging
41            return self.job.finish()
42
43        # try to map active threads to 4CAT workers
44        # and also get the 'native ID' which, at least on linux, is the pid of
45        # the thread - allowing for further inspecting
46        frames = sys._current_frames()
47        thread_id_map = {}
48        for worker_type, workers in self.manager.worker_pool.items():
49            for worker in workers:
50                thread_id_map[worker.ident] = f"{worker_type}/{worker.native_id}"
51
52        monitor_msg = ""
53        for thread_id, frame in frames.items():
54            stack = []
55            while frame:
56                # each frame has a reference to the parent frame (if there is one)
57                # traverse this stack to construct the actual call stack
58                stack.append(
59                    f"{frame.f_code.co_filename.split('/')[-1]}:{frame.f_lineno}:{frame.f_code.co_name}()"
60                )
61                frame = frame.f_back
62
63            stack = " <-- ".join(stack[:-1]) # ignore the very first frame which is never relevant
64            
65            if thread_id in thread_id_map:
66                monitor_msg += f"\n  4CAT worker {thread_id_map[thread_id]} :: {stack}"
67            elif thread_id == self.manager.ident:
68                monitor_msg += f"\n  4CAT main loop :: {stack}"
69            else:
70                monitor_msg += f"\n  4CAT unknown thread {thread_id} :: {stack}"
71
72        self.log.debug(f"Currently {len(frames):,} active 4CAT threads: {monitor_msg}")
73        return self.job.finish()
class WorkerMonitor(backend.lib.worker.BasicWorker):
12class WorkerMonitor(BasicWorker):
13    """
14    Monitor active threads
15
16    This writes a debug log message at an interval listing active 4CAT threads.
17    It can be used to identify workers that do not terminate correctly or to
18    know where a certain worker is stuck if it does not seem to progress. It
19    also shows the 'native ID' of the thread which can be used to find it in
20    e.g. htop or another process monitor to inspect it further.
21
22    If the logging level of 4CAT is not set to DEBUG or lower, this worker does
23    nothing.
24    """
25
26    type = "worker-monitor"
27    max_workers = 1
28
29    @classmethod
30    def ensure_job(cls, config=None):
31        """
32        Run at an interval of 15 seconds.
33        """
34        return {"remote_id": "refresh-items", "interval": 15}
35
36    def work(self):
37        """
38        Monitor active 4CAT threads
39        """
40        if self.log.logger.level > logging.DEBUG:
41            # do nothing if we're not debugging
42            return self.job.finish()
43
44        # try to map active threads to 4CAT workers
45        # and also get the 'native ID' which, at least on linux, is the pid of
46        # the thread - allowing for further inspecting
47        frames = sys._current_frames()
48        thread_id_map = {}
49        for worker_type, workers in self.manager.worker_pool.items():
50            for worker in workers:
51                thread_id_map[worker.ident] = f"{worker_type}/{worker.native_id}"
52
53        monitor_msg = ""
54        for thread_id, frame in frames.items():
55            stack = []
56            while frame:
57                # each frame has a reference to the parent frame (if there is one)
58                # traverse this stack to construct the actual call stack
59                stack.append(
60                    f"{frame.f_code.co_filename.split('/')[-1]}:{frame.f_lineno}:{frame.f_code.co_name}()"
61                )
62                frame = frame.f_back
63
64            stack = " <-- ".join(stack[:-1]) # ignore the very first frame which is never relevant
65            
66            if thread_id in thread_id_map:
67                monitor_msg += f"\n  4CAT worker {thread_id_map[thread_id]} :: {stack}"
68            elif thread_id == self.manager.ident:
69                monitor_msg += f"\n  4CAT main loop :: {stack}"
70            else:
71                monitor_msg += f"\n  4CAT unknown thread {thread_id} :: {stack}"
72
73        self.log.debug(f"Currently {len(frames):,} active 4CAT threads: {monitor_msg}")
74        return self.job.finish()

Monitor active threads

This writes a debug log message at an interval listing active 4CAT threads. It can be used to identify workers that do not terminate correctly or to know where a certain worker is stuck if it does not seem to progress. It also shows the 'native ID' of the thread which can be used to find it in e.g. htop or another process monitor to inspect it further.

If the logging level of 4CAT is not set to DEBUG or lower, this worker does nothing.

type = 'worker-monitor'
max_workers = 1
@classmethod
def ensure_job(cls, config=None):
29    @classmethod
30    def ensure_job(cls, config=None):
31        """
32        Run at an interval of 15 seconds.
33        """
34        return {"remote_id": "refresh-items", "interval": 15}

Run at an interval of 15 seconds.

def work(self):
36    def work(self):
37        """
38        Monitor active 4CAT threads
39        """
40        if self.log.logger.level > logging.DEBUG:
41            # do nothing if we're not debugging
42            return self.job.finish()
43
44        # try to map active threads to 4CAT workers
45        # and also get the 'native ID' which, at least on linux, is the pid of
46        # the thread - allowing for further inspecting
47        frames = sys._current_frames()
48        thread_id_map = {}
49        for worker_type, workers in self.manager.worker_pool.items():
50            for worker in workers:
51                thread_id_map[worker.ident] = f"{worker_type}/{worker.native_id}"
52
53        monitor_msg = ""
54        for thread_id, frame in frames.items():
55            stack = []
56            while frame:
57                # each frame has a reference to the parent frame (if there is one)
58                # traverse this stack to construct the actual call stack
59                stack.append(
60                    f"{frame.f_code.co_filename.split('/')[-1]}:{frame.f_lineno}:{frame.f_code.co_name}()"
61                )
62                frame = frame.f_back
63
64            stack = " <-- ".join(stack[:-1]) # ignore the very first frame which is never relevant
65            
66            if thread_id in thread_id_map:
67                monitor_msg += f"\n  4CAT worker {thread_id_map[thread_id]} :: {stack}"
68            elif thread_id == self.manager.ident:
69                monitor_msg += f"\n  4CAT main loop :: {stack}"
70            else:
71                monitor_msg += f"\n  4CAT unknown thread {thread_id} :: {stack}"
72
73        self.log.debug(f"Currently {len(frames):,} active 4CAT threads: {monitor_msg}")
74        return self.job.finish()

Monitor active 4CAT threads