backend.workers.datasource_metrics
Calculate various 4CAT metrics.
Two types of metrics are currently calculated:
- General metrics. Currently this is mostly the total dataset size, which is useful for admins to know.
- Datasource metrics. These are used both by processors (e.g. to calculate relative values) and to show how many posts a local datasource contains.
"""
Calculate various 4CAT metrics

Two types of metrics are currently calculated:

- General metrics. Currently this is mostly the total dataset size, which is
  useful to know for admins
- Datasource metrics. This is used for both processors (e.g. to calculate
  relative) and to show how many posts a local datasource contains.
"""
import os

from datetime import datetime, time, timezone

from backend.lib.worker import BasicWorker


class DatasourceMetrics(BasicWorker):
    """
    Calculate metrics

    This will be stored in a separate PostgreSQL table.
    """
    type = "datasource-metrics"
    max_workers = 1

    # run on this 4CAT instance itself, every 43200 seconds (12 hours)
    ensure_job = {"remote_id": "localhost", "interval": 43200}

    def work(self):
        """
        Run one metrics pass: general 4CAT stats, then per-datasource stats.
        """
        self.general_stats()
        self.data_stats()

    @staticmethod
    def folder_size(path='.'):
        """
        Get the size of a folder using os.scandir for efficiency

        :param str path:  Folder to measure
        :return int:  Combined size of all files under `path`, recursively,
                      in bytes
        """
        total = 0
        for entry in os.scandir(path):
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += DatasourceMetrics.folder_size(entry.path)
        return total

    def general_stats(self):
        """
        Calculate general 4CAT stats

        These sizes can be very slow to calculate, which is why we do it in
        this worker instead of on demand.
        """
        metrics = {
            "size_data": DatasourceMetrics.folder_size(self.config.get("PATH_DATA")),
            "size_logs": DatasourceMetrics.folder_size(self.config.get("PATH_LOGS")),
            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"]
        }

        for metric, value in metrics.items():
            self.db.upsert("metrics", {
                "metric": metric,
                "count": value,
                "datasource": "4cat",
                "board": "",
                "date": "now"
            }, constraints=["metric", "datasource", "board", "date"])

    def data_stats(self):
        """
        Go through all local datasources, and update the posts per day
        if they haven't been calculated yet. These data can then be used
        to calculate e.g. posts per month.

        :return:
        """
        # Get a list of all database tables
        all_tables = [row["tablename"] for row in self.db.fetchall(
            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

        # Create the metrics table if it is not present yet
        if "metrics" not in all_tables:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                  metric text,
                  datasource text,
                  board text,
                  date text,
                  count integer
                );

            """)

        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
        enabled_datasources = self.config.get("datasources.enabled", {})

        for datasource_id in self.modules.datasources:
            if datasource_id not in enabled_datasources:
                continue

            datasource = self.modules.workers.get(datasource_id + "-search")
            if not datasource:
                continue

            # Database IDs may be different from the Datasource ID (e.g. the
            # datasource "4chan" became "fourchan" but the database ID
            # remained "4chan")
            database_db_id = getattr(datasource, "prefix", datasource_id)

            is_local = bool(getattr(datasource, "is_local", False))
            is_static = bool(getattr(datasource, "is_static", False))

            # Only update local datasources
            if not is_local:
                continue

            # Some translating..
            settings_id = datasource_id
            if datasource_id == "4chan":
                settings_id = "fourchan"
            elif datasource_id == "8chan":
                settings_id = "eightchan"

            boards = list(self.config.get(settings_id + "-search.boards", []))

            # If a datasource is static (so not updated) and it
            # is already present in the metrics table, we don't
            # need to update its metrics anymore.
            if is_static and datasource_id in added_datasources:
                continue

            # -------------------------
            #   Posts per day metric
            # -------------------------

            # Get the name of the posts table for this datasource
            posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

            # Count and update for every board individually
            for board in boards:

                if not board:
                    # parenthesised, and "IS NULL" rather than the
                    # always-false "= NULL", so the OR cannot leak past the
                    # AND that follows in the WHERE clause
                    board_sql = " (board = '' OR board IS NULL)"
                else:
                    # board names come from 4CAT's own configuration, not
                    # from user input
                    board_sql = " board='" + board + "'"

                # Midnight of the current UTC day as a UTC epoch timestamp
                midnight = int(
                    datetime.combine(datetime.now(timezone.utc).date(), time.min).replace(tzinfo=timezone.utc).timestamp())

                # We only count passed days
                time_sql = "timestamp < " + str(midnight)

                # If the datasource is dynamic, we also only update days
                # that haven't been added yet - these are heavy queries.
                if not is_static:
                    days_added = self.db.fetchall(
                        "SELECT date FROM metrics WHERE datasource = %s AND board = %s AND metric = 'posts_per_day';",
                        (database_db_id, board))

                    if days_added:

                        last_day_added = max([row["date"] for row in days_added])
                        last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
                            tzinfo=timezone.utc)

                        # If the last day added is today, there's no need to update yet
                        if last_day_added.date() == datetime.now(timezone.utc).date():
                            self.log.info(
                                "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                            continue

                        # Change to UTC epoch timestamp for postgres query
                        after_timestamp = int(last_day_added.timestamp())

                        time_sql += " AND timestamp > " + str(after_timestamp) + " "

                self.log.info(
                    "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

                # Get those counts; "count(*) AS count" so the result column
                # maps onto the "count" column of the metrics table
                query = """
                    SELECT 'posts_per_day' AS metric, '%s' AS datasource, board,
                           to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*) AS count
                    FROM %s
                    WHERE %s AND %s
                    GROUP BY metric, datasource, board, date;
                    """ % (database_db_id, posts_table, board_sql, time_sql)

                # Add to metrics table
                rows = [dict(row) for row in self.db.fetchall(query)]
                for row in rows:
                    self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])

        # -------------------------------
        #   no other metrics added yet
        # -------------------------------

        self.job.finish()
class DatasourceMetrics(BasicWorker):
    """
    Calculate metrics

    This will be stored in a separate PostgreSQL table.
    """
    type = "datasource-metrics"
    max_workers = 1

    # run on this 4CAT instance itself, every 43200 seconds (12 hours)
    ensure_job = {"remote_id": "localhost", "interval": 43200}

    def work(self):
        """
        Run one metrics pass: general 4CAT stats, then per-datasource stats.
        """
        self.general_stats()
        self.data_stats()

    @staticmethod
    def folder_size(path='.'):
        """
        Get the size of a folder using os.scandir for efficiency

        :param str path:  Folder to measure
        :return int:  Combined size of all files under `path`, recursively,
                      in bytes
        """
        total = 0
        for entry in os.scandir(path):
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += DatasourceMetrics.folder_size(entry.path)
        return total

    def general_stats(self):
        """
        Calculate general 4CAT stats

        These sizes can be very slow to calculate, which is why we do it in
        this worker instead of on demand.
        """
        metrics = {
            "size_data": DatasourceMetrics.folder_size(self.config.get("PATH_DATA")),
            "size_logs": DatasourceMetrics.folder_size(self.config.get("PATH_LOGS")),
            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"]
        }

        for metric, value in metrics.items():
            self.db.upsert("metrics", {
                "metric": metric,
                "count": value,
                "datasource": "4cat",
                "board": "",
                "date": "now"
            }, constraints=["metric", "datasource", "board", "date"])

    def data_stats(self):
        """
        Go through all local datasources, and update the posts per day
        if they haven't been calculated yet. These data can then be used
        to calculate e.g. posts per month.

        :return:
        """
        # Get a list of all database tables
        all_tables = [row["tablename"] for row in self.db.fetchall(
            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

        # Create the metrics table if it is not present yet
        if "metrics" not in all_tables:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                  metric text,
                  datasource text,
                  board text,
                  date text,
                  count integer
                );

            """)

        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
        enabled_datasources = self.config.get("datasources.enabled", {})

        for datasource_id in self.modules.datasources:
            if datasource_id not in enabled_datasources:
                continue

            datasource = self.modules.workers.get(datasource_id + "-search")
            if not datasource:
                continue

            # Database IDs may be different from the Datasource ID (e.g. the
            # datasource "4chan" became "fourchan" but the database ID
            # remained "4chan")
            database_db_id = getattr(datasource, "prefix", datasource_id)

            is_local = bool(getattr(datasource, "is_local", False))
            is_static = bool(getattr(datasource, "is_static", False))

            # Only update local datasources
            if not is_local:
                continue

            # Some translating..
            settings_id = datasource_id
            if datasource_id == "4chan":
                settings_id = "fourchan"
            elif datasource_id == "8chan":
                settings_id = "eightchan"

            boards = list(self.config.get(settings_id + "-search.boards", []))

            # If a datasource is static (so not updated) and it
            # is already present in the metrics table, we don't
            # need to update its metrics anymore.
            if is_static and datasource_id in added_datasources:
                continue

            # -------------------------
            #   Posts per day metric
            # -------------------------

            # Get the name of the posts table for this datasource
            posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

            # Count and update for every board individually
            for board in boards:

                if not board:
                    # parenthesised, and "IS NULL" rather than the
                    # always-false "= NULL", so the OR cannot leak past the
                    # AND that follows in the WHERE clause
                    board_sql = " (board = '' OR board IS NULL)"
                else:
                    # board names come from 4CAT's own configuration, not
                    # from user input
                    board_sql = " board='" + board + "'"

                # Midnight of the current UTC day as a UTC epoch timestamp
                midnight = int(
                    datetime.combine(datetime.now(timezone.utc).date(), time.min).replace(tzinfo=timezone.utc).timestamp())

                # We only count passed days
                time_sql = "timestamp < " + str(midnight)

                # If the datasource is dynamic, we also only update days
                # that haven't been added yet - these are heavy queries.
                if not is_static:
                    days_added = self.db.fetchall(
                        "SELECT date FROM metrics WHERE datasource = %s AND board = %s AND metric = 'posts_per_day';",
                        (database_db_id, board))

                    if days_added:

                        last_day_added = max([row["date"] for row in days_added])
                        last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
                            tzinfo=timezone.utc)

                        # If the last day added is today, there's no need to update yet
                        if last_day_added.date() == datetime.now(timezone.utc).date():
                            self.log.info(
                                "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                            continue

                        # Change to UTC epoch timestamp for postgres query
                        after_timestamp = int(last_day_added.timestamp())

                        time_sql += " AND timestamp > " + str(after_timestamp) + " "

                self.log.info(
                    "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

                # Get those counts; "count(*) AS count" so the result column
                # maps onto the "count" column of the metrics table
                query = """
                    SELECT 'posts_per_day' AS metric, '%s' AS datasource, board,
                           to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*) AS count
                    FROM %s
                    WHERE %s AND %s
                    GROUP BY metric, datasource, board, date;
                    """ % (database_db_id, posts_table, board_sql, time_sql)

                # Add to metrics table
                rows = [dict(row) for row in self.db.fetchall(query)]
                for row in rows:
                    self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])

        # -------------------------------
        #   no other metrics added yet
        # -------------------------------

        self.job.finish()
