Edit on GitHub

backend.workers.datasource_metrics

Calculate various 4CAT metrics

Two types of metrics are currently calculated:

  • General metrics. Currently this is mostly the total dataset size, which is useful to know for admins
  • Datasource metrics. This is used both by processors (e.g. to calculate relative frequencies) and to show how many posts a local datasource contains.
  1"""
  2Calculate various 4CAT metrics
  3
  4Two types of metrics are currently calculated:
  5
  6- General metrics. Currently this is mostly the total dataset size, which is
  7  useful to know for admins
  8- Datasource metrics. This is used for both processors (e.g. to calculate
  9  relative) and to show how many posts a local datasource contains.
 10"""
 11import os
 12
 13from datetime import datetime, time, timezone
 14
 15from backend.lib.worker import BasicWorker
 16from common.config_manager import config
 17
 18
 19class DatasourceMetrics(BasicWorker):
 20    """
 21    Calculate metrics
 22
 23    This will be stored in a separate PostgreSQL table.
 24    """
 25    type = "datasource-metrics"
 26    max_workers = 1
 27
 28    ensure_job = {"remote_id": "localhost", "interval": 43200}
 29
 30    def work(self):
 31        self.general_stats()
 32        self.data_stats()
 33
 34    @staticmethod
 35    def folder_size(path='.'):
 36        """
 37        Get the size of a folder using os.scandir for efficiency
 38        """
 39        total = 0
 40        for entry in os.scandir(path):
 41            if entry.is_file():
 42                total += entry.stat().st_size
 43            elif entry.is_dir():
 44                total += DatasourceMetrics.folder_size(entry.path)
 45        return total
 46
 47    def general_stats(self):
 48        """
 49        Calculate general 4CAT stats
 50
 51        These sizes can be very slow to calculate, which is why we do it in
 52        this worker instead of on demand.
 53        """
 54        metrics = {
 55            "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")),
 56            "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")),
 57            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"]
 58        }
 59
 60        for metric, value in metrics.items():
 61            self.db.upsert("metrics", {
 62                "metric": metric,
 63                "count": value,
 64                "datasource": "4cat",
 65                "board": "",
 66                "date": "now"
 67            }, constraints=["metric", "datasource", "board", "date"])
 68
 69    def data_stats(self):
 70        """
 71        Go through all local datasources, and update the posts per day
 72        if they haven't been calculated yet. These data can then be used
 73        to calculate e.g. posts per month.
 74        :return:
 75        """
 76
 77        # Get a list of all database tables
 78        all_tables = [row["tablename"] for row in self.db.fetchall(
 79            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]
 80
 81        # Check if the metrics table is already present
 82        metrics_exists = True if "metrics" in all_tables else False
 83
 84        # If not, make it.
 85        if not metrics_exists:
 86            self.db.execute("""
 87				CREATE TABLE IF NOT EXISTS metrics (
 88				  metric             text,
 89				  datasource         text,
 90				  board              text,
 91				  date               text,
 92				  count              integer
 93				);
 94
 95			""")
 96
 97        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
 98        enabled_datasources = config.get("datasources.enabled", {})
 99
100        for datasource_id in self.modules.datasources:
101            if datasource_id not in enabled_datasources:
102                continue
103
104            datasource = self.modules.workers.get(datasource_id + "-search")
105            if not datasource:
106                continue
107
108            # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
109            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id
110
111            is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
112            is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False
113
114            # Only update local datasources
115            if is_local:
116
117                # Some translating..
118                settings_id = datasource_id
119                if datasource_id == "4chan":
120                    settings_id = "fourchan"
121                elif datasource_id == "8chan":
122                    settings_id = "eightchan"
123
124                boards = [b for b in config.get(settings_id + "-search.boards", [])]
125
126                # If a datasource is static (so not updated) and it
127                # is already present in the metrics table, we don't
128                # need to update its metrics anymore.
129                if is_static and datasource_id in added_datasources:
130                    continue
131                else:
132
133                    # -------------------------
134                    #   Posts per day metric
135                    # -------------------------
136
137                    # Get the name of the posts table for this datasource
138                    posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id
139
140                    # Count and update for every board individually
141                    for board in boards:
142
143                        if not board:
144                            board_sql = " board = '' OR board = NULL"
145                        else:
146                            board_sql = " board='" + board + "'"
147
148                        # Midnight of this day in UTC epoch timestamp
149                        midnight = int(
150                            datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp())
151
152                        # We only count passed days
153                        time_sql = "timestamp < " + str(midnight)
154
155                        # If the datasource is dynamic, we also only update days
156                        # that haven't been added yet - these are heavy queries.
157                        if not is_static:
158                            days_added = self.db.fetchall(
159                                "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (
160                                database_db_id, board))
161
162                            if days_added:
163
164                                last_day_added = max([row["date"] for row in days_added])
165                                last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
166                                    tzinfo=timezone.utc)
167
168                                # If the last day added is today, there's no need to update yet
169                                if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date():
170                                    self.log.info(
171                                        "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
172                                    continue
173
174                                # Change to UTC epoch timestamp for postgres query
175                                after_timestamp = int(last_day_added.timestamp())
176
177                                time_sql += " AND timestamp > " + str(after_timestamp) + " "
178
179                        self.log.info(
180                            "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))
181
182                        # Get those counts
183                        query = """
184							SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT
185							FROM %s
186							WHERE %s AND %s
187							GROUP BY metric, datasource, board, date;
188							""" % (database_db_id, posts_table, board_sql, time_sql)
189                        # Add to metrics table
190                        rows = [dict(row) for row in self.db.fetchall(query)]
191
192                        if rows:
193                            for row in rows:
194                                self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])
195
196                # -------------------------------
197                #   no other metrics added yet
198                # -------------------------------
199
200        self.job.finish()
class DatasourceMetrics(backend.lib.worker.BasicWorker):
 20class DatasourceMetrics(BasicWorker):
 21    """
 22    Calculate metrics
 23
 24    This will be stored in a separate PostgreSQL table.
 25    """
 26    type = "datasource-metrics"
 27    max_workers = 1
 28
 29    ensure_job = {"remote_id": "localhost", "interval": 43200}
 30
 31    def work(self):
 32        self.general_stats()
 33        self.data_stats()
 34
 35    @staticmethod
 36    def folder_size(path='.'):
 37        """
 38        Get the size of a folder using os.scandir for efficiency
 39        """
 40        total = 0
 41        for entry in os.scandir(path):
 42            if entry.is_file():
 43                total += entry.stat().st_size
 44            elif entry.is_dir():
 45                total += DatasourceMetrics.folder_size(entry.path)
 46        return total
 47
 48    def general_stats(self):
 49        """
 50        Calculate general 4CAT stats
 51
 52        These sizes can be very slow to calculate, which is why we do it in
 53        this worker instead of on demand.
 54        """
 55        metrics = {
 56            "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")),
 57            "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")),
 58            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"]
 59        }
 60
 61        for metric, value in metrics.items():
 62            self.db.upsert("metrics", {
 63                "metric": metric,
 64                "count": value,
 65                "datasource": "4cat",
 66                "board": "",
 67                "date": "now"
 68            }, constraints=["metric", "datasource", "board", "date"])
 69
 70    def data_stats(self):
 71        """
 72        Go through all local datasources, and update the posts per day
 73        if they haven't been calculated yet. These data can then be used
 74        to calculate e.g. posts per month.
 75        :return:
 76        """
 77
 78        # Get a list of all database tables
 79        all_tables = [row["tablename"] for row in self.db.fetchall(
 80            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]
 81
 82        # Check if the metrics table is already present
 83        metrics_exists = True if "metrics" in all_tables else False
 84
 85        # If not, make it.
 86        if not metrics_exists:
 87            self.db.execute("""
 88				CREATE TABLE IF NOT EXISTS metrics (
 89				  metric             text,
 90				  datasource         text,
 91				  board              text,
 92				  date               text,
 93				  count              integer
 94				);
 95
 96			""")
 97
 98        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
 99        enabled_datasources = config.get("datasources.enabled", {})
100
101        for datasource_id in self.modules.datasources:
102            if datasource_id not in enabled_datasources:
103                continue
104
105            datasource = self.modules.workers.get(datasource_id + "-search")
106            if not datasource:
107                continue
108
109            # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
110            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id
111
112            is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
113            is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False
114
115            # Only update local datasources
116            if is_local:
117
118                # Some translating..
119                settings_id = datasource_id
120                if datasource_id == "4chan":
121                    settings_id = "fourchan"
122                elif datasource_id == "8chan":
123                    settings_id = "eightchan"
124
125                boards = [b for b in config.get(settings_id + "-search.boards", [])]
126
127                # If a datasource is static (so not updated) and it
128                # is already present in the metrics table, we don't
129                # need to update its metrics anymore.
130                if is_static and datasource_id in added_datasources:
131                    continue
132                else:
133
134                    # -------------------------
135                    #   Posts per day metric
136                    # -------------------------
137
138                    # Get the name of the posts table for this datasource
139                    posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id
140
141                    # Count and update for every board individually
142                    for board in boards:
143
144                        if not board:
145                            board_sql = " board = '' OR board = NULL"
146                        else:
147                            board_sql = " board='" + board + "'"
148
149                        # Midnight of this day in UTC epoch timestamp
150                        midnight = int(
151                            datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp())
152
153                        # We only count passed days
154                        time_sql = "timestamp < " + str(midnight)
155
156                        # If the datasource is dynamic, we also only update days
157                        # that haven't been added yet - these are heavy queries.
158                        if not is_static:
159                            days_added = self.db.fetchall(
160                                "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (
161                                database_db_id, board))
162
163                            if days_added:
164
165                                last_day_added = max([row["date"] for row in days_added])
166                                last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
167                                    tzinfo=timezone.utc)
168
169                                # If the last day added is today, there's no need to update yet
170                                if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date():
171                                    self.log.info(
172                                        "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
173                                    continue
174
175                                # Change to UTC epoch timestamp for postgres query
176                                after_timestamp = int(last_day_added.timestamp())
177
178                                time_sql += " AND timestamp > " + str(after_timestamp) + " "
179
180                        self.log.info(
181                            "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))
182
183                        # Get those counts
184                        query = """
185							SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT
186							FROM %s
187							WHERE %s AND %s
188							GROUP BY metric, datasource, board, date;
189							""" % (database_db_id, posts_table, board_sql, time_sql)
190                        # Add to metrics table
191                        rows = [dict(row) for row in self.db.fetchall(query)]
192
193                        if rows:
194                            for row in rows:
195                                self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])
196
197                # -------------------------------
198                #   no other metrics added yet
199                # -------------------------------
200
201        self.job.finish()

