Edit on GitHub

common.lib.helpers

Miscellaneous helper functions for the 4CAT backend

   1"""
   2Miscellaneous helper functions for the 4CAT backend
   3"""
   4import subprocess
   5import imagehash
   6import hashlib
   7import requests
   8import datetime
   9import smtplib
  10import fnmatch
  11import socket
  12import shlex
  13import copy
  14import time
  15import json
  16import math
  17import csv
  18import ssl
  19import re
  20import os
  21import io
  22
  23from pathlib import Path
  24from collections.abc import MutableMapping
  25from html.parser import HTMLParser
  26from urllib.parse import urlparse, urlunparse
  27from calendar import monthrange
  28from packaging import version
  29from PIL import Image
  30
  31from common.lib.user_input import UserInput
  32from common.config_manager import config
  33
  34
  35def init_datasource(database, logger, queue, name):
  36    """
  37    Initialize data source
  38
  39    Queues jobs to scrape the boards that were configured to be scraped in the
  40    4CAT configuration file. If none were configured, nothing happens.
  41
  42    :param Database database:  Database connection instance
  43    :param Logger logger:  Log handler
  44    :param JobQueue queue:  Job Queue instance
  45    :param string name:  ID of datasource that is being initialised
  46    """
  47    pass
  48
  49def strip_tags(html, convert_newlines=True):
  50    """
  51    Strip HTML from a string
  52
  53    :param html: HTML to strip
  54    :param convert_newlines: Convert <br> and </p> tags to \n before stripping
  55    :return: Stripped HTML
  56    """
  57    if not html:
  58        return ""
  59
  60    deduplicate_newlines = re.compile(r"\n+")
  61
  62    if convert_newlines:
  63        html = html.replace("<br>", "\n").replace("</p>", "</p>\n")
  64        html = deduplicate_newlines.sub("\n", html)
  65
  66    class HTMLStripper(HTMLParser):
  67        def __init__(self):
  68            super().__init__()
  69            self.reset()
  70            self.strict = False
  71            self.convert_charrefs = True
  72            self.fed = []
  73
  74        def handle_data(self, data):
  75            self.fed.append(data)
  76
  77        def get_data(self):
  78            return "".join(self.fed)
  79
  80    stripper = HTMLStripper()
  81    stripper.feed(html)
  82    return stripper.get_data()
  83
  84
  85def sniff_encoding(file):
  86    """
  87    Determine encoding from raw file bytes
  88
  89    Currently only distinguishes UTF-8 and UTF-8 with BOM
  90
  91    :param file:
  92    :return:
  93    """
  94    if type(file) == bytearray:
  95        maybe_bom = file[:3]
  96    elif hasattr(file, "getbuffer"):
  97        buffer = file.getbuffer()
  98        maybe_bom = buffer[:3].tobytes()
  99    elif hasattr(file, "peek"):
 100        buffer = file.peek(32)
 101        maybe_bom = buffer[:3]
 102    else:
 103        maybe_bom = False
 104
 105    return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8"
 106
 107def sniff_csv_dialect(csv_input):
 108    """
 109    Determine CSV dialect for an input stream
 110
 111    :param csv_input:  Input stream
 112    :return tuple:  Tuple: Dialect object and a boolean representing whether
 113    the CSV file seems to have a header
 114    """
 115    encoding = sniff_encoding(csv_input)
 116    if type(csv_input) is io.TextIOWrapper:
 117        wrapped_input = csv_input
 118    else:
 119        wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding)
 120    wrapped_input.seek(0)
 121    sample = wrapped_input.read(1024 * 1024)
 122    wrapped_input.seek(0)
 123    has_header = csv.Sniffer().has_header(sample)
 124    dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 125
 126    return dialect, has_header
 127
 128
 129def get_git_branch():
 130    """
 131    Get current git branch
 132
 133    If the 4CAT root folder is a git repository, this function will return the
 134    name of the currently checked-out branch. If the folder is not a git
 135    repository or git is not installed an empty string is returned.
 136    """
 137    try:
 138        root_dir = str(config.get('PATH_ROOT').resolve())
 139        branch = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE)
 140        if branch.returncode != 0:
 141            raise ValueError()
 142        branch_name = branch.stdout.decode("utf-8").strip()
 143        if not branch_name:
 144            # Check for detached HEAD state
 145            # Most likely occuring because of checking out release tags (which are not branches) or commits
 146            head_status = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} status"), stdout=subprocess.PIPE)
 147            if head_status.returncode == 0:
 148                for line in head_status.stdout.decode("utf-8").split("\n"):
 149                    if "HEAD detached at" in line:
 150                        branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1]
 151                        return branch_name
 152    except (subprocess.SubprocessError, ValueError, FileNotFoundError):
 153        return ""
 154
 155
 156def get_software_commit(worker=None):
 157    """
 158    Get current 4CAT git commit hash
 159
 160    Use `get_software_version()` instead if you need the release version
 161    number rather than the precise commit hash.
 162
 163    If no version file is available, run `git show` to test if there is a git
 164    repository in the 4CAT root folder, and if so, what commit is currently
 165    checked out in it.
 166
 167    For extensions, get the repository information for that extension, or if
 168    the extension is not a git repository, return empty data.
 169
 170    :param BasicWorker processor:  Worker to get commit for. If not given, get
 171    version information for the main 4CAT installation.
 172
 173    :return tuple:  4CAT git commit hash, repository name
 174    """
 175    # try git command line within the 4CAT root folder
 176    # if it is a checked-out git repository, it will tell us the hash of
 177    # the currently checked-out commit
 178
 179    # path has no Path.relative()...
 180    relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent
 181    try:
 182        # if extension, go to the extension file's path
 183        # we will run git here - if it is not its own repository, we have no
 184        # useful version info (since the extension is by definition not in the
 185        # main 4CAT repository) and will return an empty value
 186        if worker and worker.is_extension:
 187            working_dir = str(config.get("PATH_ROOT").joinpath(relative_filepath).resolve())
 188            # check if we are in the extensions' own repo or 4CAT's
 189            git_cmd = f"git -C {shlex.quote(working_dir)} rev-parse --show-toplevel"
 190            repo_level = subprocess.run(shlex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 191            if Path(repo_level.stdout.decode("utf-8")) == config.get("PATH_ROOT"):
 192                # not its own repository
 193                return ("", "")
 194
 195        else:
 196            working_dir = str(config.get("PATH_ROOT").resolve())
 197
 198        show = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 199        if show.returncode != 0:
 200            raise ValueError()
 201        commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1]
 202
 203        # now get the repository the commit belongs to, if we can
 204        origin = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 205        if origin.returncode != 0 or not origin.stdout:
 206            raise ValueError()
 207        repository = origin.stdout.decode("utf-8").strip()
 208        if repository.endswith(".git"):
 209            repository = repository[:-4]
 210
 211    except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e:
 212        return ("", "")
 213
 214    return (commit, repository)
 215
 216def get_software_version():
 217    """
 218    Get current 4CAT version
 219
 220    This is the actual software version, i.e. not the commit hash (see
 221    `get_software_hash()` for that). The current version is stored in a file
 222    with a canonical location: if the file doesn't exist, an empty string is
 223    returned.
 224
 225    :return str:  Software version, for example `1.37`.
 226    """
 227    current_version_file = config.get("PATH_ROOT").joinpath("config/.current-version")
 228    if not current_version_file.exists():
 229        return ""
 230
 231    with current_version_file.open() as infile:
 232        return infile.readline().strip()
 233
 234def get_github_version(timeout=5):
 235    """
 236    Get latest release tag version from GitHub
 237
 238    Will raise a ValueError if it cannot retrieve information from GitHub.
 239
 240    :param int timeout:  Timeout in seconds for HTTP request
 241
 242    :return tuple:  Version, e.g. `1.26`, and release URL.
 243    """
 244    repo_url = config.get("4cat.github_url")
 245    if not repo_url.endswith("/"):
 246        repo_url += "/"
 247
 248    repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url))
 249
 250    api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id
 251    response = requests.get(api_url, timeout=timeout)
 252    response = response.json()
 253    if response.get("message") == "Not Found":
 254        raise ValueError("Invalid GitHub URL or repository name")
 255
 256    latest_tag = response.get("tag_name", "unknown")
 257    if latest_tag.startswith("v"):
 258        latest_tag = re.sub(r"^v", "", latest_tag)
 259
 260    return (latest_tag, response.get("html_url"))
 261
