backend.workers.datasource_metrics
Calculate various 4CAT metrics
Two types of metrics are currently calculated:
- General metrics. Currently this is mostly the total dataset size, which is useful to know for admins
- Datasource metrics. This is used for both processors (e.g. to calculate relative frequencies) and to show how many posts a local datasource contains.
"""
Calculate various 4CAT metrics

Two types of metrics are currently calculated:

- General metrics. Currently this is mostly the total dataset size, which is
  useful to know for admins
- Datasource metrics. This is used for both processors (e.g. to calculate
  relative frequencies) and to show how many posts a local datasource contains.
"""
import os

from datetime import datetime, time, timezone

from backend.lib.worker import BasicWorker
from common.config_manager import config


class DatasourceMetrics(BasicWorker):
    """
    Calculate metrics

    This will be stored in a separate PostgreSQL table.
    """
    type = "datasource-metrics"
    max_workers = 1

    # Recurring job on the local instance, every 43200 seconds (12 hours)
    ensure_job = {"remote_id": "localhost", "interval": 43200}

    def work(self):
        """
        Update both the general 4CAT metrics and the per-datasource metrics.
        """
        self.general_stats()
        self.data_stats()

    @staticmethod
    def folder_size(path='.'):
        """
        Get the size of a folder using os.scandir for efficiency

        :param str path:  Folder to measure
        :return int:  Combined size, in bytes, of all files under `path`
        """
        total = 0
        for entry in os.scandir(path):
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                # recurse into subfolders
                total += DatasourceMetrics.folder_size(entry.path)
        return total

    def general_stats(self):
        """
        Calculate general 4CAT stats

        These sizes can be very slow to calculate, which is why we do it in
        this worker instead of on demand.
        """
        metrics = {
            "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")),
            "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")),
            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"]
        }

        for metric, value in metrics.items():
            self.db.upsert("metrics", {
                "metric": metric,
                "count": value,
                "datasource": "4cat",
                "board": "",
                "date": "now"
            }, constraints=["metric", "datasource", "board", "date"])

    def data_stats(self):
        """
        Go through all local datasources, and update the posts per day
        if they haven't been calculated yet. These data can then be used
        to calculate e.g. posts per month.

        :return:
        """
        # Get a list of all database tables
        all_tables = [row["tablename"] for row in self.db.fetchall(
            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

        # Make sure the metrics table exists before reading from or writing to it
        if "metrics" not in all_tables:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                  metric text,
                  datasource text,
                  board text,
                  date text,
                  count integer
                );
            """)

        # Datasources for which at least one metric row was recorded already
        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
        enabled_datasources = config.get("datasources.enabled", {})

        for datasource_id in self.modules.datasources:
            if datasource_id not in enabled_datasources:
                continue

            datasource = self.modules.workers.get(datasource_id + "-search")
            if not datasource:
                continue

            # Database IDs may be different from the Datasource ID (e.g. the
            # datasource "4chan" became "fourchan" but the database ID
            # remained "4chan")
            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

            is_local = bool(getattr(datasource, "is_local", False))
            is_static = bool(getattr(datasource, "is_static", False))

            # Only update local datasources
            if not is_local:
                continue

            # Some translating between datasource ID and settings ID..
            settings_id = datasource_id
            if datasource_id == "4chan":
                settings_id = "fourchan"
            elif datasource_id == "8chan":
                settings_id = "eightchan"

            boards = list(config.get(settings_id + "-search.boards", []))

            # If a datasource is static (so not updated) and it is already
            # present in the metrics table, we don't need to update its
            # metrics anymore.
            if is_static and datasource_id in added_datasources:
                continue

            # -------------------------
            #   Posts per day metric
            # -------------------------

            # Get the name of the posts table for this datasource. The table
            # name comes from the list of existing tables, never from user
            # input, so it is safe to interpolate into the query below.
            posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

            # Count and update for every board individually
            for board in boards:
                # BUGFIX: "board = NULL" is never true in SQL (NULL requires
                # IS NULL), and the OR needs parentheses: AND binds more
                # tightly than OR, so without them the timestamp condition
                # appended below would only apply to the second branch.
                if not board:
                    board_sql = " (board = '' OR board IS NULL)"
                    board_params = []
                else:
                    # BUGFIX: pass the board as a query parameter instead of
                    # interpolating it into the SQL string.
                    board_sql = " board = %s"
                    board_params = [board]

                # Midnight of this day in UTC epoch timestamp
                # BUGFIX: use the actual current UTC date; datetime.today()
                # with .replace(tzinfo=...) merely relabels local wall-clock
                # time as UTC.
                midnight = int(datetime.combine(datetime.now(timezone.utc).date(), time.min, tzinfo=timezone.utc).timestamp())

                # We only count passed days
                time_sql = "timestamp < " + str(midnight)

                # If the datasource is dynamic, we also only update days
                # that haven't been added yet - these are heavy queries.
                if not is_static:
                    # BUGFIX: parameterized instead of %-interpolated values
                    days_added = self.db.fetchall(
                        "SELECT date FROM metrics WHERE datasource = %s AND board = %s AND metric = 'posts_per_day';",
                        (database_db_id, board))

                    if days_added:
                        last_day_added = max([row["date"] for row in days_added])
                        last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(tzinfo=timezone.utc)

                        # If the last day added is today, there's no need to
                        # update yet.
                        if last_day_added.date() == datetime.now(timezone.utc).date():
                            self.log.info("No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                            continue

                        # Change to UTC epoch timestamp for postgres query
                        after_timestamp = int(last_day_added.timestamp())
                        time_sql += " AND timestamp > " + str(after_timestamp) + " "

                self.log.info("Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

                # Get those counts.
                # BUGFIX: the garbled "count(*)COUNT" is replaced with a plain
                # count(*), which PostgreSQL names "count" — matching the
                # metrics table column used by the upsert below. The
                # datasource label is bound as a parameter; the literal %%s
                # survives the string formatting as %s.
                query = """
                    SELECT 'posts_per_day' AS metric, %%s AS datasource, board,
                           to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)
                    FROM %s
                    WHERE %s AND %s
                    GROUP BY metric, datasource, board, date;
                """ % (posts_table, board_sql, time_sql)

                # Add to metrics table
                rows = [dict(row) for row in self.db.fetchall(query, tuple([database_db_id] + board_params))]
                for row in rows:
                    self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])

        # -------------------------------
        #    no other metrics added yet
        # -------------------------------

        self.job.finish()
class DatasourceMetrics(BasicWorker):
    """
    Calculate metrics

    This will be stored in a separate PostgreSQL table.
    """
    # worker identifier used by the scheduler
    type = "datasource-metrics"
    # never run more than one metrics calculation concurrently
    max_workers = 1

    # recurring job on the local instance; interval is in seconds (12 hours)
    ensure_job = {"remote_id": "localhost", "interval": 43200}

    def work(self):
        """
        Update general 4CAT metrics and per-datasource metrics.
        """
        self.general_stats()
        self.data_stats()

    @staticmethod
    def folder_size(path='.'):
        """
        Get the size of a folder using os.scandir for efficiency

        :param path:  Folder to measure
        :return:  Combined size, in bytes, of all files under the folder
        """
        total = 0
        for entry in os.scandir(path):
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                # recurse into subfolders
                total += DatasourceMetrics.folder_size(entry.path)
        return total

    def general_stats(self):
        """
        Calculate general 4CAT stats

        These sizes can be very slow to calculate, which is why we do it in
        this worker instead of on demand.
        """
        metrics = {
            "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")),
            "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")),
            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"]
        }

        for metric, value in metrics.items():
            # "date" is the literal string "now"; presumably interpreted by
            # whatever reads the metrics table — TODO confirm
            self.db.upsert("metrics", {
                "metric": metric,
                "count": value,
                "datasource": "4cat",
                "board": "",
                "date": "now"
            }, constraints=["metric", "datasource", "board", "date"])

    def data_stats(self):
        """
        Go through all local datasources, and update the posts per day
        if they haven't been calculated yet. These data can then be used
        to calculate e.g. posts per month.
        :return:
        """

        # Get a list of all database tables
        all_tables = [row["tablename"] for row in self.db.fetchall(
            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

        # Check if the metrics table is already present
        metrics_exists = True if "metrics" in all_tables else False

        # If not, make it.
        if not metrics_exists:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                  metric text,
                  datasource text,
                  board text,
                  date text,
                  count integer
                );

            """)

        # datasources that already have at least one recorded metric
        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
        enabled_datasources = config.get("datasources.enabled", {})

        for datasource_id in self.modules.datasources:
            # skip datasources that are known but not enabled
            if datasource_id not in enabled_datasources:
                continue

            datasource = self.modules.workers.get(datasource_id + "-search")
            if not datasource:
                continue

            # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

            is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
            is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False

            # Only update local datasources
            if is_local:

                # Some translating..
                settings_id = datasource_id
                if datasource_id == "4chan":
                    settings_id = "fourchan"
                elif datasource_id == "8chan":
                    settings_id = "eightchan"

                boards = [b for b in config.get(settings_id + "-search.boards", [])]

                # If a datasource is static (so not updated) and it
                # is already present in the metrics table, we don't
                # need to update its metrics anymore.
                if is_static and datasource_id in added_datasources:
                    continue
                else:

                    # -------------------------
                    #   Posts per day metric
                    # -------------------------

                    # Get the name of the posts table for this datasource
                    posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

                    # Count and update for every board individually
                    for board in boards:

                        # NOTE(review): "board = NULL" is never true in SQL
                        # (NULL needs IS NULL), and the un-parenthesised OR
                        # interacts with the AND appended later — confirm
                        # intended semantics.
                        if not board:
                            board_sql = " board = '' OR board = NULL"
                        else:
                            # NOTE(review): board is interpolated into the SQL
                            # string; assumes board names come only from
                            # trusted config — verify.
                            board_sql = " board='" + board + "'"

                        # Midnight of this day in UTC epoch timestamp
                        # NOTE(review): datetime.today() is local wall-clock
                        # time merely relabeled as UTC — confirm this is
                        # acceptable for the counting cutoff.
                        midnight = int(
                            datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp())

                        # We only count passed days
                        time_sql = "timestamp < " + str(midnight)

                        # If the datasource is dynamic, we also only update days
                        # that haven't been added yet - these are heavy queries.
                        if not is_static:
                            days_added = self.db.fetchall(
                                "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (
                                    database_db_id, board))

                            if days_added:

                                # dates are stored as 'YYYY-MM-DD' strings, so
                                # max() gives the most recent day
                                last_day_added = max([row["date"] for row in days_added])
                                last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
                                    tzinfo=timezone.utc)

                                # If the last day added is today, there's no need to update yet
                                if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date():
                                    self.log.info(
                                        "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                                    continue

                                # Change to UTC epoch timestamp for postgres query
                                after_timestamp = int(last_day_added.timestamp())

                                time_sql += " AND timestamp > " + str(after_timestamp) + " "

                        self.log.info(
                            "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

                        # Get those counts
                        # NOTE(review): "count(*)COUNT" looks garbled and is
                        # not valid SQL — likely should be count(*); confirm
                        # against version history.
                        query = """
                            SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT
                            FROM %s
                            WHERE %s AND %s
                            GROUP BY metric, datasource, board, date;
                            """ % (database_db_id, posts_table, board_sql, time_sql)
                        # Add to metrics table
                        rows = [dict(row) for row in self.db.fetchall(query)]

                        if rows:
                            for row in rows:
                                self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])

        # -------------------------------
        #    no other metrics added yet
        # -------------------------------

        self.job.finish()