Calculate metrics
The metrics will be stored in a separate PostgreSQL table.
def
work(self):
This is where the actual work happens.
For this worker, one metrics pass is performed: general 4CAT metrics are updated first (general_stats), followed by per-datasource metrics (data_stats).
@staticmethod
def
folder_size(path='.'):
34 @staticmethod 35 def folder_size(path='.'): 36 """ 37 Get the size of a folder using os.scandir for efficiency 38 """ 39 total = 0 40 for entry in os.scandir(path): 41 if entry.is_file(): 42 total += entry.stat().st_size 43 elif entry.is_dir(): 44 total += DatasourceMetrics.folder_size(entry.path) 45 return total
Get the size of a folder using os.scandir for efficiency
def
general_stats(self):
47 def general_stats(self): 48 """ 49 Calculate general 4CAT stats 50 51 These sizes can be very slow to calculate, which is why we do it in 52 this worker instead of on demand. 53 """ 54 metrics = { 55 "size_data": DatasourceMetrics.folder_size(self.config.get("PATH_DATA")), 56 "size_logs": DatasourceMetrics.folder_size(self.config.get("PATH_LOGS")), 57 "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"] 58 } 59 60 for metric, value in metrics.items(): 61 self.db.upsert("metrics", { 62 "metric": metric, 63 "count": value, 64 "datasource": "4cat", 65 "board": "", 66 "date": "now" 67 }, constraints=["metric", "datasource", "board", "date"])
Calculate general 4CAT stats
These sizes can be very slow to calculate, which is why we do it in this worker instead of on demand.
def
data_stats(self):
69 def data_stats(self): 70 """ 71 Go through all local datasources, and update the posts per day 72 if they haven't been calculated yet. These data can then be used 73 to calculate e.g. posts per month. 74 :return: 75 """ 76 77 # Get a list of all database tables 78 all_tables = [row["tablename"] for row in self.db.fetchall( 79 "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")] 80 81 # Check if the metrics table is already present 82 metrics_exists = True if "metrics" in all_tables else False 83 84 # If not, make it. 85 if not metrics_exists: 86 self.db.execute(""" 87 CREATE TABLE IF NOT EXISTS metrics ( 88 metric text, 89 datasource text, 90 board text, 91 date text, 92 count integer 93 ); 94 95 """) 96 97 added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")] 98 enabled_datasources = self.config.get("datasources.enabled", {}) 99 100 for datasource_id in self.modules.datasources: 101 if datasource_id not in enabled_datasources: 102 continue 103 104 datasource = self.modules.workers.get(datasource_id + "-search") 105 if not datasource: 106 continue 107 108 # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan") 109 database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id 110 111 is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False 112 is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False 113 114 # Only update local datasources 115 if is_local: 116 117 # Some translating.. 
118 settings_id = datasource_id 119 if datasource_id == "4chan": 120 settings_id = "fourchan" 121 elif datasource_id == "8chan": 122 settings_id = "eightchan" 123 124 boards = [b for b in self.config.get(settings_id + "-search.boards", [])] 125 126 # If a datasource is static (so not updated) and it 127 # is already present in the metrics table, we don't 128 # need to update its metrics anymore. 129 if is_static and datasource_id in added_datasources: 130 continue 131 else: 132 133 # ------------------------- 134 # Posts per day metric 135 # ------------------------- 136 137 # Get the name of the posts table for this datasource 138 posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id 139 140 # Count and update for every board individually 141 for board in boards: 142 143 if not board: 144 board_sql = " board = '' OR board = NULL" 145 else: 146 board_sql = " board='" + board + "'" 147 148 # Midnight of this day in UTC epoch timestamp 149 midnight = int( 150 datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp()) 151 152 # We only count passed days 153 time_sql = "timestamp < " + str(midnight) 154 155 # If the datasource is dynamic, we also only update days 156 # that haven't been added yet - these are heavy queries. 
157 if not is_static: 158 days_added = self.db.fetchall( 159 "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % ( 160 database_db_id, board)) 161 162 if days_added: 163 164 last_day_added = max([row["date"] for row in days_added]) 165 last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace( 166 tzinfo=timezone.utc) 167 168 # If the last day added is today, there's no need to update yet 169 if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date(): 170 self.log.info( 171 "No new posts per day to count for %s%s" % (datasource_id, "/" + board)) 172 continue 173 174 # Change to UTC epoch timestamp for postgres query 175 after_timestamp = int(last_day_added.timestamp()) 176 177 time_sql += " AND timestamp > " + str(after_timestamp) + " " 178 179 self.log.info( 180 "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board)) 181 182 # Get those counts 183 query = """ 184 SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT 185 FROM %s 186 WHERE %s AND %s 187 GROUP BY metric, datasource, board, date; 188 """ % (database_db_id, posts_table, board_sql, time_sql) 189 # Add to metrics table 190 rows = [dict(row) for row in self.db.fetchall(query)] 191 192 if rows: 193 for row in rows: 194 self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"]) 195 196 # ------------------------------- 197 # no other metrics added yet 198 # ------------------------------- 199 200 self.job.finish()
Go through all local datasources, and update the posts per day if they haven't been calculated yet. These data can then be used to calculate e.g. posts per month.