def get_ffmpeg_version(ffmpeg_path):
    """
    Determine ffmpeg version

    This can be necessary when using commands that change name between versions.

    :param ffmpeg_path:  Path to the ffmpeg executable
    :return packaging.version.Version:  Comparable version object
    """
    command = [ffmpeg_path, "-version"]
    # run with stdin closed so ffmpeg cannot block waiting for input
    ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)

    # first output line looks like "ffmpeg version 4.4.1-0ubuntu1 ...";
    # take the token after " version " and keep only the leading dotted number
    # NOTE(review): raises IndexError if ffmpeg produced no/unexpected output
    ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1]
    ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0]

    return version.parse(ffmpeg_version)
 279
 280
 281def find_extensions():
 282    """
 283    Find 4CAT extensions and load their metadata
 284
 285    Looks for subfolders of the extension folder, and loads additional metadata
 286    where available.
 287
 288    :return tuple:  A tuple with two items; the extensions, as an ID -> metadata
 289    dictionary, and a list of (str) errors encountered while loading
 290    """
 291    extension_path = config.get("PATH_ROOT").joinpath("extensions")
 292    errors = []
 293    if not extension_path.exists() or not extension_path.is_dir():
 294        return [], None
 295
 296    # each folder in the extensions folder is an extension
 297    extensions = {
 298        extension.name: {
 299            "name": extension.name,
 300            "version": "",
 301            "url": "",
 302            "git_url": "",
 303            "is_git": False
 304        } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir()
 305    }
 306
 307    # collect metadata for extensions
 308    allowed_metadata_keys = ("name", "version", "url")
 309    for extension in extensions:
 310        extension_folder = extension_path.joinpath(extension)
 311        metadata_file = extension_folder.joinpath("metadata.json")
 312        if metadata_file.exists():
 313            with metadata_file.open() as infile:
 314                try:
 315                    metadata = json.load(infile)
 316                    extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys})
 317                except (TypeError, ValueError) as e:
 318                    errors.append(f"Error reading metadata file for extension '{extension}' ({e})")
 319                    continue
 320
 321        extensions[extension]["is_git"] = extension_folder.joinpath(".git/HEAD").exists()
 322        if extensions[extension]["is_git"]:
 323            # try to get remote URL
 324            try:
 325                extension_root = str(extension_folder.resolve())
 326                origin = subprocess.run(shlex.split(f"git -C {shlex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE,
 327                                        stdout=subprocess.PIPE)
 328                if origin.returncode != 0 or not origin.stdout:
 329                    raise ValueError()
 330                repository = origin.stdout.decode("utf-8").strip()
 331                if repository.endswith(".git") and "github.com" in repository:
 332                    # use repo URL
 333                    repository = repository[:-4]
 334                extensions[extension]["git_url"] = repository
 335            except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e:
 336                print(e)
 337                pass
 338
 339    return extensions, errors
 340
 341
 342def convert_to_int(value, default=0):
 343    """
 344    Convert a value to an integer, with a fallback
 345
 346    The fallback is used if an Error is thrown during converstion to int.
 347    This is a convenience function, but beats putting try-catches everywhere
 348    we're using user input as an integer.
 349
 350    :param value:  Value to convert
 351    :param int default:  Default value, if conversion not possible
 352    :return int:  Converted value
 353    """
 354    try:
 355        return int(value)
 356    except (ValueError, TypeError):
 357        return default
 358
 359
 360def timify_long(number):
 361    """
 362    Make a number look like an indication of time
 363
 364    :param number:  Number to convert. If the number is larger than the current
 365    UNIX timestamp, decrease by that amount
 366    :return str: A nice, string, for example `1 month, 3 weeks, 4 hours and 2 minutes`
 367    """
 368    number = int(number)
 369
 370    components = []
 371    if number > time.time():
 372        number = time.time() - number
 373
 374    month_length = 30.42 * 86400
 375    months = math.floor(number / month_length)
 376    if months:
 377        components.append("%i month%s" % (months, "s" if months != 1 else ""))
 378        number -= (months * month_length)
 379
 380    week_length = 7 * 86400
 381    weeks = math.floor(number / week_length)
 382    if weeks:
 383        components.append("%i week%s" % (weeks, "s" if weeks != 1 else ""))
 384        number -= (weeks * week_length)
 385
 386    day_length = 86400
 387    days = math.floor(number / day_length)
 388    if days:
 389        components.append("%i day%s" % (days, "s" if days != 1 else ""))
 390        number -= (days * day_length)
 391
 392    hour_length = 3600
 393    hours = math.floor(number / hour_length)
 394    if hours:
 395        components.append("%i hour%s" % (hours, "s" if hours != 1 else ""))
 396        number -= (hours * hour_length)
 397
 398    minute_length = 60
 399    minutes = math.floor(number / minute_length)
 400    if minutes:
 401        components.append("%i minute%s" % (minutes, "s" if minutes != 1 else ""))
 402
 403    if not components:
 404        components.append("less than a minute")
 405
 406    last_str = components.pop()
 407    time_str = ""
 408    if components:
 409        time_str = ", ".join(components)
 410        time_str += " and "
 411
 412    return time_str + last_str
 413
 414def andify(items):
 415    """
 416    Format a list of items for use in text
 417
 418    Returns a comma-separated list, the last item preceded by "and"
 419
 420    :param items:  Iterable list
 421    :return str:  Formatted string
 422    """
 423    if len(items) == 0:
 424        return ""
 425    elif len(items) == 1:
 426        return str(items[1])
 427
 428    result = f" and {items.pop()}"
 429    return ", ".join([str(item) for item in items]) + result
 430
 431
 432def hash_file(image_file, hash_type="file-hash"):
 433    """
 434    Generate an image hash
 435
 436    :param Path image_file:  Image file to hash
 437    :param str hash_type:  Hash type, one of `file-hash`, `colorhash`,
 438    `phash`, `average_hash`, `dhash`
 439    :return str:  Hexadecimal hash value
 440    """
 441    if not image_file.exists():
 442        raise FileNotFoundError()
 443
 444    if hash_type == "file-hash":
 445        hasher = hashlib.sha1()
 446
 447        # Open the file in binary mode
 448        with image_file.open("rb") as infile:
 449            # Read and update hash in chunks to handle large files
 450            while chunk := infile.read(1024):
 451                hasher.update(chunk)
 452
 453        return hasher.hexdigest()
 454
 455    elif hash_type in ("colorhash", "phash", "average_hash", "dhash"):
 456        image = Image.open(image_file)
 457
 458        return str(getattr(imagehash, hash_type)(image))
 459
 460    else:
 461        raise NotImplementedError(f"Unknown hash type '{hash_type}'")
 462
 463def get_yt_compatible_ids(yt_ids):
 464    """
 465    :param yt_ids list, a list of strings
 466    :returns list, a ist of joined strings in pairs of 50
 467
 468    Takes a list of IDs and returns list of joined strings
 469    in pairs of fifty. This should be done for the YouTube API
 470    that requires a comma-separated string and can only return
 471    max fifty results.
 472    """
 473
 474    # If there's only one item, return a single list item
 475    if isinstance(yt_ids, str):
 476        return [yt_ids]
 477
 478    ids = []
 479    last_i = 0
 480    for i, yt_id in enumerate(yt_ids):
 481
 482        # Add a joined string per fifty videos
 483        if i % 50 == 0 and i != 0:
 484            ids_string = ",".join(yt_ids[last_i:i])
 485            ids.append(ids_string)
 486            last_i = i
 487
 488        # If the end of the list is reached, add the last data
 489        elif i == (len(yt_ids) - 1):
 490            ids_string = ",".join(yt_ids[last_i:i])
 491            ids.append(ids_string)
 492
 493    return ids
 494
 495
 496def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None,
 497                    fontsize_small=None, fontsize_large=None):
 498    """
 499    Get a standard SVG canvas to draw 4CAT graphs to
 500
 501    Adds a border, footer, header, and some basic text styling
 502
 503    :param path:  The path where the SVG graph will be saved
 504    :param width:  Width of the canvas
 505    :param height:  Height of the canvas
 506    :param header:  Header, if necessary to draw
 507    :param footer:  Footer text, if necessary to draw. Defaults to shameless
 508    4CAT advertisement.
 509    :param fontsize_normal:  Font size of normal text
 510    :param fontsize_small:  Font size of small text (e.g. footer)
 511    :param fontsize_large:  Font size of large text (e.g. header)
 512    :return SVG:  SVG canvas (via svgwrite) that can be drawn to
 513    """
 514    from svgwrite.container import SVG, Hyperlink
 515    from svgwrite.drawing import Drawing
 516    from svgwrite.shapes import Rect
 517    from svgwrite.text import Text
 518
 519    if fontsize_normal is None:
 520        fontsize_normal = width / 75
 521
 522    if fontsize_small is None:
 523        fontsize_small = width / 100
 524
 525    if fontsize_large is None:
 526        fontsize_large = width / 50
 527
 528    # instantiate with border and white background
 529    canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal)
 530    canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF"))
 531
 532    # header
 533    if header:
 534        header_shape = SVG(insert=(0, 0), size=("100%", fontsize_large * 2))
 535        header_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
 536        header_shape.add(
 537            Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle", fill="#FFF",
 538                 style="font-size:%ipx" % fontsize_large))
 539        canvas.add(header_shape)
 540
 541    # footer (i.e. 4cat banner)
 542    if footer:
 543        footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2)
 544        footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize)
 545        footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
 546        link = Hyperlink(href="https://4cat.nl")
 547        link.add(
 548            Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle", fill="#FFF",
 549                 style="font-size:%ipx" % fontsize_small))
 550        footer_shape.add(link)
 551        canvas.add(footer_shape)
 552
 553    return canvas
 554
 555
 556def call_api(action, payload=None, wait_for_response=True):
 557    """
 558    Send message to server
 559
 560    Calls the internal API and returns interpreted response.
 561
 562    :param str action: API action
 563    :param payload: API payload
 564    :param bool wait_for_response:  Wait for response? If not close connection
 565    immediately after sending data.
 566
 567    :return: API response, or timeout message in case of timeout
 568    """
 569    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 570    connection.settimeout(15)
 571    connection.connect((config.get('API_HOST'), config.get('API_PORT')))
 572
 573    msg = json.dumps({"request": action, "payload": payload})
 574    connection.sendall(msg.encode("ascii", "ignore"))
 575
 576    if wait_for_response:
 577        try:
 578            response = ""
 579            while True:
 580                bytes = connection.recv(2048)
 581                if not bytes:
 582                    break
 583
 584                response += bytes.decode("ascii", "ignore")
 585        except (socket.timeout, TimeoutError):
 586            response = "(Connection timed out)"
 587
 588    try:
 589        connection.shutdown(socket.SHUT_RDWR)
 590    except OSError:
 591        # already shut down automatically
 592        pass
 593    connection.close()
 594
 595    try:
 596        return json.loads(response) if wait_for_response else None
 597    except json.JSONDecodeError:
 598        return response
 599
 600
 601def get_interval_descriptor(item, interval):
 602    """
 603    Get interval descriptor based on timestamp
 604
 605    :param dict item:  Item to generate descriptor for, should have a
 606    "timestamp" key
 607    :param str interval:  Interval, one of "all", "overall", "year",
 608    "month", "week", "day"
 609    :return str:  Interval descriptor, e.g. "overall", "2020", "2020-08",
 610    "2020-43", "2020-08-01"
 611    """
 612    if interval in ("all", "overall"):
 613        return interval
 614
 615    if "timestamp" not in item:
 616        raise ValueError("No date available for item in dataset")
 617
 618    # Catch cases where a custom timestamp has an epoch integer as value.
 619    try:
 620        timestamp = int(item["timestamp"])
 621        try:
 622            timestamp = datetime.datetime.fromtimestamp(timestamp)
 623        except (ValueError, TypeError) as e:
 624            raise ValueError("Invalid timestamp '%s'" % str(item["timestamp"]))
 625    except:
 626        try:
 627            timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S")
 628        except (ValueError, TypeError) as e:
 629            raise ValueError("Invalid date '%s'" % str(item["timestamp"]))
 630
 631    if interval == "year":
 632        return str(timestamp.year)
 633    elif interval == "month":
 634        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2)
 635    elif interval == "week":
 636        return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2)
 637    elif interval == "hour":
 638        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
 639            2) + " " + str(timestamp.hour).zfill(2)
 640    elif interval == "minute":
 641        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
 642            2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2)
 643    else:
 644        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2)
 645
 646