Calculate metrics

This will be stored in a separate PostgreSQL table.

type = 'datasource-metrics'
max_workers = 1
ensure_job = {'remote_id': 'localhost', 'interval': 43200}
def work(self):
31    def work(self):
32        self.general_stats()
33        self.data_stats()

This is where the actual work happens

Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.

@staticmethod
def folder_size(path='.'):
35    @staticmethod
36    def folder_size(path='.'):
37        """
38        Get the size of a folder using os.scandir for efficiency
39        """
40        total = 0
41        for entry in os.scandir(path):
42            if entry.is_file():
43                total += entry.stat().st_size
44            elif entry.is_dir():
45                total += DatasourceMetrics.folder_size(entry.path)
46        return total

Get the size of a folder using os.scandir for efficiency

def general_stats(self):
48    def general_stats(self):
49        """
50        Calculate general 4CAT stats
51
52        These sizes can be very slow to calculate, which is why we do it in
53        this worker instead of on demand.
54        """
55        metrics = {
56            "size_data": DatasourceMetrics.folder_size(config.get("PATH_DATA")),
57            "size_logs": DatasourceMetrics.folder_size(config.get("PATH_LOGS")),
58            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (config.get("DB_NAME"),))["num"]
59        }
60
61        for metric, value in metrics.items():
62            self.db.upsert("metrics", {
63                "metric": metric,
64                "count": value,
65                "datasource": "4cat",
66                "board": "",
67                "date": "now"
68            }, constraints=["metric", "datasource", "board", "date"])

