Edit on GitHub

backend.workers.datasource_metrics

Calculate various 4CAT metrics

Two types of metrics are currently calculated:

  • General metrics. Currently this is mostly the total dataset size, which is useful to know for admins
  • Datasource metrics. This is used for both processors (e.g. to calculate relative activity) and to show how many posts a local datasource contains.
  1"""
  2Calculate various 4CAT metrics
  3
  4Two types of metrics are currently calculated:
  5
  6- General metrics. Currently this is mostly the total dataset size, which is
  7  useful to know for admins
  8- Datasource metrics. This is used for both processors (e.g. to calculate
  9  relative) and to show how many posts a local datasource contains.
 10"""
 11import os
 12
 13from datetime import datetime, time, timezone
 14
 15from backend.lib.worker import BasicWorker
 16
 17
 18class DatasourceMetrics(BasicWorker):
 19    """
 20    Calculate metrics
 21
 22    This will be stored in a separate PostgreSQL table.
 23    """
 24    type = "datasource-metrics"
 25    max_workers = 1
 26
 27    ensure_job = {"remote_id": "localhost", "interval": 43200}
 28
 29    def work(self):
 30        self.general_stats()
 31        self.data_stats()
 32
 33    @staticmethod
 34    def folder_size(path='.'):
 35        """
 36        Get the size of a folder using os.scandir for efficiency
 37        """
 38        total = 0
 39        for entry in os.scandir(path):
 40            if entry.is_file():
 41                total += entry.stat().st_size
 42            elif entry.is_dir():
 43                total += DatasourceMetrics.folder_size(entry.path)
 44        return total
 45
 46    def general_stats(self):
 47        """
 48        Calculate general 4CAT stats
 49
 50        These sizes can be very slow to calculate, which is why we do it in
 51        this worker instead of on demand.
 52        """
 53        metrics = {
 54            "size_data": DatasourceMetrics.folder_size(self.config.get("PATH_DATA")),
 55            "size_logs": DatasourceMetrics.folder_size(self.config.get("PATH_LOGS")),
 56            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"]
 57        }
 58
 59        for metric, value in metrics.items():
 60            self.db.upsert("metrics", {
 61                "metric": metric,
 62                "count": value,
 63                "datasource": "4cat",
 64                "board": "",
 65                "date": "now"
 66            }, constraints=["metric", "datasource", "board", "date"])
 67
 68    def data_stats(self):
 69        """
 70        Go through all local datasources, and update the posts per day
 71        if they haven't been calculated yet. These data can then be used
 72        to calculate e.g. posts per month.
 73        :return:
 74        """
 75
 76        # Get a list of all database tables
 77        all_tables = [row["tablename"] for row in self.db.fetchall(
 78            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]
 79
 80        # Check if the metrics table is already present
 81        metrics_exists = True if "metrics" in all_tables else False
 82
 83        # If not, make it.
 84        if not metrics_exists:
 85            self.db.execute("""
 86				CREATE TABLE IF NOT EXISTS metrics (
 87				  metric             text,
 88				  datasource         text,
 89				  board              text,
 90				  date               text,
 91				  count              integer
 92				);
 93
 94			""")
 95
 96        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
 97        enabled_datasources = self.config.get("datasources.enabled", {})
 98
 99        for datasource_id in self.modules.datasources:
100            if datasource_id not in enabled_datasources:
101                continue
102
103            datasource = self.modules.workers.get(datasource_id + "-search")
104            if not datasource:
105                continue
106
107            # Database IDs may be different from the Datasource ID (e.g. the datasource "4chan" became "fourchan" but the database ID remained "4chan")
108            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id
109
110            is_local = True if hasattr(datasource, "is_local") and datasource.is_local else False
111            is_static = True if hasattr(datasource, "is_static") and datasource.is_static else False
112
113            # Only update local datasources
114            if is_local:
115
116                # Some translating..
117                settings_id = datasource_id
118                if datasource_id == "4chan":
119                    settings_id = "fourchan"
120                elif datasource_id == "8chan":
121                    settings_id = "eightchan"
122
123                boards = [b for b in self.config.get(settings_id + "-search.boards", [])]
124
125                # If a datasource is static (so not updated) and it
126                # is already present in the metrics table, we don't
127                # need to update its metrics anymore.
128                if is_static and datasource_id in added_datasources:
129                    continue
130                else:
131
132                    # -------------------------
133                    #   Posts per day metric
134                    # -------------------------
135
136                    # Get the name of the posts table for this datasource
137                    posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id
138
139                    # Count and update for every board individually
140                    for board in boards:
141
142                        if not board:
143                            board_sql = " board = '' OR board = NULL"
144                        else:
145                            board_sql = " board='" + board + "'"
146
147                        # Midnight of this day in UTC epoch timestamp
148                        midnight = int(
149                            datetime.combine(datetime.today(), time.min).replace(tzinfo=timezone.utc).timestamp())
150
151                        # We only count passed days
152                        time_sql = "timestamp < " + str(midnight)
153
154                        # If the datasource is dynamic, we also only update days
155                        # that haven't been added yet - these are heavy queries.
156                        if not is_static:
157                            days_added = self.db.fetchall(
158                                "SELECT date FROM metrics WHERE datasource = '%s' AND board = '%s' AND metric = 'posts_per_day';" % (
159                                database_db_id, board))
160
161                            if days_added:
162
163                                last_day_added = max([row["date"] for row in days_added])
164                                last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
165                                    tzinfo=timezone.utc)
166
167                                # If the last day added is today, there's no need to update yet
168                                if last_day_added.date() == datetime.today().replace(tzinfo=timezone.utc).date():
169                                    self.log.info(
170                                        "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
171                                    continue
172
173                                # Change to UTC epoch timestamp for postgres query
174                                after_timestamp = int(last_day_added.timestamp())
175
176                                time_sql += " AND timestamp > " + str(after_timestamp) + " "
177
178                        self.log.info(
179                            "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))
180
181                        # Get those counts
182                        query = """
183							SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*)COUNT
184							FROM %s
185							WHERE %s AND %s
186							GROUP BY metric, datasource, board, date;
187							""" % (database_db_id, posts_table, board_sql, time_sql)
188                        # Add to metrics table
189                        rows = [dict(row) for row in self.db.fetchall(query)]
190
191                        if rows:
192                            for row in rows:
193                                self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])
194
195                # -------------------------------
196                #   no other metrics added yet
197                # -------------------------------
198
199        self.job.finish()
class DatasourceMetrics(BasicWorker):
    """
    Calculate 4CAT metrics

    Two types of metrics are currently calculated:

    - General metrics. Currently this is mostly the total dataset size,
      which is useful to know for admins.
    - Datasource metrics. This is used for both processors (e.g. to
      calculate relative activity) and to show how many posts a local
      datasource contains.

    Results are stored in a separate PostgreSQL table ("metrics").
    """
    type = "datasource-metrics"
    max_workers = 1

    # run on this 4CAT instance, every 12 hours
    ensure_job = {"remote_id": "localhost", "interval": 43200}

    def work(self):
        """
        Calculate general metrics and per-datasource post counts
        """
        self.general_stats()
        self.data_stats()

    @staticmethod
    def folder_size(path='.'):
        """
        Get the size of a folder using os.scandir for efficiency

        Recurses into subdirectories.

        :param str path:  Folder to measure
        :return int:  Combined size of all files in the folder, in bytes
        """
        total = 0
        for entry in os.scandir(path):
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += DatasourceMetrics.folder_size(entry.path)
        return total

    def general_stats(self):
        """
        Calculate general 4CAT stats

        These sizes can be very slow to calculate, which is why we do it in
        this worker instead of on demand.
        """
        metrics = {
            "size_data": DatasourceMetrics.folder_size(self.config.get("PATH_DATA")),
            "size_logs": DatasourceMetrics.folder_size(self.config.get("PATH_LOGS")),
            "size_db": self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"]
        }

        for metric, value in metrics.items():
            # the fixed "now" date plus the upsert constraints mean each
            # metric row is overwritten in place on every run
            self.db.upsert("metrics", {
                "metric": metric,
                "count": value,
                "datasource": "4cat",
                "board": "",
                "date": "now"
            }, constraints=["metric", "datasource", "board", "date"])

    def data_stats(self):
        """
        Calculate datasource metrics

        Go through all local datasources, and update the posts per day
        if they haven't been calculated yet. These data can then be used
        to calculate e.g. posts per month.
        """
        # Get a list of all database tables
        all_tables = [row["tablename"] for row in self.db.fetchall(
            "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

        # Create the metrics table if it is not present yet
        if "metrics" not in all_tables:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                  metric             text,
                  datasource         text,
                  board              text,
                  date               text,
                  count              integer
                );
            """)

        added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
        enabled_datasources = self.config.get("datasources.enabled", {})

        for datasource_id in self.modules.datasources:
            if datasource_id not in enabled_datasources:
                continue

            datasource = self.modules.workers.get(datasource_id + "-search")
            if not datasource:
                continue

            # Database IDs may be different from the datasource ID (e.g. the
            # datasource "4chan" became "fourchan" but the database ID
            # remained "4chan")
            database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

            is_local = bool(hasattr(datasource, "is_local") and datasource.is_local)
            is_static = bool(hasattr(datasource, "is_static") and datasource.is_static)

            # Only local datasources have their own posts table to count from
            if not is_local:
                continue

            # Some translating between datasource IDs and settings IDs
            settings_id = {"4chan": "fourchan", "8chan": "eightchan"}.get(datasource_id, datasource_id)

            boards = list(self.config.get(settings_id + "-search.boards", []))

            # If a datasource is static (so not updated) and it is already
            # present in the metrics table, we don't need to update its
            # metrics anymore.
            if is_static and datasource_id in added_datasources:
                continue

            # -------------------------
            #   Posts per day metric
            # -------------------------

            # Get the name of the posts table for this datasource
            posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

            # Count and update for every board individually
            for board in boards:
                if not board:
                    # parenthesised so the time filter below applies to both
                    # alternatives (AND binds more tightly than OR), and
                    # "IS NULL" instead of "= NULL", which never matches
                    board_sql = "(board = '' OR board IS NULL)"
                else:
                    # double single quotes so the value stays an inert,
                    # valid SQL string literal
                    board_sql = "board = '%s'" % board.replace("'", "''")

                # Midnight of this day in UTC as an epoch timestamp
                midnight = int(datetime.combine(
                    datetime.now(timezone.utc).date(), time.min, tzinfo=timezone.utc).timestamp())

                # We only count fully passed days
                time_sql = "timestamp < " + str(midnight)

                # If the datasource is dynamic, we also only update days
                # that haven't been added yet - these are heavy queries.
                if not is_static:
                    days_added = self.db.fetchall(
                        "SELECT date FROM metrics WHERE datasource = %s AND board = %s AND metric = 'posts_per_day';",
                        (database_db_id, board))

                    if days_added:
                        last_day_added = max(row["date"] for row in days_added)
                        last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
                            tzinfo=timezone.utc)

                        # If the last day added is today, there's no need to update yet
                        if last_day_added.date() == datetime.now(timezone.utc).date():
                            self.log.info(
                                "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                            continue

                        # Change to UTC epoch timestamp for the postgres
                        # query; ">=" (not ">") so a post at exactly midnight
                        # of the last counted day is not dropped when that
                        # day is recounted
                        after_timestamp = int(last_day_added.timestamp())
                        time_sql += " AND timestamp >= " + str(after_timestamp) + " "

                self.log.info(
                    "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

                # Get those counts. Table and datasource names cannot be
                # parameterised, but they are internal identifiers, not user
                # input. "AS count" ensures the result column is actually
                # named "count", as the metrics table schema expects.
                query = """
                    SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*) AS count
                    FROM %s
                    WHERE %s AND %s
                    GROUP BY metric, datasource, board, date;
                    """ % (database_db_id, posts_table, board_sql, time_sql)

                # Add to metrics table
                for row in self.db.fetchall(query):
                    self.db.upsert("metrics", dict(row), constraints=["metric", "datasource", "board", "date"])

            # -------------------------------
            #   no other metrics added yet
            # -------------------------------

        self.job.finish()

Calculate metrics

This will be stored in a separate PostgreSQL table.

type = 'datasource-metrics'
max_workers = 1
ensure_job = {'remote_id': 'localhost', 'interval': 43200}
def work(self):
    """
    Update all metrics

    Calculates the general 4CAT statistics first, then the per-datasource
    post counts.
    """
    self.general_stats()
    self.data_stats()

This is where the actual work happens

Whatever the worker is supposed to do, it should happen (or be initiated from) this method. By default it does nothing, descending classes should implement this method.

@staticmethod
def folder_size(path='.'):
    """
    Get the size of a folder using os.scandir for efficiency

    Descends into any subfolders encountered and adds their sizes too.

    :param str path:  Folder to measure
    :return int:  Combined size of the files in the folder, in bytes
    """
    size = 0
    for item in os.scandir(path):
        if item.is_dir():
            size += DatasourceMetrics.folder_size(item.path)
        elif item.is_file():
            size += item.stat().st_size
    return size

Get the size of a folder using os.scandir for efficiency

def general_stats(self):
    """
    Calculate general 4CAT stats

    These sizes can be very slow to calculate, which is why we do it in
    this worker instead of on demand.
    """
    sizes = (
        ("size_data", DatasourceMetrics.folder_size(self.config.get("PATH_DATA"))),
        ("size_logs", DatasourceMetrics.folder_size(self.config.get("PATH_LOGS"))),
        ("size_db", self.db.fetchone("SELECT pg_database_size(%s) AS num", (self.config.get("DB_NAME"),))["num"]),
    )

    for metric, size in sizes:
        row = {
            "metric": metric,
            "count": size,
            "datasource": "4cat",
            "board": "",
            "date": "now"
        }
        self.db.upsert("metrics", row, constraints=["metric", "datasource", "board", "date"])

Calculate general 4CAT stats

These sizes can be very slow to calculate, which is why we do it in this worker instead of on demand.

def data_stats(self):
    """
    Calculate datasource metrics

    Go through all local datasources, and update the posts per day
    if they haven't been calculated yet. These data can then be used
    to calculate e.g. posts per month.
    """
    # Get a list of all database tables
    all_tables = [row["tablename"] for row in self.db.fetchall(
        "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")]

    # Create the metrics table if it is not present yet
    if "metrics" not in all_tables:
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
              metric             text,
              datasource         text,
              board              text,
              date               text,
              count              integer
            );
        """)

    added_datasources = [row["datasource"] for row in self.db.fetchall("SELECT DISTINCT(datasource) FROM metrics")]
    enabled_datasources = self.config.get("datasources.enabled", {})

    for datasource_id in self.modules.datasources:
        if datasource_id not in enabled_datasources:
            continue

        datasource = self.modules.workers.get(datasource_id + "-search")
        if not datasource:
            continue

        # Database IDs may be different from the datasource ID (e.g. the
        # datasource "4chan" became "fourchan" but the database ID
        # remained "4chan")
        database_db_id = datasource.prefix if hasattr(datasource, "prefix") else datasource_id

        is_local = bool(hasattr(datasource, "is_local") and datasource.is_local)
        is_static = bool(hasattr(datasource, "is_static") and datasource.is_static)

        # Only local datasources have their own posts table to count from
        if not is_local:
            continue

        # Some translating between datasource IDs and settings IDs
        settings_id = {"4chan": "fourchan", "8chan": "eightchan"}.get(datasource_id, datasource_id)

        boards = list(self.config.get(settings_id + "-search.boards", []))

        # If a datasource is static (so not updated) and it is already
        # present in the metrics table, we don't need to update its
        # metrics anymore.
        if is_static and datasource_id in added_datasources:
            continue

        # -------------------------
        #   Posts per day metric
        # -------------------------

        # Get the name of the posts table for this datasource
        posts_table = datasource_id if "posts_" + database_db_id not in all_tables else "posts_" + database_db_id

        # Count and update for every board individually
        for board in boards:
            if not board:
                # parenthesised so the time filter below applies to both
                # alternatives (AND binds more tightly than OR), and
                # "IS NULL" instead of "= NULL", which never matches
                board_sql = "(board = '' OR board IS NULL)"
            else:
                # double single quotes so the value stays an inert,
                # valid SQL string literal
                board_sql = "board = '%s'" % board.replace("'", "''")

            # Midnight of this day in UTC as an epoch timestamp
            midnight = int(datetime.combine(
                datetime.now(timezone.utc).date(), time.min, tzinfo=timezone.utc).timestamp())

            # We only count fully passed days
            time_sql = "timestamp < " + str(midnight)

            # If the datasource is dynamic, we also only update days
            # that haven't been added yet - these are heavy queries.
            if not is_static:
                days_added = self.db.fetchall(
                    "SELECT date FROM metrics WHERE datasource = %s AND board = %s AND metric = 'posts_per_day';",
                    (database_db_id, board))

                if days_added:
                    last_day_added = max(row["date"] for row in days_added)
                    last_day_added = datetime.strptime(last_day_added, '%Y-%m-%d').replace(
                        tzinfo=timezone.utc)

                    # If the last day added is today, there's no need to update yet
                    if last_day_added.date() == datetime.now(timezone.utc).date():
                        self.log.info(
                            "No new posts per day to count for %s%s" % (datasource_id, "/" + board))
                        continue

                    # Change to UTC epoch timestamp for the postgres
                    # query; ">=" (not ">") so a post at exactly midnight
                    # of the last counted day is not dropped when that
                    # day is recounted
                    after_timestamp = int(last_day_added.timestamp())
                    time_sql += " AND timestamp >= " + str(after_timestamp) + " "

            self.log.info(
                "Calculating metric posts_per_day for datasource %s%s" % (datasource_id, "/" + board))

            # Get those counts. Table and datasource names cannot be
            # parameterised, but they are internal identifiers, not user
            # input. "AS count" ensures the result column is actually
            # named "count", as the metrics table schema expects.
            query = """
                SELECT 'posts_per_day' AS metric, '%s' AS datasource, board, to_char(to_timestamp(timestamp), 'YYYY-MM-DD') AS date, count(*) AS count
                FROM %s
                WHERE %s AND %s
                GROUP BY metric, datasource, board, date;
                """ % (database_db_id, posts_table, board_sql, time_sql)

            # Add to metrics table
            for row in self.db.fetchall(query):
                self.db.upsert("metrics", dict(row), constraints=["metric", "datasource", "board", "date"])

        # -------------------------------
        #   no other metrics added yet
        # -------------------------------

    self.job.finish()

Go through all local datasources, and update the posts per day if they haven't been calculated yet. These data can then be used to calculate e.g. posts per month.

Returns: None — results are written to the "metrics" database table.