def pad_interval(intervals, first_interval=None, last_interval=None):
    """
    Pad an interval so all intermediate intervals are filled

    Ensures that for every interval between the first and last interval
    present (or explicitly given), a key exists in the dictionary, adding
    missing ones with a value of 0, and returns the dictionary sorted by key.
    Works for yearly, monthly, daily, hourly and per-minute keys.

    Warning, ugly code (PRs very welcome)

    :param dict intervals:  A dictionary, with dates (YYYY{-MM}{-DD}) as keys
    and a numerical value.
    :param first_interval:  Optional first interval, same format as the keys;
    inferred from the data when omitted
    :param last_interval:  Optional last interval, same format as the keys;
    inferred from the data when omitted
    :return:  Tuple of the number of intervals added and the padded, sorted
    intervals dictionary
    """
    missing = 0
    # any key serves to detect the granularity, since all keys share a format
    test_key = list(intervals.keys())[0]

    # first determine the boundaries of the interval
    # these may be passed as parameters, or they can be inferred from the
    # interval given
    if first_interval:
        # explicit lower bound: parse year/month/day/hour/minute from the
        # fixed-width descriptor, as far as its length allows
        first_interval = str(first_interval)
        first_year = int(first_interval[0:4])
        if len(first_interval) > 4:
            first_month = int(first_interval[5:7])
        if len(first_interval) > 7:
            first_day = int(first_interval[8:10])
        if len(first_interval) > 10:
            first_hour = int(first_interval[11:13])
        if len(first_interval) > 13:
            first_minute = int(first_interval[14:16])

    else:
        # no explicit lower bound: find the earliest value in the data,
        # narrowing down one date component at a time
        first_year = min([int(i[0:4]) for i in intervals])
        if len(test_key) > 4:
            first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year])
        if len(test_key) > 7:
            first_day = min(
                [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month])
        if len(test_key) > 10:
            first_hour = min(
                [int(i[11:13]) for i in intervals if
                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day])
        if len(test_key) > 13:
            first_minute = min(
                [int(i[14:16]) for i in intervals if
                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int(
                     i[11:13]) == first_hour])

    if last_interval:
        # explicit upper bound, parsed the same way as the lower bound
        last_interval = str(last_interval)
        last_year = int(last_interval[0:4])
        if len(last_interval) > 4:
            last_month = int(last_interval[5:7])
        if len(last_interval) > 7:
            last_day = int(last_interval[8:10])
        if len(last_interval) > 10:
            last_hour = int(last_interval[11:13])
        if len(last_interval) > 13:
            last_minute = int(last_interval[14:16])
    else:
        # no explicit upper bound: mirror image of the lower-bound inference
        last_year = max([int(i[0:4]) for i in intervals])
        if len(test_key) > 4:
            last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year])
        if len(test_key) > 7:
            last_day = max(
                [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month])
        if len(test_key) > 10:
            last_hour = max(
                [int(i[11:13]) for i in intervals if
                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day])
        if len(test_key) > 13:
            last_minute = max(
                [int(i[14:16]) for i in intervals if
                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int(
                     i[11:13]) == last_hour])

    # determine the granularity of the keys from the test key's shape
    has_month = re.match(r"^[0-9]{4}-[0-9]", test_key)
    has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key)
    has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key)
    has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key)

    # generate all interval descriptors between the boundaries, stopping at
    # the detected granularity; component ranges are clamped to the boundary
    # values only within the boundary year/month/day/hour
    all_intervals = []
    for year in range(first_year, last_year + 1):
        year_interval = str(year)

        if not has_month:
            all_intervals.append(year_interval)
            continue

        start_month = first_month if year == first_year else 1
        end_month = last_month if year == last_year else 12
        for month in range(start_month, end_month + 1):
            month_interval = year_interval + "-" + str(month).zfill(2)

            if not has_day:
                all_intervals.append(month_interval)
                continue

            # monthrange() gives the number of days in this month/year
            start_day = first_day if all((year == first_year, month == first_month)) else 1
            end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1]
            for day in range(start_day, end_day + 1):
                day_interval = month_interval + "-" + str(day).zfill(2)

                if not has_hour:
                    all_intervals.append(day_interval)
                    continue

                start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0
                end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23
                for hour in range(start_hour, end_hour + 1):
                    hour_interval = day_interval + " " + str(hour).zfill(2)

                    if not has_minute:
                        all_intervals.append(hour_interval)
                        continue

                    start_minute = first_minute if all(
                        (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0
                    end_minute = last_minute if all(
                        (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59

                    for minute in range(start_minute, end_minute + 1):
                        minute_interval = hour_interval + ":" + str(minute).zfill(2)
                        all_intervals.append(minute_interval)

    # add any interval not in the data yet, with a zero value
    for interval in all_intervals:
        if interval not in intervals:
            intervals[interval] = 0
            missing += 1

    # sort while we're at it
    intervals = {key: intervals[key] for key in sorted(intervals)}

    return missing, intervals
 780
 781
 782def remove_nuls(value):
 783    """
 784    Remove \0 from a value
 785
 786    The CSV library cries about a null byte when it encounters one :( :( :(
 787    poor little csv cannot handle a tiny little null byte
 788
 789    So remove them from the data because they should not occur in utf-8 data
 790    anyway.
 791
 792    :param value:  Value to remove nulls from. For dictionaries, sets, tuples
 793    and lists all items are parsed recursively.
 794    :return value:  Cleaned value
 795    """
 796    if type(value) is dict:
 797        for field in value:
 798            value[field] = remove_nuls(value[field])
 799    elif type(value) is list:
 800        value = [remove_nuls(item) for item in value]
 801    elif type(value) is tuple:
 802        value = tuple([remove_nuls(item) for item in value])
 803    elif type(value) is set:
 804        value = set([remove_nuls(item) for item in value])
 805    elif type(value) is str:
 806        value = value.replace("\0", "")
 807
 808    return value
 809
 810
 811class NullAwareTextIOWrapper(io.TextIOWrapper):
 812    """
 813    TextIOWrapper that skips null bytes
 814
 815    This can be used as a file reader that silently discards any null bytes it
 816    encounters.
 817    """
 818
 819    def __next__(self):
 820        value = super().__next__()
 821        return remove_nuls(value)
 822
 823
 824class HashCache:
 825    """
 826    Simple cache handler to cache hashed values
 827
 828    Avoids having to calculate a hash for values that have been hashed before
 829    """
 830
 831    def __init__(self, hasher):
 832        self.hash_cache = {}
 833        self.hasher = hasher
 834
 835    def update_cache(self, value):
 836        """
 837        Checks the hash_cache to see if the value has been cached previously,
 838        updates the hash_cache if needed, and returns the hashed value.
 839        """
 840        # value = str(value)
 841        if value not in self.hash_cache:
 842            author_hasher = self.hasher.copy()
 843            author_hasher.update(str(value).encode("utf-8"))
 844            self.hash_cache[value] = author_hasher.hexdigest()
 845            del author_hasher
 846        return self.hash_cache[value]
 847
 848
 849def dict_search_and_update(item, keyword_matches, function):
 850    """
 851    Filter fields in an object recursively
 852
 853    Apply a function to every item and sub item of a dictionary if the key
 854    contains one of the provided match terms.
 855
 856    Function loops through a dictionary or list and compares dictionary keys to
 857    the strings defined by keyword_matches. It then applies the change_function
 858    to corresponding values.
 859
 860    Note: if a matching term is found, all nested values will have the function
 861    applied to them. e.g., all these values would be changed even those with
 862    not_key_match:
 863
 864    {'key_match' : 'changed',
 865    'also_key_match' : {'not_key_match' : 'but_value_still_changed'},
 866    'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}
 867
 868    This is a comprehensive (and expensive) approach to updating a dictionary.
 869    IF a dictionary structure is known, a better solution would be to update
 870    using specific keys.
 871
 872    :param Dict/List item:  dictionary/list/json to loop through
 873    :param String keyword_matches:  list of strings that will be matched to
 874    dictionary keys. Can contain wildcards which are matched using fnmatch.
 875    :param Function function:  function appled to all values of any items
 876    nested under a matching key
 877
 878    :return Dict/List: Copy of original item, but filtered
 879    """
 880
 881    def loop_helper_function(d_or_l, match_terms, change_function):
 882        """
 883        Recursive helper function that updates item in place
 884        """
 885        if isinstance(d_or_l, dict):
 886            # Iterate through dictionary
 887            for key, value in iter(d_or_l.items()):
 888                if match_terms == 'True' or any([fnmatch.fnmatch(key, match_term) for match_term in match_terms]):
 889                    # Match found; apply function to all items and sub-items
 890                    if isinstance(value, (list, dict)):
 891                        # Pass item through again with match_terms = True
 892                        loop_helper_function(value, 'True', change_function)
 893                    elif value is None:
 894                        pass
 895                    else:
 896                        # Update the value
 897                        d_or_l[key] = change_function(value)
 898                elif isinstance(value, (list, dict)):
 899                    # Continue search
 900                    loop_helper_function(value, match_terms, change_function)
 901        elif isinstance(d_or_l, list):
 902            # Iterate through list
 903            for n, value in enumerate(d_or_l):
 904                if isinstance(value, (list, dict)):
 905                    # Continue search
 906                    loop_helper_function(value, match_terms, change_function)
 907                elif match_terms == 'True':
 908                    # List item nested in matching
 909                    d_or_l[n] = change_function(value)
 910        else:
 911            raise Exception('Must pass list or dictionary')
 912
 913    # Lowercase keyword_matches
 914    keyword_matches = [keyword.lower() for keyword in keyword_matches]
 915
 916    # Create deepcopy and return new item
 917    temp_item = copy.deepcopy(item)
 918    loop_helper_function(temp_item, keyword_matches, function)
 919    return temp_item
 920
 921
 922def get_last_line(filepath):
 923    """
 924    Seeks from end of file for '\n' and returns that line
 925
 926    :param str filepath:  path to file
 927    :return str: last line of file
 928    """
 929    with open(filepath, "rb") as file:
 930        try:
 931            # start at the end of file
 932            file.seek(-2, os.SEEK_END)
 933            # check if NOT endline i.e. '\n'
 934            while file.read(1) != b'\n':
 935                # if not '\n', back up two characters and check again
 936                file.seek(-2, os.SEEK_CUR)
 937        except OSError:
 938            file.seek(0)
 939        last_line = file.readline().decode()
 940    return last_line
 941
 942
 943def add_notification(db, user, notification, expires=None, allow_dismiss=True):
 944    db.insert("users_notifications", {
 945        "username": user,
 946        "notification": notification,
 947        "timestamp_expires": expires,
 948        "allow_dismiss": allow_dismiss
 949    }, safe=True)
 950
 951
 952def send_email(recipient, message):
 953    """
 954    Send an e-mail using the configured SMTP settings
 955
 956    Just a thin wrapper around smtplib, so we don't have to repeat ourselves.
 957    Exceptions are to be handled outside the function.
 958
 959    :param list recipient:  Recipient e-mail addresses
 960    :param MIMEMultipart message:  Message to send
 961    """
 962    # Create a secure SSL context
 963    context = ssl.create_default_context()
 964
 965    # Decide which connection type
 966    with smtplib.SMTP_SSL(config.get('mail.server'), port=config.get('mail.port', 0), context=context) if config.get(
 967            'mail.ssl') == 'ssl' else smtplib.SMTP(config.get('mail.server'),
 968                                                   port=config.get('mail.port', 0)) as server:
 969        if config.get('mail.ssl') == 'tls':
 970            # smtplib.SMTP adds TLS context here
 971            server.starttls(context=context)
 972
 973        # Log in
 974        if config.get('mail.username') and config.get('mail.password'):
 975            server.ehlo()
 976            server.login(config.get('mail.username'), config.get('mail.password'))
 977
 978        # Send message
 979        if type(message) == str:
 980            server.sendmail(config.get('mail.noreply'), recipient, message)
 981        else:
 982            server.sendmail(config.get('mail.noreply'), recipient, message.as_string())
 983
 984
 985def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'):
 986    """
 987    Return a flattened dictionary where nested dictionary objects are given new
 988    keys using the partent key combined using the seperator with the child key.
 989
 990    Lists will be converted to json strings via json.dumps()
 991
 992    :param MutableMapping d:  Dictionary like object
 993    :param str partent_key: The original parent key prepending future nested keys
 994    :param str sep: A seperator string used to combine parent and child keys
 995    :return dict:  A new dictionary with the no nested values
 996    """
 997
 998    def _flatten_dict_gen(d, parent_key, sep):
 999        for k, v in d.items():
1000            new_key = parent_key + sep + k if parent_key else k
1001            if isinstance(v, MutableMapping):
1002                yield from flatten_dict(v, new_key, sep=sep).items()
1003            elif isinstance(v, (list, set)):
1004                yield new_key, json.dumps(
1005                    [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v])
1006            else:
1007                yield new_key, v
1008
1009    return dict(_flatten_dict_gen(d, parent_key, sep))
1010
1011
def sets_to_lists(d: MutableMapping):
    """
    Return a dictionary where all nested sets have been converted to lists.

    :param MutableMapping d:  Dictionary like object
    :return dict:  A new dictionary with no nested sets
    """

    def _convert_sequence(sequence):
        # both sets and lists come out as lists; contents converted recursively
        converted = []
        for element in sequence:
            if isinstance(element, MutableMapping):
                converted.append(sets_to_lists(element))
            elif isinstance(element, (set, list)):
                converted.append(_convert_sequence(element))
            else:
                converted.append(element)
        return converted

    result = {}
    for key, value in d.items():
        if isinstance(value, MutableMapping):
            result[key] = sets_to_lists(value)
        elif isinstance(value, (list, set)):
            result[key] = _convert_sequence(value)
        else:
            result[key] = value

    return result
1034
1035
def url_to_hash(url, remove_scheme=True, remove_www=True):
    """
    Convert a URL to a short filename-safe hash

    Some URLs are too long to be used as filenames; this normalises the URL
    (optionally dropping the scheme and a leading "www.") and hashes the
    result.

    Bug fix: urlparse() always returns a (truthy) 6-field tuple, so the old
    `if parsed_url:` regex fallback was unreachable dead code — and would
    have crashed on `.group()` of a non-match had it ever run. urlparse can
    however raise ValueError on malformed URLs (e.g. invalid IPv6 netlocs),
    so the fallback is now triggered by that exception instead.

    :param str url:  URL to hash
    :param bool remove_scheme:  Drop the scheme (e.g. "https://") before hashing
    :param bool remove_www:  Drop a leading "www." before hashing
    :return str:  Hexadecimal blake2b hash (24-byte digest) of the normalised URL
    """
    try:
        parsed_url = urlparse(url.lower())
        if remove_scheme:
            parsed_url = parsed_url._replace(scheme="")
        if remove_www:
            netloc = re.sub(r"^www\.", "", parsed_url.netloc)
            parsed_url = parsed_url._replace(netloc=netloc)

        url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/"))
    except ValueError:
        # Unable to parse URL; use regex
        if remove_scheme:
            url = re.sub(r"^https?://", "", url)
        if remove_www:
            if not remove_scheme:
                # keep the scheme (if any) while stripping "www." after it
                scheme_match = re.match(r"^https?://", url)
                scheme = scheme_match.group() if scheme_match else ""
                url = scheme + re.sub(r"^www\.", "", url[len(scheme):])
            else:
                url = re.sub(r"^www\.", "", url)

        url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/"))

    return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest()
1065
1066
def split_urls(url_string, allowed_schemes=None):
    """
    Split URL text by \n and commas.

    4CAT allows users to input lists by either separating items with a newline
    or a comma. This function will split URLs and also check for commas within
    URLs using schemes.

    Note: some urls may contain scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com);
    this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail).
    """
    if allowed_schemes is None:
        allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://')

    urls = []
    # Split the text by \n
    for line in url_string.split('\n'):
        # Handle commas that may exist within URLs
        current_url = ""
        for fragment in line.split(','):
            if not fragment:
                # skip empty fragments (e.g. from trailing commas)
                continue
            if fragment.startswith(allowed_schemes):  # Other schemes exist
                # a new URL starts here; flush whatever was collected so far
                if current_url:
                    urls.append(current_url)
                current_url = fragment
            elif current_url:
                # no scheme: the comma was part of the current URL
                current_url += "," + fragment
            else:
                # nothing collected yet; treat the fragment as a URL start
                current_url = fragment
        if current_url:
            # Add any remaining URL
            urls.append(current_url)

    return urls
1107
1108
def folder_size(path='.'):
    """
    Get the size of a folder using os.scandir for efficiency

    Sub-folders are traversed recursively. The scandir iterator is used as a
    context manager so its file descriptor is released promptly instead of
    waiting for garbage collection (the original leaked it).

    :param str path:  Folder to measure
    :return int:  Combined size of all files in the folder, in bytes
    """
    total = 0
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += folder_size(entry.path)
    return total
def init_datasource(database, logger, queue, name):
36def init_datasource(database, logger, queue, name):
37    """
38    Initialize data source
39
40    Queues jobs to scrape the boards that were configured to be scraped in the
41    4CAT configuration file. If none were configured, nothing happens.
42
43    :param Database database:  Database connection instance
44    :param Logger logger:  Log handler
45    :param JobQueue queue:  Job Queue instance
46    :param string name:  ID of datasource that is being initialised
47    """
48    pass

Initialize data source

Queues jobs to scrape the boards that were configured to be scraped in the 4CAT configuration file. If none were configured, nothing happens.

Parameters
  • Database database: Database connection instance
  • Logger logger: Log handler
  • JobQueue queue: Job Queue instance
  • string name: ID of datasource that is being initialised
def strip_tags(html, convert_newlines=True):
50def strip_tags(html, convert_newlines=True):
51    """
52    Strip HTML from a string
53
54    :param html: HTML to strip
55    :param convert_newlines: Convert <br> and </p> tags to \n before stripping
56    :return: Stripped HTML
57    """
58    if not html:
59        return ""
60
61    deduplicate_newlines = re.compile(r"\n+")
62
63    if convert_newlines:
64        html = html.replace("<br>", "\n").replace("</p>", "</p>\n")
65        html = deduplicate_newlines.sub("\n", html)
66
67    class HTMLStripper(HTMLParser):
68        def __init__(self):
69            super().__init__()
70            self.reset()
71            self.strict = False
72            self.convert_charrefs = True
73            self.fed = []
74
75        def handle_data(self, data):
76            self.fed.append(data)
77
78        def get_data(self):
79            return "".join(self.fed)
80
81    stripper = HTMLStripper()
82    stripper.feed(html)
83    return stripper.get_data()

Strip HTML from a string

Parameters
  • html: HTML to strip
  • convert_newlines: Convert <br> and </p> tags to \n before stripping
Returns

Stripped HTML

def sniff_encoding(file):
 86def sniff_encoding(file):
 87    """
 88    Determine encoding from raw file bytes
 89
 90    Currently only distinguishes UTF-8 and UTF-8 with BOM
 91
 92    :param file:
 93    :return:
 94    """
 95    if type(file) == bytearray:
 96        maybe_bom = file[:3]
 97    elif hasattr(file, "getbuffer"):
 98        buffer = file.getbuffer()
 99        maybe_bom = buffer[:3].tobytes()
100    elif hasattr(file, "peek"):
101        buffer = file.peek(32)
102        maybe_bom = buffer[:3]
103    else:
104        maybe_bom = False
105
106    return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8"

Determine encoding from raw file bytes

Currently only distinguishes UTF-8 and UTF-8 with BOM

Parameters
  • file:
Returns
def sniff_csv_dialect(csv_input):
108def sniff_csv_dialect(csv_input):
109    """
110    Determine CSV dialect for an input stream
111
112    :param csv_input:  Input stream
113    :return tuple:  Tuple: Dialect object and a boolean representing whether
114    the CSV file seems to have a header
115    """
116    encoding = sniff_encoding(csv_input)
117    if type(csv_input) is io.TextIOWrapper:
118        wrapped_input = csv_input
119    else:
120        wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding)
121    wrapped_input.seek(0)
122    sample = wrapped_input.read(1024 * 1024)
123    wrapped_input.seek(0)
124    has_header = csv.Sniffer().has_header(sample)
125    dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
126
127    return dialect, has_header

Determine CSV dialect for an input stream

Parameters
  • csv_input: Input stream
Returns

Dialect object and a boolean representing whether the CSV file seems to have a header

def get_git_branch():
    """
    Get current git branch

    If the 4CAT root folder is a git repository, this function will return the
    name of the currently checked-out branch. If the folder is not a git
    repository or git is not installed an empty string is returned.
    """
    try:
        root_dir = str(config.get('PATH_ROOT').resolve())
        branch = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE)
        if branch.returncode != 0:
            raise ValueError()
        branch_name = branch.stdout.decode("utf-8").strip()
        if not branch_name:
            # Check for detached HEAD state
            # Most likely occurring because of checking out release tags (which are not branches) or commits
            head_status = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} status"), stdout=subprocess.PIPE)
            if head_status.returncode == 0:
                for line in head_status.stdout.decode("utf-8").split("\n"):
                    if "HEAD detached at" in line:
                        branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1]
                        return branch_name
        # bug fix: previously a normally checked-out branch name was never
        # returned (the function fell through and implicitly returned None)
        return branch_name
    except (subprocess.SubprocessError, ValueError, FileNotFoundError):
        return ""

