backend.workers.monitor_workers
Monitor active workers
"""
Monitor active workers
"""

import logging
import os
import sys

from backend.lib.worker import BasicWorker


class WorkerMonitor(BasicWorker):
    """
    Monitor active threads

    This writes a debug log message at an interval listing active 4CAT threads.
    It can be used to identify workers that do not terminate correctly or to
    know where a certain worker is stuck if it does not seem to progress. It
    also shows the 'native ID' of the thread which can be used to find it in
    e.g. htop or another process monitor to inspect it further.

    If the logging level of 4CAT is not set to DEBUG or lower, this worker does
    nothing.
    """

    type = "worker-monitor"
    max_workers = 1

    @classmethod
    def ensure_job(cls, config=None):
        """
        Run at an interval of 15 seconds.

        :param config:  Not used by this worker.
        :return dict:  Job parameters: a fixed `remote_id` and the `interval`.
        """
        return {"remote_id": "refresh-items", "interval": 15}

    def work(self):
        """
        Monitor active 4CAT threads

        Logs a single debug message with one line per thread known to the
        interpreter. Each line names the thread (a 4CAT worker with its type
        and native ID, the main loop, or an unknown thread) followed by the
        call stack it is currently in, innermost call first.

        :return:  Whatever `self.job.finish()` returns.
        """
        if self.log.logger.level > logging.DEBUG:
            # do nothing if we're not debugging
            return self.job.finish()

        # try to map active threads to 4CAT workers
        # and also get the 'native ID' which, at least on linux, is the pid of
        # the thread - allowing for further inspecting
        frames = sys._current_frames()
        thread_id_map = {}
        for worker_type, workers in self.manager.worker_pool.items():
            for worker in workers:
                thread_id_map[worker.ident] = f"{worker_type}/{worker.native_id}"

        thread_count = len(frames)
        thread_lines = []
        for thread_id, frame in frames.items():
            stack = []
            while frame:
                # each frame has a reference to the parent frame (if there is one)
                # traverse this stack to construct the actual call stack
                code = frame.f_code
                # basename() instead of split('/') so Windows-style paths are
                # shortened as well; on POSIX the result is the same
                stack.append(f"{os.path.basename(code.co_filename)}:{frame.f_lineno}:{code.co_name}()")
                frame = frame.f_back

            stack = " <-- ".join(stack[:-1])  # ignore the very first frame which is never relevant

            if thread_id in thread_id_map:
                label = f"4CAT worker {thread_id_map[thread_id]}"
            elif thread_id == self.manager.ident:
                label = "4CAT main loop"
            else:
                label = f"4CAT unknown thread {thread_id}"

            thread_lines.append(f"\n {label} :: {stack}")

        # the dict holds this method's own frame, which in turn holds the dict;
        # drop it so no reference cycle is left behind for the garbage collector
        del frames

        monitor_msg = "".join(thread_lines)
        self.log.debug(f"Currently {thread_count:,} active 4CAT threads: {monitor_msg}")
        return self.job.finish()