Calculate general 4CAT stats

These sizes can be very slow to calculate, which is why we do it in this worker instead of on demand.

def data_stats(self):
 70    def data_stats(self):
 71        """
 72        Go through all local datasources, and update the posts per day
 73        if they haven't been calculated yet. These data can then be used
 74        to calculate e.g. posts per month.
 75        :return:
 76        """
 77
 78        # Get a list of all database tables
 79        all_tables = [row["tablename"] for row in self.db.fetchall(
 80            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]
 81
 82        # Check if the metrics table is already present
 83        metrics_exists = True if "metrics" in all_tables else False
 84
 85        # If not, make it.
 86        if not metrics_exists:
 87            self.db.execute("""
 88				CREATE TABLE IF NOT EXISTS metrics (
 89				  metric             text,
 90				  datasource         text,
 91				  board              text,
 92				  date               text,
 93				  count              integer
 94				);
 95
 96			""")
 97
 98        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
 99        enabled_datasources = config.get("datasources.enabled", {})
100
101        for datasource_id in self.modules.datasources:
102            if datasource_id not in enabled_datasources:
103                continue
104
105            datasource = self.modules.workers.get(datasource_id + "-search")
106            if not datasource:
107                continue
108
109            # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
110            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id
111
112            is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
113            is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False
114
115            # Only update local datasources
116            if is_local:
117
118                # Some translating..
119                settings_id = datasource_id
120                if datasource_id == "4chan":
121                    settings_id = "fourchan"
122                elif datasource_id == "8chan":
123                    settings_id = "eightchan"
124
125                boards = [b for b in config.get(settings_id + "-search.boards", [])]
126
127                # If a datasource is static (so not updated) and it
128                # is already present in the metrics table, we don't
129                # need to update its metrics anymore.
130                if is_static and datasource_id in added_datasources:
131                    continue
132                else:
133
134                    # -------------------------
135                    #   Posts per day metric
136                    # -------------------------
137
138                    # Get the name of the posts table for this datasource
139                    posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id
140
141                    # Count and update for every board individually
142                    for board in boards:
143
144                        if not board:
145                            board_sql = " board = '' OR board = NULL"
146                        else:
147                            board_sql = " board='" + board + "'"
148
149                        # Midnight of this day in UTC epoch timestamp
150                        midnight = int(
151                            datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp())
152
153                        # We only count passed days
154                        time_sql = "timestamp < " + str(midnight)
155
156                        # If the datasource is dynamic, we also only update days
157                        # that haven't been added yet - these are heavy queries.
158                        if not is_static:
159                            days_added = self.db.fetchall(
160                                "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (
161                                database_db_id, board))
162
163                            if days_added:
164
165                                last_day_added = max([row["date"] for row in days_added])
166                                last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
167                                    tzinfo=timezone.utc)
168
169                                # If the last day added is today, there's no need to update yet
170                                if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date():
171                                    self.log.info(
172                                        "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
173                                    continue
174
175                                # Change to UTC epoch timestamp for postgres query
176                                after_timestamp = int(last_day_added.timestamp())
177
178                                time_sql += " AND timestamp > " + str(after_timestamp) + " "
179
180                        self.log.info(
181                            "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))
182
183                        # Get those counts
184                        query = """
185							SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT
186							FROM %s
187							WHERE %s AND %s
188							GROUP BY metric, datasource, board, date;
189							""" % (database_db_id, posts_table, board_sql, time_sql)
190                        # Add to metrics table
191                        rows = [dict(row) for row in self.db.fetchall(query)]
192
193                        if rows:
194                            for row in rows:
195                                self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])
196
197                # -------------------------------
198                #   no other metrics added yet
199                # -------------------------------
200
201        self.job.finish()

Go through all local datasources, and update the posts per day if they haven't been calculated yet. These data can then be used to calculate e.g. posts per month.

Returns: None