Get current git branch

If the 4CAT root folder is a git repository, this function will return the name of the currently checked-out branch. If the folder is not a git repository or git is not installed an empty string is returned.

def get_software_commit(worker=None):
157def get_software_commit(worker=None):
158    """
159    Get current 4CAT git commit hash
160
161    Use `get_software_version()` instead if you need the release version
162    number rather than the precise commit hash.
163
164    If no version file is available, run `git show` to test if there is a git
165    repository in the 4CAT root folder, and if so, what commit is currently
166    checked out in it.
167
168    For extensions, get the repository information for that extension, or if
169    the extension is not a git repository, return empty data.
170
171    :param BasicWorker processor:  Worker to get commit for. If not given, get
172    version information for the main 4CAT installation.
173
174    :return tuple:  4CAT git commit hash, repository name
175    """
176    # try git command line within the 4CAT root folder
177    # if it is a checked-out git repository, it will tell us the hash of
178    # the currently checked-out commit
179
180    # path has no Path.relative()...
181    relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent
182    try:
183        # if extension, go to the extension file's path
184        # we will run git here - if it is not its own repository, we have no
185        # useful version info (since the extension is by definition not in the
186        # main 4CAT repository) and will return an empty value
187        if worker and worker.is_extension:
188            working_dir = str(config.get("PATH_ROOT").joinpath(relative_filepath).resolve())
189            # check if we are in the extensions' own repo or 4CAT's
190            git_cmd = f"git -C {shlex.quote(working_dir)} rev-parse --show-toplevel"
191            repo_level = subprocess.run(shlex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
192            if Path(repo_level.stdout.decode("utf-8")) == config.get("PATH_ROOT"):
193                # not its own repository
194                return ("", "")
195
196        else:
197            working_dir = str(config.get("PATH_ROOT").resolve())
198
199        show = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
200        if show.returncode != 0:
201            raise ValueError()
202        commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1]
203
204        # now get the repository the commit belongs to, if we can
205        origin = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
206        if origin.returncode != 0 or not origin.stdout:
207            raise ValueError()
208        repository = origin.stdout.decode("utf-8").strip()
209        if repository.endswith(".git"):
210            repository = repository[:-4]
211
212    except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e:
213        return ("", "")
214
215    return (commit, repository)