class WorkerMonitor(BasicWorker):
    """
    Monitor active threads

    This writes a debug log message at an interval listing active 4CAT threads.
    It can be used to identify workers that do not terminate correctly or to
    know where a certain worker is stuck if it does not seem to progress. It
    also shows the 'native ID' of the thread which can be used to find it in
    e.g. htop or another process monitor to inspect it further.

    If the logging level of 4CAT is not set to DEBUG or lower, this worker does
    nothing.
    """

    type = "worker-monitor"
    max_workers = 1

    @classmethod
    def ensure_job(cls, config=None):
        """
        Run at an interval of 15 seconds.

        :param config:  Not used by this worker.
        :return dict:  Job parameters: a fixed `remote_id` and the `interval`.
        """
        return {"remote_id": "refresh-items", "interval": 15}

    def work(self):
        """
        Monitor active 4CAT threads

        Logs a single debug message with one line per thread known to the
        interpreter. Each line names the thread (a 4CAT worker with its type
        and native ID, the main loop, or an unknown thread) followed by the
        call stack it is currently in, innermost call first.

        :return:  Whatever `self.job.finish()` returns.
        """
        # imported here because this is the only place that needs it
        import os

        if self.log.logger.level > logging.DEBUG:
            # do nothing if we're not debugging
            return self.job.finish()

        # try to map active threads to 4CAT workers
        # and also get the 'native ID' which, at least on linux, is the pid of
        # the thread - allowing for further inspecting
        frames = sys._current_frames()
        thread_id_map = {}
        for worker_type, workers in self.manager.worker_pool.items():
            for worker in workers:
                thread_id_map[worker.ident] = f"{worker_type}/{worker.native_id}"

        thread_count = len(frames)
        thread_lines = []
        for thread_id, frame in frames.items():
            stack = []
            while frame:
                # each frame has a reference to the parent frame (if there is one)
                # traverse this stack to construct the actual call stack
                code = frame.f_code
                # basename() instead of split('/') so Windows-style paths are
                # shortened as well; on POSIX the result is the same
                stack.append(f"{os.path.basename(code.co_filename)}:{frame.f_lineno}:{code.co_name}()")
                frame = frame.f_back

            stack = " <-- ".join(stack[:-1])  # ignore the very first frame which is never relevant

            if thread_id in thread_id_map:
                label = f"4CAT worker {thread_id_map[thread_id]}"
            elif thread_id == self.manager.ident:
                label = "4CAT main loop"
            else:
                label = f"4CAT unknown thread {thread_id}"

            thread_lines.append(f"\n {label} :: {stack}")

        # the dict holds this method's own frame, which in turn holds the dict;
        # drop it so no reference cycle is left behind for the garbage collector
        del frames

        monitor_msg = "".join(thread_lines)
        self.log.debug(f"Currently {thread_count:,} active 4CAT threads: {monitor_msg}")
        return self.job.finish()
Monitor active threads
This writes a debug log message at an interval listing active 4CAT threads. It can be used to identify workers that do not terminate correctly or to know where a certain worker is stuck if it does not seem to progress. It also shows the 'native ID' of the thread which can be used to find it in e.g. htop or another process monitor to inspect it further.
If the logging level of 4CAT is not set to DEBUG or lower, this worker does nothing.
@classmethod
def
ensure_job(cls, config=None):
29 @classmethod 30 def ensure_job(cls, config=None): 31 """ 32 Run at an interval of 15 seconds. 33 """ 34 return {"remote_id": "refresh-items", "interval": 15}
Run at an interval of 15 seconds.
def
work(self):
36 def work(self): 37 """ 38 Monitor active 4CAT threads 39 """ 40 if self.log.logger.level > logging.DEBUG: 41 # do nothing if we're not debugging 42 return self.job.finish() 43 44 # try to map active threads to 4CAT workers 45 # and also get the 'native ID' which, at least on linux, is the pid of 46 # the thread - allowing for further inspecting 47 frames = sys._current_frames() 48 thread_id_map = {} 49 for worker_type, workers in self.manager.worker_pool.items(): 50 for worker in workers: 51 thread_id_map[worker.ident] = f"{worker_type}/{worker.native_id}" 52 53 monitor_msg = "" 54 for thread_id, frame in frames.items(): 55 stack = [] 56 while frame: 57 # each frame has a reference to the parent frame (if there is one) 58 # traverse this stack to construct the actual call stack 59 stack.append( 60 f"{frame.f_code.co_filename.split('/')[-1]}:{frame.f_lineno}:{frame.f_code.co_name}()" 61 ) 62 frame = frame.f_back 63 64 stack = " <-- ".join(stack[:-1]) # ignore the very first frame which is never relevant 65 66 if thread_id in thread_id_map: 67 monitor_msg += f"\n 4CAT worker {thread_id_map[thread_id]} :: {stack}" 68 elif thread_id == self.manager.ident: 69 monitor_msg += f"\n 4CAT main loop :: {stack}" 70 else: 71 monitor_msg += f"\n 4CAT unknown thread {thread_id} :: {stack}" 72 73 self.log.debug(f"Currently {len(frames):,} active 4CAT threads: {monitor_msg}") 74 return self.job.finish()
Monitor active 4CAT threads