Calculate metrics
This will be stored in a separate PostgreSQL table.
def
work(self):
This is where the actual work happens
Whatever the worker is supposed to do should happen in (or be initiated from) this method. By default it does nothing; descending classes should implement this method.
@staticmethod
def
folder_size(path='.'):
35 @staticmethod 36 def folder_size(path='.'): 37 """ 38 Get the size of a folder using os.scandir for efficiency 39 """ 40 total = 0 41 for entry in os.scandir(path): 42 if entry.is_file(): 43 total += entry.stat().st_size 44 elif entry.is_dir(): 45 total += DatasourceMetrics.folder_size(entry.path) 46 return total
Get the size of a folder using os.scandir for efficiency
def
general_stats(self):
48 def general_stats(self): 49 """ 50 Calculate general 4CAT stats 51 52 These sizes can be very slow to calculate, which is why we do it in 53 this worker instead of on demand. 54 """ 55 metrics = { 56 "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")), 57 "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")), 58 "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"] 59 } 60 61 for metric, value in metrics.items(): 62 self.db.upsert("metrics", { 63 "metric": metric, 64 "count": value, 65 "datasource": "4cat", 66 "board": "", 67 "date": "now" 68 }, constraints=["metric", "datasource", "board", "date"])
Calculate general 4CAT stats
These sizes can be very slow to calculate, which is why we do it in this worker instead of on demand.
def
data_stats(self):
    def data_stats(self):
        """
        Go through all local datasources, and update the posts per day
        if they haven't been calculated yet. These data can then be used
        to calculate e.g. posts per month.
        :return:
        """

        # Get a list of all database tables
        all_tables = [row["tablename"] for row in self.db.fetchall(
            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

        # Check if the metrics table is already present
        metrics_exists = True if "metrics" in all_tables else False

        # If not, make it.
        if not metrics_exists:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                  metric text,
                  datasource text,
                  board text,
                  date text,
                  count integer
                );

            """)

        # datasources that already have at least one recorded metric row
        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
        enabled_datasources = config.get("datasources.enabled", {})

        for datasource_id in self.modules.datasources:
            # only consider datasources enabled in the configuration
            if datasource_id not in enabled_datasources:
                continue

            datasource = self.modules.workers.get(datasource_id + "-search")
            if not datasource:
                continue

            # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

            is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
            is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False

            # Only update local datasources
            if is_local:

                # Some translating..
                settings_id = datasource_id
                if datasource_id == "4chan":
                    settings_id = "fourchan"
                elif datasource_id == "8chan":
                    settings_id = "eightchan"

                boards = [b for b in config.get(settings_id + "-search.boards", [])]

                # If a datasource is static (so not updated) and it
                # is already present in the metrics table, we don't
                # need to update its metrics anymore.
                if is_static and datasource_id in added_datasources:
                    continue
                else:

                    # -------------------------
                    #   Posts per day metric
                    # -------------------------

                    # Get the name of the posts table for this datasource
                    posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

                    # Count and update for every board individually
                    for board in boards:

                        # NOTE(review): "board = NULL" is never true in SQL
                        # (NULL comparisons require IS NULL) and the OR is not
                        # parenthesised against the AND appended below —
                        # confirm intended semantics.
                        if not board:
                            board_sql = " board = '' OR board = NULL"
                        else:
                            # NOTE(review): board interpolated into SQL;
                            # assumes board names come only from trusted
                            # config — verify.
                            board_sql = " board='" + board + "'"

                        # Midnight of this day in UTC epoch timestamp
                        # NOTE(review): datetime.today() is local time merely
                        # relabeled as UTC — confirm acceptable for cutoff.
                        midnight = int(
                            datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp())

                        # We only count passed days
                        time_sql = "timestamp < " + str(midnight)

                        # If the datasource is dynamic, we also only update days
                        # that haven't been added yet - these are heavy queries.
                        if not is_static:
                            days_added = self.db.fetchall(
                                "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (
                                    database_db_id, board))

                            if days_added:

                                # dates are 'YYYY-MM-DD' strings, so max()
                                # yields the most recent recorded day
                                last_day_added = max([row["date"] for row in days_added])
                                last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
                                    tzinfo=timezone.utc)

                                # If the last day added is today, there's no need to update yet
                                if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date():
                                    self.log.info(
                                        "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                                    continue

                                # Change to UTC epoch timestamp for postgres query
                                after_timestamp = int(last_day_added.timestamp())

                                time_sql += " AND timestamp > " + str(after_timestamp) + " "

                        self.log.info(
                            "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

                        # Get those counts
                        # NOTE(review): "count(*)COUNT" appears garbled and is
                        # not valid SQL — likely count(*); confirm against
                        # version history.
                        query = """
                            SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT
                            FROM %s
                            WHERE %s AND %s
                            GROUP BY metric, datasource, board, date;
                            """ % (database_db_id, posts_table, board_sql, time_sql)
                        # Add to metrics table
                        rows = [dict(row) for row in self.db.fetchall(query)]

                        if rows:
                            for row in rows:
                                self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])

        # -------------------------------
        #    no other metrics added yet
        # -------------------------------

        self.job.finish()
Go through all local datasources, and update the posts per day if they haven't been calculated yet. These data can then be used to calculate e.g. posts per month.