Get current 4CAT git commit hash

Use get_software_version() instead if you need the release version number rather than the precise commit hash.

If no version file is available, run git show to test if there is a git repository in the 4CAT root folder, and if so, what commit is currently checked out in it.

For extensions, get the repository information for that extension, or if the extension is not a git repository, return empty data.

Parameters
  • BasicWorker processor: Worker to get commit for. If not given, get version information for the main 4CAT installation.
Returns

4CAT git commit hash, repository name

def get_software_version():
217def get_software_version():
218    """
219    Get current 4CAT version
220
221    This is the actual software version, i.e. not the commit hash (see
222    `get_software_hash()` for that). The current version is stored in a file
223    with a canonical location: if the file doesn't exist, an empty string is
224    returned.
225
226    :return str:  Software version, for example `1.37`.
227    """
228    current_version_file = config.get("PATH_ROOT").joinpath("config/.current-version")
229    if not current_version_file.exists():
230        return ""
231
232    with current_version_file.open() as infile:
233        return infile.readline().strip()

Get current 4CAT version

This is the actual software version, i.e. not the commit hash (see get_software_hash() for that). The current version is stored in a file with a canonical location: if the file doesn't exist, an empty string is returned.

Returns

Software version, for example 1.37.

def get_github_version(timeout=5):
235def get_github_version(timeout=5):
236    """
237    Get latest release tag version from GitHub
238
239    Will raise a ValueError if it cannot retrieve information from GitHub.
240
241    :param int timeout:  Timeout in seconds for HTTP request
242
243    :return tuple:  Version, e.g. `1.26`, and release URL.
244    """
245    repo_url = config.get("4cat.github_url")
246    if not repo_url.endswith("/"):
247        repo_url += "/"
248
249    repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url))
250
251    api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id
252    response = requests.get(api_url, timeout=timeout)
253    response = response.json()
254    if response.get("message") == "Not Found":
255        raise ValueError("Invalid GitHub URL or repository name")
256
257    latest_tag = response.get("tag_name", "unknown")
258    if latest_tag.startswith("v"):
259        latest_tag = re.sub(r"^v", "", latest_tag)
260
261    return (latest_tag, response.get("html_url"))

Get latest release tag version from GitHub

Will raise a ValueError if it cannot retrieve information from GitHub.

Parameters
  • int timeout: Timeout in seconds for HTTP request
Returns

Version, e.g. 1.26, and release URL.

def get_ffmpeg_version(ffmpeg_path):
263def get_ffmpeg_version(ffmpeg_path):
264    """
265    Determine ffmpeg version
266
267    This can be necessary when using commands that change name between versions.
268
269    :param ffmpeg_path: ffmpeg executable path
270    :return packaging.version:  Comparable ersion
271    """
272    command = [ffmpeg_path, "-version"]
273    ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
274                                    stderr=subprocess.PIPE)
275
276    ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1]
277    ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0]
278
279    return version.parse(ffmpeg_version)

Determine ffmpeg version

This can be necessary when using commands that change name between versions.

Parameters
  • ffmpeg_path: ffmpeg executable path
Returns

Comparable version

def find_extensions():
282def find_extensions():
283    """
284    Find 4CAT extensions and load their metadata
285
286    Looks for subfolders of the extension folder, and loads additional metadata
287    where available.
288
289    :return tuple:  A tuple with two items; the extensions, as an ID -> metadata
290    dictionary, and a list of (str) errors encountered while loading
291    """
292    extension_path = config.get("PATH_ROOT").joinpath("extensions")
293    errors = []
294    if not extension_path.exists() or not extension_path.is_dir():
295        return [], None
296
297    # each folder in the extensions folder is an extension
298    extensions = {
299        extension.name: {
300            "name": extension.name,
301            "version": "",
302            "url": "",
303            "git_url": "",
304            "is_git": False
305        } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir()
306    }
307
308    # collect metadata for extensions
309    allowed_metadata_keys = ("name", "version", "url")
310    for extension in extensions:
311        extension_folder = extension_path.joinpath(extension)
312        metadata_file = extension_folder.joinpath("metadata.json")
313        if metadata_file.exists():
314            with metadata_file.open() as infile:
315                try:
316                    metadata = json.load(infile)
317                    extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys})
318                except (TypeError, ValueError) as e:
319                    errors.append(f"Error reading metadata file for extension '{extension}' ({e})")
320                    continue
321
322        extensions[extension]["is_git"] = extension_folder.joinpath(".git/HEAD").exists()
323        if extensions[extension]["is_git"]:
324            # try to get remote URL
325            try:
326                extension_root = str(extension_folder.resolve())
327                origin = subprocess.run(shlex.split(f"git -C {shlex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE,
328                                        stdout=subprocess.PIPE)
329                if origin.returncode != 0 or not origin.stdout:
330                    raise ValueError()
331                repository = origin.stdout.decode("utf-8").strip()
332                if repository.endswith(".git") and "github.com" in repository:
333                    # use repo URL
334                    repository = repository[:-4]
335                extensions[extension]["git_url"] = repository
336            except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e:
337                print(e)
338                pass
339
340    return extensions, errors

Find 4CAT extensions and load their metadata

Looks for subfolders of the extension folder, and loads additional metadata where available.

Returns

A tuple with two items; the extensions, as an ID -> metadata dictionary, and a list of (str) errors encountered while loading

def convert_to_int(value, default=0):
343def convert_to_int(value, default=0):
344    """
345    Convert a value to an integer, with a fallback
346
347    The fallback is used if an Error is thrown during converstion to int.
348    This is a convenience function, but beats putting try-catches everywhere
349    we're using user input as an integer.
350
351    :param value:  Value to convert
352    :param int default:  Default value, if conversion not possible
353    :return int:  Converted value
354    """
355    try:
356        return int(value)
357    except (ValueError, TypeError):
358        return default

Convert a value to an integer, with a fallback

The fallback is used if an Error is thrown during conversion to int. This is a convenience function, but beats putting try-catches everywhere we're using user input as an integer.

Parameters
  • value: Value to convert
  • int default: Default value, if conversion not possible
Returns

Converted value

def timify_long(number):
361def timify_long(number):
362    """
363    Make a number look like an indication of time
364
365    :param number:  Number to convert. If the number is larger than the current
366    UNIX timestamp, decrease by that amount
367    :return str: A nice, string, for example `1 month, 3 weeks, 4 hours and 2 minutes`
368    """
369    number = int(number)
370
371    components = []
372    if number > time.time():
373        number = time.time() - number
374
375    month_length = 30.42 * 86400
376    months = math.floor(number / month_length)
377    if months:
378        components.append("%i month%s" % (months, "s" if months != 1 else ""))
379        number -= (months * month_length)
380
381    week_length = 7 * 86400
382    weeks = math.floor(number / week_length)
383    if weeks:
384        components.append("%i week%s" % (weeks, "s" if weeks != 1 else ""))
385        number -= (weeks * week_length)
386
387    day_length = 86400
388    days = math.floor(number / day_length)
389    if days:
390        components.append("%i day%s" % (days, "s" if days != 1 else ""))
391        number -= (days * day_length)
392
393    hour_length = 3600
394    hours = math.floor(number / hour_length)
395    if hours:
396        components.append("%i hour%s" % (hours, "s" if hours != 1 else ""))
397        number -= (hours * hour_length)
398
399    minute_length = 60
400    minutes = math.floor(number / minute_length)
401    if minutes:
402        components.append("%i minute%s" % (minutes, "s" if minutes != 1 else ""))
403
404    if not components:
405        components.append("less than a minute")
406
407    last_str = components.pop()
408    time_str = ""
409    if components:
410        time_str = ", ".join(components)
411        time_str += " and "
412
413    return time_str + last_str

Make a number look like an indication of time

Parameters
  • number: Number to convert. If the number is larger than the current UNIX timestamp, decrease by that amount
Returns

A nice, string, for example 1 month, 3 weeks, 4 hours and 2 minutes

def andify(items):
    """
    Format a list of items for use in text

    Returns a comma-separated list, the last item preceded by "and", e.g.
    "a, b and c".

    :param items:  Iterable list of items
    :return str:  Formatted string
    """
    # work on a stringified copy so the caller's list is not mutated
    items = [str(item) for item in items]

    if not items:
        return ""
    if len(items) == 1:
        # single item: return it as-is (the original indexed items[1] here,
        # which raised IndexError)
        return items[0]

    return ", ".join(items[:-1]) + f" and {items[-1]}"

Format a list of items for use in text

Returns a comma-separated list, the last item preceded by "and"

Parameters
  • items: Iterable list
Returns

Formatted string

def hash_file(image_file, hash_type="file-hash"):
    """
    Generate an image hash

    :param Path image_file:  Image file to hash
    :param str hash_type:  Hash type, one of `file-hash`, `colorhash`,
    `phash`, `average_hash`, `dhash`
    :return str:  Hexadecimal hash value
    """
    if not image_file.exists():
        raise FileNotFoundError()

    if hash_type == "file-hash":
        # plain SHA-1 of the file contents, read in chunks so large files
        # do not need to fit in memory at once
        sha1 = hashlib.sha1()
        with image_file.open("rb") as handle:
            for chunk in iter(lambda: handle.read(1024), b""):
                sha1.update(chunk)

        return sha1.hexdigest()

    if hash_type in ("colorhash", "phash", "average_hash", "dhash"):
        # perceptual hashes, delegated to the imagehash library
        return str(getattr(imagehash, hash_type)(Image.open(image_file)))

    raise NotImplementedError(f"Unknown hash type '{hash_type}'")

Generate an image hash

Parameters
  • Path image_file: Image file to hash
  • str hash_type: Hash type, one of file-hash, colorhash, phash, average_hash, dhash
Returns

Hexadecimal hash value

def get_yt_compatible_ids(yt_ids):
    """
    Take a list of IDs and return a list of comma-joined strings in batches
    of fifty

    This should be done for the YouTube API, which requires a
    comma-separated string and can only return max fifty results per call.

    The original index-juggling implementation silently dropped the last ID
    of every input (and returned [""] for single-element lists); plain
    slice-based chunking covers all elements.

    :param yt_ids:  List of ID strings, or a single ID string
    :return list:  List of comma-joined ID strings, max fifty IDs each
    """
    # If there's only one item, return a single list item
    if isinstance(yt_ids, str):
        return [yt_ids]

    # chunk into batches of fifty and join each batch with commas
    return [",".join(yt_ids[offset:offset + 50]) for offset in range(0, len(yt_ids), 50)]

:param yt_ids list, a list of strings :returns list, a list of joined strings in pairs of 50

Takes a list of IDs and returns list of joined strings in pairs of fifty. This should be done for the YouTube API that requires a comma-separated string and can only return max fifty results.

def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None,
                    fontsize_small=None, fontsize_large=None):
    """
    Get a standard SVG canvas to draw 4CAT graphs to

    Adds a border, footer, header, and some basic text styling

    :param path:  The path where the SVG graph will be saved
    :param width:  Width of the canvas
    :param height:  Height of the canvas
    :param header:  Header, if necessary to draw
    :param footer:  Footer text, if necessary to draw. Defaults to shameless
    4CAT advertisement.
    :param fontsize_normal:  Font size of normal text
    :param fontsize_small:  Font size of small text (e.g. footer)
    :param fontsize_large:  Font size of large text (e.g. header)
    :return SVG:  SVG canvas (via svgwrite) that can be drawn to
    """
    from svgwrite.container import SVG, Hyperlink
    from svgwrite.drawing import Drawing
    from svgwrite.shapes import Rect
    from svgwrite.text import Text

    # default font sizes scale with the canvas width
    if fontsize_normal is None:
        fontsize_normal = width / 75
    if fontsize_small is None:
        fontsize_small = width / 100
    if fontsize_large is None:
        fontsize_large = width / 50

    # white canvas with a black border
    canvas = Drawing(str(path), size=(width, height),
                     style="font-family:monospace;font-size:%ipx" % fontsize_normal)
    canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF"))

    # black banner with centred header text at the top of the canvas
    if header:
        header_container = SVG(insert=(0, 0), size=("100%", fontsize_large * 2))
        header_container.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
        header_label = Text(insert=("50%", "50%"), text=header, dominant_baseline="middle",
                            text_anchor="middle", fill="#FFF", style="font-size:%ipx" % fontsize_large)
        header_container.add(header_label)
        canvas.add(header_container)

    # footer banner (i.e. 4cat banner) in the bottom-right corner, linking
    # to the 4CAT website
    if footer:
        footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2)
        footer_container = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize)
        footer_container.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
        footer_link = Hyperlink(href="https://4cat.nl")
        footer_link.add(Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle",
                             text_anchor="middle", fill="#FFF", style="font-size:%ipx" % fontsize_small))
        footer_container.add(footer_link)
        canvas.add(footer_container)

    return canvas

Get a standard SVG canvas to draw 4CAT graphs to

Adds a border, footer, header, and some basic text styling

Parameters
  • path: The path where the SVG graph will be saved
  • width: Width of the canvas
  • height: Height of the canvas
  • header: Header, if necessary to draw
  • footer: Footer text, if necessary to draw. Defaults to shameless 4CAT advertisement.
  • fontsize_normal: Font size of normal text
  • fontsize_small: Font size of small text (e.g. footer)
  • fontsize_large: Font size of large text (e.g. header)
Returns

SVG canvas (via svgwrite) that can be drawn to

def call_api(action, payload=None, wait_for_response=True):
    """
    Send message to server

    Calls the internal API and returns interpreted response.

    :param str action: API action
    :param payload: API payload
    :param bool wait_for_response:  Wait for response? If not close connection
    immediately after sending data.

    :return: API response (parsed JSON if possible, raw string otherwise),
    timeout message in case of timeout, or None if wait_for_response is False
    """
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection.settimeout(15)
    connection.connect((config.get('API_HOST'), config.get('API_PORT')))

    msg = json.dumps({"request": action, "payload": payload})
    connection.sendall(msg.encode("ascii", "ignore"))

    if wait_for_response:
        try:
            response = ""
            # read until the server closes the connection
            # (renamed from `bytes`, which shadowed the builtin)
            while chunk := connection.recv(2048):
                response += chunk.decode("ascii", "ignore")
        except (socket.timeout, TimeoutError):
            response = "(Connection timed out)"

    try:
        connection.shutdown(socket.SHUT_RDWR)
    except OSError:
        # already shut down automatically
        pass
    connection.close()

    try:
        # response may not be JSON (e.g. the timeout message); fall through
        # to returning the raw string in that case
        return json.loads(response) if wait_for_response else None
    except json.JSONDecodeError:
        return response

Send message to server

Calls the internal API and returns interpreted response.

Parameters
  • str action: API action
  • payload: API payload
  • bool wait_for_response: Wait for response? If not close connection immediately after sending data.
Returns

API response, or timeout message in case of timeout

def get_interval_descriptor(item, interval):
    """
    Get interval descriptor based on timestamp

    :param dict item:  Item to generate descriptor for, should have a
    "timestamp" key
    :param str interval:  Interval, one of "all", "overall", "year",
    "month", "week", "day", "hour", "minute"
    :return str:  Interval descriptor, e.g. "overall", "2020", "2020-08",
    "2020-43", "2020-08-01"
    :raises ValueError:  If the item has no timestamp, or the timestamp can
    be parsed neither as an epoch integer nor as a "%Y-%m-%d %H:%M:%S" date
    """
    if interval in ("all", "overall"):
        return interval

    if "timestamp" not in item:
        raise ValueError("No date available for item in dataset")

    # Catch cases where a custom timestamp has an epoch integer as value.
    # The bare `except:` here previously swallowed *any* exception; only
    # ValueError/TypeError (non-numeric or invalid timestamps) should fall
    # through to the string-date parser below.
    try:
        timestamp = int(item["timestamp"])
        try:
            timestamp = datetime.datetime.fromtimestamp(timestamp)
        except (ValueError, TypeError):
            raise ValueError("Invalid timestamp '%s'" % str(item["timestamp"]))
    except (ValueError, TypeError):
        try:
            timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            raise ValueError("Invalid date '%s'" % str(item["timestamp"]))

    if interval == "year":
        return str(timestamp.year)
    elif interval == "month":
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2)
    elif interval == "week":
        # ISO year and week number, which may differ from the calendar year
        return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2)
    elif interval == "hour":
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
            2) + " " + str(timestamp.hour).zfill(2)
    elif interval == "minute":
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
            2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2)
    else:
        # any other value is treated as "day"
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2)

Get interval descriptor based on timestamp

Parameters
  • dict item: Item to generate descriptor for, should have a "timestamp" key
  • str interval: Interval, one of "all", "overall", "year", "month", "week", "day"
Returns

Interval descriptor, e.g. "overall", "2020", "2020-08", "2020-43", "2020-08-01"

def pad_interval(intervals, first_interval=None, last_interval=None):
    """
    Pad an interval so all intermediate intervals are filled

    Missing intervals between the first and last are added with a value of 0,
    and the result is sorted by interval key. The granularity (year, month,
    day, hour, minute) is inferred from the length/format of the keys.

    Warning, ugly code (PRs very welcome)

    :param dict intervals:  A dictionary, with dates (YYYY{-MM}{-DD}) as keys
    and a numerical value.
    :param first_interval:  Interval to pad from; inferred from the data if
    not given
    :param last_interval:  Interval to pad up to; inferred from the data if
    not given
    :return:  Tuple of (number of intervals added, padded and sorted dict)
    """
    missing = 0
    # one representative key, used to determine the interval granularity
    test_key = list(intervals.keys())[0]

    # first determine the boundaries of the interval
    # these may be passed as parameters, or they can be inferred from the
    # interval given
    # NOTE(review): if a passed first_interval/last_interval is *shorter*
    # than the keys in `intervals` (e.g. "2020" with month-level keys), the
    # finer-grained boundary variables stay unbound and the loops below will
    # raise NameError — confirm callers always pass full-length descriptors.
    if first_interval:
        # fixed-position slicing of "YYYY-MM-DD HH:MM"-style descriptors
        first_interval = str(first_interval)
        first_year = int(first_interval[0:4])
        if len(first_interval) > 4:
            first_month = int(first_interval[5:7])
        if len(first_interval) > 7:
            first_day = int(first_interval[8:10])
        if len(first_interval) > 10:
            first_hour = int(first_interval[11:13])
        if len(first_interval) > 13:
            first_minute = int(first_interval[14:16])

    else:
        # infer the earliest boundary from the data, narrowing one component
        # at a time (earliest year, then earliest month within that year, ...)
        first_year = min([int(i[0:4]) for i in intervals])
        if len(test_key) > 4:
            first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year])
        if len(test_key) > 7:
            first_day = min(
                [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month])
        if len(test_key) > 10:
            first_hour = min(
                [int(i[11:13]) for i in intervals if
                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day])
        if len(test_key) > 13:
            first_minute = min(
                [int(i[14:16]) for i in intervals if
                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int(
                     i[11:13]) == first_hour])

    if last_interval:
        last_interval = str(last_interval)
        last_year = int(last_interval[0:4])
        if len(last_interval) > 4:
            last_month = int(last_interval[5:7])
        if len(last_interval) > 7:
            last_day = int(last_interval[8:10])
        if len(last_interval) > 10:
            last_hour = int(last_interval[11:13])
        if len(last_interval) > 13:
            last_minute = int(last_interval[14:16])
    else:
        # same narrowing as above, but for the latest boundary
        last_year = max([int(i[0:4]) for i in intervals])
        if len(test_key) > 4:
            last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year])
        if len(test_key) > 7:
            last_day = max(
                [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month])
        if len(test_key) > 10:
            last_hour = max(
                [int(i[11:13]) for i in intervals if
                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day])
        if len(test_key) > 13:
            last_minute = max(
                [int(i[14:16]) for i in intervals if
                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int(
                     i[11:13]) == last_hour])

    # determine granularity from the format of the representative key
    has_month = re.match(r"^[0-9]{4}-[0-9]", test_key)
    has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key)
    has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key)
    has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key)

    # enumerate every interval between the two boundaries, at the inferred
    # granularity; each level only constrains its range at the boundary
    # years/months/days/hours, and spans the full range in between
    all_intervals = []
    for year in range(first_year, last_year + 1):
        year_interval = str(year)

        if not has_month:
            all_intervals.append(year_interval)
            continue

        start_month = first_month if year == first_year else 1
        end_month = last_month if year == last_year else 12
        for month in range(start_month, end_month + 1):
            month_interval = year_interval + "-" + str(month).zfill(2)

            if not has_day:
                all_intervals.append(month_interval)
                continue

            start_day = first_day if all((year == first_year, month == first_month)) else 1
            # monthrange() gives the number of days in this specific month
            end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1]
            for day in range(start_day, end_day + 1):
                day_interval = month_interval + "-" + str(day).zfill(2)

                if not has_hour:
                    all_intervals.append(day_interval)
                    continue

                start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0
                end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23
                for hour in range(start_hour, end_hour + 1):
                    hour_interval = day_interval + " " + str(hour).zfill(2)

                    if not has_minute:
                        all_intervals.append(hour_interval)
                        continue

                    start_minute = first_minute if all(
                        (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0
                    end_minute = last_minute if all(
                        (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59

                    for minute in range(start_minute, end_minute + 1):
                        minute_interval = hour_interval + ":" + str(minute).zfill(2)
                        all_intervals.append(minute_interval)

    # add any enumerated interval that is not in the input, with value 0
    for interval in all_intervals:
        if interval not in intervals:
            intervals[interval] = 0
            missing += 1

    # sort while we're at it
    intervals = {key: intervals[key] for key in sorted(intervals)}

    return missing, intervals

Pad an interval so all intermediate intervals are filled

Warning, ugly code (PRs very welcome)

Parameters
  • dict intervals: A dictionary, with dates (YYYY{-MM}{-DD}) as keys and a numerical value.
  • first_interval:
  • last_interval:
Returns
def remove_nuls(value):
    """
    Remove \0 from a value

    The CSV library cries about a null byte when it encounters one :( :( :(
    poor little csv cannot handle a tiny little null byte

    So remove them from the data because they should not occur in utf-8 data
    anyway.

    :param value:  Value to remove nulls from. For dictionaries, sets, tuples
    and lists all items are parsed recursively.
    :return value:  Cleaned value
    """
    value_type = type(value)

    if value_type is str:
        return value.replace("\0", "")
    if value_type is dict:
        # dictionaries are cleaned in place, key by key
        for key in value:
            value[key] = remove_nuls(value[key])
        return value
    if value_type is list:
        return [remove_nuls(element) for element in value]
    if value_type is tuple:
        return tuple(remove_nuls(element) for element in value)
    if value_type is set:
        return {remove_nuls(element) for element in value}

    # any other type is returned untouched
    return value

Remove null bytes (\0) from a value

The CSV library cries about a null byte when it encounters one :( :( :( poor little csv cannot handle a tiny little null byte

So remove them from the data because they should not occur in utf-8 data anyway.

Parameters
  • value: Value to remove nulls from. For dictionaries, sets, tuples and lists all items are parsed recursively.
Returns

Cleaned value

class NullAwareTextIOWrapper(io.TextIOWrapper):
    """
    TextIOWrapper that skips null bytes

    This can be used as a file reader that silently discards any null bytes it
    encounters. Useful e.g. when feeding files to the csv module, which
    chokes on null bytes.
    """

    def __next__(self):
        # read the next line normally, then strip any \0 via remove_nuls()
        value = super().__next__()
        return remove_nuls(value)

TextIOWrapper that skips null bytes

This can be used as a file reader that silently discards any null bytes it encounters.

class HashCache:
    """
    Simple cache handler to cache hashed values

    Avoids having to calculate a hash for values that have been hashed before
    """

    def __init__(self, hasher):
        # maps original values to their previously computed hex digests
        self.hash_cache = {}
        # base hasher object; copied for each new value so its own state
        # is never modified
        self.hasher = hasher

    def update_cache(self, value):
        """
        Checks the hash_cache to see if the value has been cached previously,
        updates the hash_cache if needed, and returns the hashed value.
        """
        if value in self.hash_cache:
            return self.hash_cache[value]

        value_hasher = self.hasher.copy()
        value_hasher.update(str(value).encode("utf-8"))
        digest = value_hasher.hexdigest()
        self.hash_cache[value] = digest
        return digest

Simple cache handler to cache hashed values

Avoids having to calculate a hash for values that have been hashed before

HashCache(hasher)
832    def __init__(self, hasher):
833        self.hash_cache = {}
834        self.hasher = hasher
hash_cache
hasher
def update_cache(self, value):
836    def update_cache(self, value):
837        """
838        Checks the hash_cache to see if the value has been cached previously,
839        updates the hash_cache if needed, and returns the hashed value.
840        """
841        # value = str(value)
842        if value not in self.hash_cache:
843            author_hasher = self.hasher.copy()
844            author_hasher.update(str(value).encode("utf-8"))
845            self.hash_cache[value] = author_hasher.hexdigest()
846            del author_hasher
847        return self.hash_cache[value]

Checks the hash_cache to see if the value has been cached previously, updates the hash_cache if needed, and returns the hashed value.

def dict_search_and_update(item, keyword_matches, function):
    """
    Filter fields in an object recursively

    Apply a function to every item and sub item of a dictionary if the key
    contains one of the provided match terms.

    Function loops through a dictionary or list and compares dictionary keys to
    the strings defined by keyword_matches. It then applies the change_function
    to corresponding values.

    Note: if a matching term is found, all nested values will have the function
    applied to them. e.g., all these values would be changed even those with
    not_key_match:

    {'key_match' : 'changed',
    'also_key_match' : {'not_key_match' : 'but_value_still_changed'},
    'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}

    This is a comprehensive (and expensive) approach to updating a dictionary.
    IF a dictionary structure is known, a better solution would be to update
    using specific keys.

    :param Dict/List item:  dictionary/list/json to loop through
    :param String keyword_matches:  list of strings that will be matched to
    dictionary keys. Can contain wildcards which are matched using fnmatch.
    :param Function function:  function applied to all values of any items
    nested under a matching key

    :return Dict/List: Copy of original item, but filtered
    """

    def _apply_recursively(node, patterns, change_function):
        """
        Recursive helper function that updates the given node in place
        """
        if isinstance(node, dict):
            for key, value in node.items():
                # the literal string 'True' is used as a sentinel meaning
                # "a parent key already matched; change everything below"
                matched = patterns == 'True' or any(
                    fnmatch.fnmatch(key, pattern) for pattern in patterns)
                if matched:
                    if isinstance(value, (list, dict)):
                        # apply to all nested items and sub-items
                        _apply_recursively(value, 'True', change_function)
                    elif value is not None:
                        node[key] = change_function(value)
                elif isinstance(value, (list, dict)):
                    # no match here; keep searching deeper
                    _apply_recursively(value, patterns, change_function)
        elif isinstance(node, list):
            for index, value in enumerate(node):
                if isinstance(value, (list, dict)):
                    _apply_recursively(value, patterns, change_function)
                elif patterns == 'True':
                    # scalar list item nested under a matching key
                    node[index] = change_function(value)
        else:
            raise Exception('Must pass list or dictionary')

    # Lowercase keyword_matches
    keyword_matches = [keyword.lower() for keyword in keyword_matches]

    # work on a deep copy so the original item is left untouched
    updated_item = copy.deepcopy(item)
    _apply_recursively(updated_item, keyword_matches, function)
    return updated_item

Filter fields in an object recursively

Apply a function to every item and sub item of a dictionary if the key contains one of the provided match terms.

Function loops through a dictionary or list and compares dictionary keys to the strings defined by keyword_matches. It then applies the change_function to corresponding values.

Note: if a matching term is found, all nested values will have the function applied to them. e.g., all these values would be changed even those with not_key_match:

{'key_match' : 'changed', 'also_key_match' : {'not_key_match' : 'but_value_still_changed'}, 'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}

This is a comprehensive (and expensive) approach to updating a dictionary. IF a dictionary structure is known, a better solution would be to update using specific keys.

Parameters
  • Dict/List item: dictionary/list/json to loop through
  • String keyword_matches: list of strings that will be matched to dictionary keys. Can contain wildcards which are matched using fnmatch.
  • Function function: function applied to all values of any items nested under a matching key
Returns

Copy of original item, but filtered

def get_last_line(filepath):
    """
    Seeks from end of file for '\n' and returns that line

    :param str filepath:  path to file
    :return str: last line of file
    """
    with open(filepath, "rb") as handle:
        try:
            # jump to just before the final byte, then walk backwards one
            # byte at a time until a newline is read
            handle.seek(-2, os.SEEK_END)
            while handle.read(1) != b'\n':
                handle.seek(-2, os.SEEK_CUR)
        except OSError:
            # file too short to seek backwards; read from the start instead
            handle.seek(0)
        return handle.readline().decode()

Seeks from end of file for '\n' (newline) and returns that line

:param str filepath:  path to file
:return str: last line of file
def add_notification(db, user, notification, expires=None, allow_dismiss=True):
    """
    Add a notification for a user to the database

    :param db:  Database handler, used to insert the notification record
    :param str user:  Username the notification is stored under
    :param str notification:  Notification text
    :param expires:  Stored as `timestamp_expires`; presumably a UNIX
    timestamp after which the notification expires — confirm with callers
    :param bool allow_dismiss:  Whether the notification can be dismissed
    """
    notification_record = {
        "username": user,
        "notification": notification,
        "timestamp_expires": expires,
        "allow_dismiss": allow_dismiss
    }
    db.insert("users_notifications", notification_record, safe=True)
def send_email(recipient, message):
    """
    Send an e-mail using the configured SMTP settings

    Just a thin wrapper around smtplib, so we don't have to repeat ourselves.
    Exceptions are to be handled outside the function.

    :param list recipient:  Recipient e-mail addresses
    :param MIMEMultipart message:  Message to send; a plain string is sent
    as-is
    """
    # Create a secure SSL context
    context = ssl.create_default_context()

    # Decide which connection type: implicit SSL if configured as 'ssl',
    # plain SMTP otherwise (optionally upgraded to TLS below). The original
    # inlined this choice in a hard-to-read conditional `with` expression.
    if config.get('mail.ssl') == 'ssl':
        server = smtplib.SMTP_SSL(config.get('mail.server'), port=config.get('mail.port', 0), context=context)
    else:
        server = smtplib.SMTP(config.get('mail.server'), port=config.get('mail.port', 0))

    with server:
        if config.get('mail.ssl') == 'tls':
            # smtplib.SMTP adds TLS context here
            server.starttls(context=context)

        # Log in, if credentials are configured
        if config.get('mail.username') and config.get('mail.password'):
            server.ehlo()
            server.login(config.get('mail.username'), config.get('mail.password'))

        # Send message; `isinstance` replaces the `type(message) == str`
        # comparison, which is the non-idiomatic form of this check
        payload = message if isinstance(message, str) else message.as_string()
        server.sendmail(config.get('mail.noreply'), recipient, payload)

Send an e-mail using the configured SMTP settings

Just a thin wrapper around smtplib, so we don't have to repeat ourselves. Exceptions are to be handled outside the function.

Parameters
  • list recipient: Recipient e-mail addresses
  • MIMEMultipart message: Message to send
def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'):
    """
    Return a flattened dictionary where nested dictionary objects are given new
    keys using the parent key combined with the child key via the separator.

    Lists and sets will be converted to json strings via json.dumps()

    :param MutableMapping d:  Dictionary like object
    :param str parent_key: The original parent key prepended to nested keys
    :param str sep: A separator string used to combine parent and child keys
    :return dict:  A new dictionary with no nested values
    """
    flattened = {}
    for key, value in d.items():
        full_key = parent_key + sep + key if parent_key else key
        if isinstance(value, MutableMapping):
            # nested mapping: merge its (already flattened) items in
            flattened.update(flatten_dict(value, full_key, sep=sep))
        elif isinstance(value, (list, set)):
            # sequences become JSON strings; mappings inside them are
            # flattened first
            flattened[full_key] = json.dumps(
                [flatten_dict(entry, full_key, sep=sep) if isinstance(entry, MutableMapping) else entry
                 for entry in value])
        else:
            flattened[full_key] = value
    return flattened

Return a flattened dictionary where nested dictionary objects are given new keys using the parent key combined using the separator with the child key.

Lists will be converted to json strings via json.dumps()

Parameters
  • MutableMapping d: Dictionary like object
  • str parent_key: The original parent key prepended to future nested keys
  • str sep: A separator string used to combine parent and child keys
Returns

A new dictionary with no nested values

def sets_to_lists(d: MutableMapping):
    """
    Return a dictionary where all nested sets have been converted to lists.

    :param MutableMapping d:  Dictionary like object
    :return dict:  A new dictionary with no nested sets
    """

    def _convert_sequence(sequence):
        # recursively convert a list/set: mappings are handled by
        # sets_to_lists, nested sequences by this function, everything
        # else is kept as-is
        converted = []
        for element in sequence:
            if isinstance(element, MutableMapping):
                converted.append(sets_to_lists(element))
            elif isinstance(element, (set, list)):
                converted.append(_convert_sequence(element))
            else:
                converted.append(element)
        return converted

    result = {}
    for key, value in d.items():
        if isinstance(value, MutableMapping):
            result[key] = sets_to_lists(value)
        elif isinstance(value, (list, set)):
            result[key] = _convert_sequence(value)
        else:
            result[key] = value
    return result

Return a dictionary where all nested sets have been converted to lists.

Parameters
  • MutableMapping d: Dictionary like object
Returns

A new dictionary with no nested sets

def url_to_hash(url, remove_scheme=True, remove_www=True):
def url_to_hash(url, remove_scheme=True, remove_www=True):
    """
    Convert a URL to a short, filename-safe hash.

    Some URLs are too long to be used as filenames; this normalises the URL
    (optionally stripping the scheme and a leading "www.") and hashes the
    result, so equivalent URLs map to the same digest.

    :param str url:  URL to hash
    :param bool remove_scheme:  Strip the scheme (e.g. "https://") first
    :param bool remove_www:  Strip a leading "www." first
    :return str:  48-character hexadecimal blake2b digest
    """
    try:
        parsed_url = urlparse(url.lower())
    except ValueError:
        # urlparse raises on some malformed URLs (e.g. "Invalid IPv6 URL");
        # fall back to regex-based normalisation below. Note: the original
        # `if parsed_url:` check could never fail, because ParseResult is a
        # non-empty namedtuple and therefore always truthy.
        parsed_url = None

    if parsed_url:
        if remove_scheme:
            parsed_url = parsed_url._replace(scheme="")
        if remove_www:
            netloc = re.sub(r"^www\.", "", parsed_url.netloc)
            parsed_url = parsed_url._replace(netloc=netloc)

        url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/"))
    else:
        # Unable to parse URL; use regex
        if remove_scheme:
            url = re.sub(r"^https?://", "", url)
        if remove_www:
            if not remove_scheme:
                # keep the scheme but strip "www." right after it; the URL may
                # not actually start with a scheme, so guard the match
                scheme_match = re.match(r"^https?://", url)
                scheme = scheme_match.group() if scheme_match else ""
                temp_url = re.sub(r"^https?://", "", url)
                url = scheme + re.sub(r"^www\.", "", temp_url)
            else:
                url = re.sub(r"^www\.", "", url)

        url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/"))

    return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest()

Convert a URL to a filename; some URLs are too long to be used as filenames, this keeps the domain and hashes the rest of the URL.

def split_urls(url_string, allowed_schemes=None):
def split_urls(url_string, allowed_schemes=None):
    """
    Split URL text by newlines and commas.

    4CAT allows users to input lists by either separating items with a newline
    or a comma. This function will split URLs and also check for commas within
    URLs using schemes.

    Note: some urls may contain scheme (e.g.,
    https://web.archive.org/web/20250000000000*/http://economist.com);
    this function will work so long as the inner scheme does not follow a
    comma (e.g., "http://,https://" would fail).

    :param str url_string:  Text containing URLs separated by newlines/commas
    :param allowed_schemes:  Iterable of scheme prefixes that mark the start
        of a new URL; defaults to http(s) and ftp(s)
    :return list:  List of URL strings
    """
    if allowed_schemes is None:
        allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://')
    # str.startswith requires a str or tuple; accept any iterable of schemes
    allowed_schemes = tuple(allowed_schemes)
    potential_urls = []
    # Split the text by \n
    for line in url_string.split('\n'):
        # Handle commas that may exist within URLs
        parts = line.split(',')
        recombined_url = ""
        for part in parts:
            if part.startswith(allowed_schemes):  # Other schemes exist
                # New URL start detected
                if recombined_url:
                    # Already have a URL, add to list
                    potential_urls.append(recombined_url)
                # Start new URL
                recombined_url = part
            elif part:
                if recombined_url:
                    # Add to existing URL
                    recombined_url += "," + part
                else:
                    # No existing URL, start new
                    recombined_url = part
            else:
                # Ignore empty strings
                pass
        if recombined_url:
            # Add any remaining URL
            potential_urls.append(recombined_url)
    return potential_urls

Split URL text by newlines and commas.

4CAT allows users to input lists by either separating items with a newline or a comma. This function will split URLs and also check for commas within URLs using schemes.

Note: some urls may contain scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com); this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail).

def folder_size(path='.'):
def folder_size(path='.'):
    """
    Get the size of a folder using os.scandir for efficiency

    Recursively sums the sizes of all files in `path`, descending into
    sub-folders.

    :param str path:  Path of the folder to measure
    :return int:  Total size of all contained files, in bytes
    """
    total = 0
    # Use the context-manager form so the scandir iterator's directory
    # handle is closed promptly; otherwise recursive calls keep handles
    # open until garbage collection
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += folder_size(entry.path)
    return total

Get the size of a folder using os.scandir for efficiency