common.lib.helpers

Miscellaneous helper functions for the 4CAT backend

   1"""
   2Miscellaneous helper functions for the 4CAT backend
   3"""
   4import subprocess
   5import imagehash
   6import hashlib
   7import requests
   8import datetime
   9import smtplib
  10import fnmatch
  11import socket
  12import oslex
  13import copy
  14import time
  15import json
  16import math
  17import ural
  18import csv
  19import ssl
  20import re
  21import os
  22import io
  23
  24from pathlib import Path
  25from collections.abc import MutableMapping
  26from html.parser import HTMLParser
  27from urllib.parse import urlparse, urlunparse
  28from calendar import monthrange
  29from packaging import version
  30from PIL import Image
  31
  32from common.config_manager import CoreConfigManager
  33from common.lib.user_input import UserInput
  34__all__ = ("UserInput",)
  35
  36core_config = CoreConfigManager()
  37
  38def init_datasource(database, logger, queue, name, config):
  39    """
  40    Initialize data source
  41
  42    Queues jobs to scrape the boards that were configured to be scraped in the
  43    4CAT configuration file. If none were configured, nothing happens.
  44
  45    :param Database database:  Database connection instance
  46    :param Logger logger:  Log handler
  47    :param JobQueue queue:  Job Queue instance
  48    :param string name:  ID of datasource that is being initialised
  49    :param config:  Configuration reader
  50    """
  51    pass
  52
  53def get_datasource_example_keys(db, modules, dataset_type):
  54    """
  55    Get example keys for a datasource
  56    """
  57    from common.lib.dataset import DataSet
  58    example_dataset_key = db.fetchone("SELECT key from datasets WHERE type = %s and is_finished = True and num_rows > 0 ORDER BY timestamp_finished DESC LIMIT 1", (dataset_type,))
  59    if example_dataset_key:
  60        example_dataset = DataSet(db=db, key=example_dataset_key["key"], modules=modules)
  61        return example_dataset.get_columns()
  62    return []
  63
  64def strip_tags(html, convert_newlines=True):
  65    """
  66    Strip HTML from a string
  67
  68    :param html: HTML to strip
  69    :param convert_newlines: Convert <br> and </p> tags to \n before stripping
  70    :return: Stripped HTML
  71    """
  72    if not html:
  73        return ""
  74
  75    deduplicate_newlines = re.compile(r"\n+")
  76
  77    if convert_newlines:
  78        html = html.replace("<br>", "\n").replace("</p>", "</p>\n")
  79        html = deduplicate_newlines.sub("\n", html)
  80
  81    class HTMLStripper(HTMLParser):
  82        def __init__(self):
  83            super().__init__()
  84            self.reset()
  85            self.strict = False
  86            self.convert_charrefs = True
  87            self.fed = []
  88
  89        def handle_data(self, data):
  90            self.fed.append(data)
  91
  92        def get_data(self):
  93            return "".join(self.fed)
  94
  95    stripper = HTMLStripper()
  96    stripper.feed(html)
  97    return stripper.get_data()
  98
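# Example (illustrative, not part of the module source; assumes strip_tags() above
# is in scope): <br> and </p> are converted to newlines before all remaining tags
# are removed.
assert strip_tags("<p>Hello <b>world</b></p><p>Goodbye</p>") == "Hello world\nGoodbye\n"
assert strip_tags(None) == ""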
  99
 100def sniff_encoding(file):
 101    """
 102    Determine encoding from raw file bytes
 103
 104    Currently only distinguishes UTF-8 and UTF-8 with BOM
 105
 106    :param file:  File-like object or bytearray containing the raw file bytes
 107    :return str:  Encoding name; `utf-8-sig` if a UTF-8 BOM is found, else `utf-8`
 108    """
 109    if type(file) is bytearray:
 110        maybe_bom = file[:3]
 111    elif hasattr(file, "getbuffer"):
 112        buffer = file.getbuffer()
 113        maybe_bom = buffer[:3].tobytes()
 114    elif hasattr(file, "peek"):
 115        buffer = file.peek(32)
 116        maybe_bom = buffer[:3]
 117    else:
 118        maybe_bom = False
 119
 120    return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8"
 121
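# Example (illustrative, not part of the module source): a leading UTF-8 BOM is the
# only thing distinguishing the two supported encodings.
assert sniff_encoding(bytearray(b"\xef\xbb\xbfname,value\n")) == "utf-8-sig"
assert sniff_encoding(bytearray(b"name,value\n")) == "utf-8"
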
 122def sniff_csv_dialect(csv_input):
 123    """
 124    Determine CSV dialect for an input stream
 125
 126    :param csv_input:  Input stream
 127    :return tuple:  Tuple: Dialect object and a boolean representing whether
 128    the CSV file seems to have a header
 129    """
 130    encoding = sniff_encoding(csv_input)
 131    if type(csv_input) is io.TextIOWrapper:
 132        wrapped_input = csv_input
 133    else:
 134        wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding)
 135    wrapped_input.seek(0)
 136    sample = wrapped_input.read(1024 * 1024)
 137    wrapped_input.seek(0)
 138    has_header = csv.Sniffer().has_header(sample)
 139    dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 140
 141    return dialect, has_header
 142
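# Example (illustrative, not part of the module source; `io` is already imported at
# the top of this module): sniffing a small in-memory CSV. The exact result depends
# on csv.Sniffer's heuristics.
buffer = io.BytesIO(b"id,body\n1,first post\n2,second post\n")
dialect, has_header = sniff_csv_dialect(buffer)
# dialect.delimiter should be ","; has_header is typically True for this sample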
 143
 144def get_git_branch():
 145    """
 146    Get current git branch
 147
 148    If the 4CAT root folder is a git repository, this function will return the
 149    name of the currently checked-out branch. If the folder is not a git
 150    repository or git is not installed an empty string is returned.
 151    """
 152    try:
 153        root_dir = str(core_config.get('PATH_ROOT').resolve())
 154        branch = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE)
 155        if branch.returncode != 0:
 156            raise ValueError()
 157        branch_name = branch.stdout.decode("utf-8").strip()
 158        if not branch_name:
 159            # Check for detached HEAD state
 160            # Most likely occurring because of checking out release tags (which are not branches) or commits
 161            head_status = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} status"), stdout=subprocess.PIPE)
 162            if head_status.returncode == 0:
 163                for line in head_status.stdout.decode("utf-8").split("\n"):
 164                    if any([detached_message in line for detached_message in ("HEAD detached from", "HEAD detached at")]):
 165                        branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1]
 166                        return branch_name.strip()
 167    except (subprocess.SubprocessError, ValueError, FileNotFoundError):
 168        return ""
 169
 170
 171def get_software_commit(worker=None):
 172    """
 173    Get current 4CAT git commit hash
 174
 175    Use `get_software_version()` instead if you need the release version
 176    number rather than the precise commit hash.
 177
 178    If no version file is available, run `git show` to test if there is a git
 179    repository in the 4CAT root folder, and if so, what commit is currently
 180    checked out in it.
 181
 182    For extensions, get the repository information for that extension, or if
 183    the extension is not a git repository, return empty data.
 184
 185    :param BasicWorker worker:  Worker to get commit for. If not given, get
 186    version information for the main 4CAT installation.
 187
 188    :return tuple:  4CAT git commit hash, repository name
 189    """
 190    # try git command line within the 4CAT root folder
 191    # if it is a checked-out git repository, it will tell us the hash of
 192    # the currently checked-out commit
 193
 194    # path has no Path.relative()...
 195    try:
 196        # if extension, go to the extension file's path
 197        # we will run git here - if it is not its own repository, we have no
 198        # useful version info (since the extension is by definition not in the
 199        # main 4CAT repository) and will return an empty value
 200        if worker and worker.is_extension:
 201            relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent
 202            working_dir = str(core_config.get("PATH_ROOT").joinpath(relative_filepath).resolve())
 203            # check if we are in the extensions' own repo or 4CAT's
 204            git_cmd = f"git -C {oslex.quote(working_dir)} rev-parse --show-toplevel"
 205            repo_level = subprocess.run(oslex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 206            if Path(repo_level.stdout.decode("utf-8")) == core_config.get("PATH_ROOT"):
 207                # not its own repository
 208                return ("", "")
 209
 210        else:
 211            working_dir = str(core_config.get("PATH_ROOT").resolve())
 212
 213        show = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 214        if show.returncode != 0:
 215            raise ValueError()
 216        commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1]
 217
 218        # now get the repository the commit belongs to, if we can
 219        origin = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 220        if origin.returncode != 0 or not origin.stdout:
 221            raise ValueError()
 222        repository = origin.stdout.decode("utf-8").strip()
 223        if repository.endswith(".git"):
 224            repository = repository[:-4]
 225
 226    except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError):
 227        return ("", "")
 228
 229    return (commit, repository)
 230
 231def get_software_version():
 232    """
 233    Get current 4CAT version
 234
 235    This is the actual software version, i.e. not the commit hash (see
 236    `get_software_commit()` for that). The current version is stored in a file
 237    with a canonical location: if the file doesn't exist, an empty string is
 238    returned.
 239
 240    :return str:  Software version, for example `1.37`.
 241    """
 242    current_version_file = core_config.get("PATH_CONFIG").joinpath(".current-version")
 243    if not current_version_file.exists():
 244        return ""
 245
 246    with current_version_file.open() as infile:
 247        return infile.readline().strip()
 248
 249def get_github_version(repo_url, timeout=5):
 250    """
 251    Get latest release tag version from GitHub
 252
 253    Will raise a ValueError if it cannot retrieve information from GitHub.
 254
 255    :param str repo_url:  GitHub repository URL
 256    :param int timeout:  Timeout in seconds for HTTP request
 257
 258    :return tuple:  Version, e.g. `1.26`, and release URL.
 259    """
 260    if not repo_url.endswith("/"):
 261        repo_url += "/"
 262
 263    repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url))
 264
 265    api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id
 266    response = requests.get(api_url, timeout=timeout)
 267    response = response.json()
 268    if response.get("message") == "Not Found":
 269        raise ValueError("Invalid GitHub URL or repository name")
 270
 271    latest_tag = response.get("tag_name", "unknown")
 272    if latest_tag.startswith("v"):
 273        latest_tag = re.sub(r"^v", "", latest_tag)
 274
 275    return (latest_tag, response.get("html_url"))
 276
 277def get_ffmpeg_version(ffmpeg_path):
 278    """
 279    Determine ffmpeg version
 280
 281    This can be necessary when using commands that change name between versions.
 282
 283    :param ffmpeg_path: ffmpeg executable path
 284    :return packaging.version:  Comparable version
 285    """
 286    command = [ffmpeg_path, "-version"]
 287    ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
 288                                    stderr=subprocess.PIPE)
 289
 290    ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1]
 291    ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0]
 292
 293    return version.parse(ffmpeg_version)
 294
 295
 296def find_extensions():
 297    """
 298    Find 4CAT extensions and load their metadata
 299
 300    Looks for subfolders of the extension folder, and loads additional metadata
 301    where available.
 302
 303    :return tuple:  A tuple with two items; the extensions, as an ID -> metadata
 304    dictionary, and a list of (str) errors encountered while loading
 305    """
 306    extension_path = core_config.get("PATH_EXTENSIONS")
 307    errors = []
 308    if not extension_path.exists() or not extension_path.is_dir():
 309        return {}, errors
 310
 311    # each folder in the extensions folder is an extension
 312    extensions = {
 313        extension.name: {
 314            "name": extension.name,
 315            "version": "",
 316            "url": "",
 317            "git_url": "",
 318            "is_git": False,
 319        } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir()
 320    }
 321
 322    # collect metadata for extensions
 323    allowed_metadata_keys = ("name", "version", "url")
 324    for extension in extensions:
 325        extension_folder = extension_path.joinpath(extension)
 326        metadata_file = extension_folder.joinpath("metadata.json")
 327        if metadata_file.exists():
 328            with metadata_file.open() as infile:
 329                try:
 330                    metadata = json.load(infile)
 331                    extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys})
 332                except (TypeError, ValueError) as e:
 333                    errors.append(f"Error reading metadata file for extension '{extension}' ({e})")
 334                    continue
 335
 336        extensions[extension]["is_git"] = extension_folder.joinpath(".git/HEAD").exists()
 337        if extensions[extension]["is_git"]:
 338            # try to get remote URL
 339            try:
 340                extension_root = str(extension_folder.resolve())
 341                origin = subprocess.run(oslex.split(f"git -C {oslex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE,
 342                                        stdout=subprocess.PIPE)
 343                if origin.returncode != 0 or not origin.stdout:
 344                    raise ValueError()
 345                repository = origin.stdout.decode("utf-8").strip()
 346                if repository.endswith(".git") and "github.com" in repository:
 347                    # use repo URL
 348                    repository = repository[:-4]
 349                extensions[extension]["git_url"] = repository
 350            except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e:
 351                print(e)
 352                pass
 353
 354    return extensions, errors
 355
 356
 357def convert_to_int(value, default=0):
 358    """
 359    Convert a value to an integer, with a fallback
 360
 361    The fallback is used if an error is thrown during conversion to int.
 362    This is a convenience function, but beats putting try-catches everywhere
 363    we're using user input as an integer.
 364
 365    :param value:  Value to convert
 366    :param int default:  Default value, if conversion not possible
 367    :return int:  Converted value
 368    """
 369    try:
 370        return int(value)
 371    except (ValueError, TypeError):
 372        return default
 373
 374def convert_to_float(value, default=0, force=False) -> float:
 375    """
 376    Convert a value to a floating point, with a fallback
 377
 378    The fallback is used if an error is thrown during conversion to float.
 379    This is a convenience function, but beats putting try-catches everywhere
 380    we're using user input as a floating point number.
 381
 382    :param value:  Value to convert
 383    :param int default:  Default value, if conversion not possible
 384    :param force:   Whether to force the value into a float if it is not empty or None.
 385    :return float:  Converted value
 386    """
 387    if force:
 388        return float(value) if value else default
 389    try:
 390        return float(value)
 391    except (ValueError, TypeError):
 392        return default
 393
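# Example (illustrative, not part of the module source): lenient conversion of
# user-supplied values, falling back to a default instead of raising.
assert convert_to_int("42") == 42
assert convert_to_int("not a number", default=-1) == -1
assert convert_to_float("0.5") == 0.5
assert convert_to_float("", default=1.0, force=True) == 1.0  # empty value -> default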
 394
 395def timify(number, short=False):
 396    """
 397    Make a number look like an indication of time
 398
 399    :param number:  Number to convert. If the number is larger than the current
 400    UNIX timestamp, decrease by that amount
 401    :return str: A nice string, for example `1 month, 3 weeks, 4 hours and 2 minutes`
 402    """
 403    number = int(number)
 404
 405    components = []
 406    if number > time.time():
 407        number = time.time() - number
 408
 409    month_length = 30.42 * 86400
 410    months = math.floor(number / month_length)
 411    if months:
 412        components.append(f"{months}{'mt' if short else ' month'}{'s' if months != 1 and not short else ''}")
 413        number -= (months * month_length)
 414
 415    week_length = 7 * 86400
 416    weeks = math.floor(number / week_length)
 417    if weeks:
 418        components.append(f"{weeks}{'w' if short else ' week'}{'s' if weeks != 1 and not short else ''}")
 419        number -= (weeks * week_length)
 420
 421    day_length = 86400
 422    days = math.floor(number / day_length)
 423    if days:
 424        components.append(f"{days}{'d' if short else ' day'}{'s' if days != 1 and not short else ''}")
 425        number -= (days * day_length)
 426
 427    hour_length = 3600
 428    hours = math.floor(number / hour_length)
 429    if hours:
 430        components.append(f"{hours}{'h' if short else ' hour'}{'s' if hours != 1 and not short else ''}")
 431        number -= (hours * hour_length)
 432
 433    minute_length = 60
 434    minutes = math.floor(number / minute_length)
 435    if minutes:
 436        components.append(f"{minutes}{'m' if short else ' minute'}{'s' if minutes != 1 and not short else ''}")
 437
 438    if not components:
 439        components.append("less than a minute")
 440
 441    last_str = components.pop()
 442    time_str = ""
 443    if components:
 444        time_str = ", ".join(components)
 445        time_str += " and "
 446
 447    return time_str + last_str
 448
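# Example (illustrative, not part of the module source): formatting durations
# (in seconds) as human-readable text.
assert timify(3700) == "1 hour and 1 minute"
assert timify(90061, short=True) == "1d, 1h and 1m"
assert timify(30) == "less than a minute"
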
 449def nthify(integer: int) -> str:
 450    """
 451    Takes an integer and returns a string with 'st', 'nd', 'rd', or 'th' as suffix, depending on the number.
 452    """
 453    int_str = str(integer).strip()
 454    if int_str.endswith("1"):
 455        suffix = "st"
 456    elif int_str.endswith("2"):
 457        suffix = "nd"
 458    elif int_str.endswith("3"):
 459        suffix = "rd"
 460    else:
 461        suffix = "th"
 462    return int_str + suffix
 463
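# Example (illustrative, not part of the module source): ordinal suffixes based on
# the last digit. Note that this simple rule also applies to numbers ending in
# 11, 12 or 13 (e.g. nthify(11) yields "11st").
assert nthify(1) == "1st"
assert nthify(2) == "2nd"
assert nthify(23) == "23rd"
assert nthify(7) == "7th"
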
 464def andify(items):
 465    """
 466    Format a list of items for use in text
 467
 468    Returns a comma-separated list, the last item preceded by "and"
 469
 470    :param items:  Iterable list
 471    :return str:  Formatted string
 472    """
 473
 474    items = items.copy()
 475
 476    if len(items) == 0:
 477        return ""
 478    elif len(items) == 1:
 479        return str(items[0])
 480
 481    result = f" and {items.pop()}"
 482    return ", ".join([str(item) for item in items]) + result
 483
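# Example (illustrative, not part of the module source): joining list items for use
# in a sentence.
assert andify(["alpha", "beta", "gamma"]) == "alpha, beta and gamma"
assert andify(["alpha"]) == "alpha"
assert andify([]) == ""
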
 484def ellipsiate(text, length, inside=False, ellipsis_str="&hellip;"):
 485    if len(text) <= length:
 486        return text
 487
 488    elif not inside:
 489        return text[:length] + ellipsis_str
 490
 491    else:
 492        # two cases: URLs and normal text
 493        # for URLs, try to only ellipsiate after the domain name
 494        # this makes the URLs easier to read when shortened
 495        if ural.is_url(text):
 496            pre_part = "/".join(text.split("/")[:3])
 497            if len(pre_part) < length - 6:  # kind of arbitrary
 498                before = len(pre_part) + 1
 499            else:
 500                before = math.floor(length / 2)
 501        else:
 502            before = math.floor(length / 2)
 503
 504        after = len(text) - before
 505        return text[:before] + ellipsis_str + text[after:]
 506
 507def hash_file(image_file, hash_type="file-hash"):
 508    """
 509    Generate an image hash
 510
 511    :param Path image_file:  Image file to hash
 512    :param str hash_type:  Hash type, one of `file-hash`, `colorhash`,
 513    `phash`, `average_hash`, `dhash`
 514    :return str:  Hexadecimal hash value
 515    """
 516    if not image_file.exists():
 517        raise FileNotFoundError()
 518
 519    if hash_type == "file-hash":
 520        hasher = hashlib.sha1()
 521
 522        # Open the file in binary mode
 523        with image_file.open("rb") as infile:
 524            # Read and update hash in chunks to handle large files
 525            while chunk := infile.read(1024):
 526                hasher.update(chunk)
 527
 528        return hasher.hexdigest()
 529
 530    elif hash_type in ("colorhash", "phash", "average_hash", "dhash"):
 531        image = Image.open(image_file)
 532
 533        return str(getattr(imagehash, hash_type)(image))
 534
 535    else:
 536        raise NotImplementedError(f"Unknown hash type '{hash_type}'")
 537
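# Example (illustrative, not part of the module source; the path below is
# hypothetical and must point at an existing image): file hashes versus
# perceptual hashes.
image_path = Path("/tmp/example.jpg")  # hypothetical file
sha1_digest = hash_file(image_path, hash_type="file-hash")  # SHA-1 of the raw bytes
perceptual = hash_file(image_path, hash_type="phash")       # perceptual hash via imagehash
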
 538def get_yt_compatible_ids(yt_ids):
 539    """
 540    :param yt_ids list, a list of strings
 541    :returns list, a list of joined strings in pairs of 50
 542
 543    Takes a list of IDs and returns list of joined strings
 544    in pairs of fifty. This should be done for the YouTube API
 545    that requires a comma-separated string and can only return
 546    max fifty results.
 547    """
 548
 549    # If there's only one item, return a single list item
 550    if isinstance(yt_ids, str):
 551        return [yt_ids]
 552
 553    ids = []
 554    last_i = 0
 555    for i, yt_id in enumerate(yt_ids):
 556
 557        # Add a joined string per fifty videos
 558        if i % 50 == 0 and i != 0:
 559            ids_string = ",".join(yt_ids[last_i:i])
 560            ids.append(ids_string)
 561            last_i = i
 562
 563        # If the end of the list is reached, add the last data
 564        elif i == (len(yt_ids) - 1):
 565            ids_string = ",".join(yt_ids[last_i:])  # include the final ID as well
 566            ids.append(ids_string)
 567
 568    return ids
 569
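# Example (illustrative, not part of the module source): 120 video IDs become three
# comma-separated strings of at most 50 IDs each, as expected by the YouTube API.
video_ids = [f"id{i}" for i in range(120)]
batches = get_yt_compatible_ids(video_ids)
# len(batches) == 3; batches[0] holds "id0,...,id49", the last batch the remaining IDs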
 570
 571def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None,
 572                    fontsize_small=None, fontsize_large=None):
 573    """
 574    Get a standard SVG canvas to draw 4CAT graphs to
 575
 576    Adds a border, footer, header, and some basic text styling
 577
 578    :param path:  The path where the SVG graph will be saved
 579    :param width:  Width of the canvas
 580    :param height:  Height of the canvas
 581    :param header:  Header, if necessary to draw
 582    :param footer:  Footer text, if necessary to draw. Defaults to shameless
 583    4CAT advertisement.
 584    :param fontsize_normal:  Font size of normal text
 585    :param fontsize_small:  Font size of small text (e.g. footer)
 586    :param fontsize_large:  Font size of large text (e.g. header)
 587    :return SVG:  SVG canvas (via svgwrite) that can be drawn to
 588    """
 589    from svgwrite.container import SVG, Hyperlink
 590    from svgwrite.drawing import Drawing
 591    from svgwrite.shapes import Rect
 592    from svgwrite.text import Text
 593
 594    if fontsize_normal is None:
 595        fontsize_normal = width / 75
 596
 597    if fontsize_small is None:
 598        fontsize_small = width / 100
 599
 600    if fontsize_large is None:
 601        fontsize_large = width / 50
 602
 603    # instantiate with border and white background
 604    canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal)
 605    canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF"))
 606
 607    # header
 608    if header:
 609        header_shape = SVG(insert=(0, 0), size=("100%", fontsize_large * 2))
 610        header_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
 611        header_shape.add(
 612            Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle", fill="#FFF",
 613                 style="font-size:%ipx" % fontsize_large))
 614        canvas.add(header_shape)
 615
 616    # footer (i.e. 4cat banner)
 617    if footer:
 618        footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2)
 619        footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize)
 620        footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
 621        link = Hyperlink(href="https://4cat.nl")
 622        link.add(
 623            Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle", fill="#FFF",
 624                 style="font-size:%ipx" % fontsize_small))
 625        footer_shape.add(link)
 626        canvas.add(footer_shape)
 627
 628    return canvas
 629
 630
 631def call_api(action, payload=None, wait_for_response=True):
 632    """
 633    Send message to server
 634
 635    Calls the internal API and returns interpreted response. "status" is always 
 636    None if wait_for_response is False.
 637
 638    :param str action: API action
 639    :param payload: API payload
 640    :param bool wait_for_response:  Wait for response? If not, close the connection
 641    immediately after sending data.
 642
 643    :return: API response {"status": "success"|"error", "response": response, "error": error}
 644    """
 645    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 646    connection.settimeout(15)
 647    config = CoreConfigManager()
 648    try:
 649        connection.connect((config.get('API_HOST'), config.get('API_PORT')))
 650    except ConnectionRefusedError:
 651        return {"status": "error", "error": "Connection refused"}
 652
 653    msg = json.dumps({"request": action, "payload": payload})
 654    connection.sendall(msg.encode("ascii", "ignore"))
 655
 656    response_data = {
 657        "status": None,
 658        "response": None,
 659        "error": None
 660    }
 661
 662    if wait_for_response:
 663        try:
 664            response = ""
 665            while True:
 666                bytes = connection.recv(2048)
 667                if not bytes:
 668                    break
 669
 670                response += bytes.decode("ascii", "ignore")
 671        except (socket.timeout, TimeoutError):
 672            response_data["status"] = "error"
 673            response_data["error"] = "Connection timed out"
 674
 675    try:
 676        connection.shutdown(socket.SHUT_RDWR)
 677    except OSError:
 678        # already shut down automatically
 679        pass
 680    connection.close()
 681
 682    if wait_for_response:
 683        try:
 684            json_response = json.loads(response)
 685            response_data["response"] = json_response["response"]
 686            response_data["error"] = json_response.get("error", None)
 687            response_data["status"] = "error" if json_response.get("error") else "success"
 688        except json.JSONDecodeError:
 689            response_data["status"] = "error"
 690            response_data["error"] = "Invalid JSON response"
 691            response_data["response"] = response
 692    
 693    return response_data
 694
 695def get_interval_descriptor(item, interval, item_column="timestamp"):
 696    """
 697    Get interval descriptor based on timestamp
 698
 699    :param dict item:  Item to generate descriptor for, should have a
 700    "timestamp" key
 701    :param str interval:  Interval, one of "all", "overall", "year",
 702    "month", "week", "day", "hour", "minute"
 703    :param str item_column:  Column name in the item dictionary that contains
 704    the timestamp. Defaults to "timestamp".
 705    :return str:  Interval descriptor, e.g. "overall", "unknown_date", "2020", "2020-08",
 706    "2020-43", "2020-08-01"
 707    """
 708    if interval in ("all", "overall"):
 709        return interval
 710    
 711    if not item.get(item_column, None):
 712        return "unknown_date"
 713
 714    # Catch cases where a custom timestamp has an epoch integer as value.
 715    try:
 716        timestamp = int(item[item_column])
 717        try:
 718            timestamp = datetime.datetime.fromtimestamp(timestamp)
 719        except (ValueError, TypeError):
 720            raise ValueError("Invalid timestamp '%s'" % str(item[item_column]))
 721    except (TypeError, ValueError):
 722        try:
 723            timestamp = datetime.datetime.strptime(item[item_column], "%Y-%m-%d %H:%M:%S")
 724        except (ValueError, TypeError):
 725            raise ValueError("Invalid date '%s'" % str(item[item_column]))
 726
 727    if interval == "year":
 728        return str(timestamp.year)
 729    elif interval == "month":
 730        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2)
 731    elif interval == "week":
 732        return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2)
 733    elif interval == "hour":
 734        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
 735            2) + " " + str(timestamp.hour).zfill(2)
 736    elif interval == "minute":
 737        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
 738            2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2)
 739    else:
 740        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2)
 741
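# Example (illustrative, not part of the module source): mapping an item's timestamp
# to an interval bucket. String timestamps are parsed as "%Y-%m-%d %H:%M:%S";
# integer (epoch) timestamps are converted via datetime.fromtimestamp().
item = {"timestamp": "2020-08-15 12:00:00"}
assert get_interval_descriptor(item, "month") == "2020-08"
assert get_interval_descriptor(item, "week") == "2020-33"
assert get_interval_descriptor({}, "day") == "unknown_date"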
 742
 743def pad_interval(intervals, first_interval=None, last_interval=None):
 744    """
 745    Pad an interval so all intermediate intervals are filled
 746
 747    Warning, ugly code (PRs very welcome)
 748
 749    :param dict intervals:  A dictionary, with dates (YYYY{-MM}{-DD}) as keys
 750    and a numerical value.
 751    :param first_interval:  First interval to pad from; inferred from the data if not given
 752    :param last_interval:  Last interval to pad to; inferred from the data if not given
 753    :return tuple:  Number of intervals added, and the padded (and sorted) dictionary
 754    """
 755    missing = 0
 756    try:
 757        test_key = list(intervals.keys())[0]
 758    except IndexError:
 759        return 0, {}
 760
 761    # first determine the boundaries of the interval
 762    # these may be passed as parameters, or they can be inferred from the
 763    # interval given
 764    if first_interval:
 765        first_interval = str(first_interval)
 766        first_year = int(first_interval[0:4])
 767        if len(first_interval) > 4:
 768            first_month = int(first_interval[5:7])
 769        if len(first_interval) > 7:
 770            first_day = int(first_interval[8:10])
 771        if len(first_interval) > 10:
 772            first_hour = int(first_interval[11:13])
 773        if len(first_interval) > 13:
 774            first_minute = int(first_interval[14:16])
 775
 776    else:
 777        first_year = min([int(i[0:4]) for i in intervals])
 778        if len(test_key) > 4:
 779            first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year])
 780        if len(test_key) > 7:
 781            first_day = min(
 782                [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month])
 783        if len(test_key) > 10:
 784            first_hour = min(
 785                [int(i[11:13]) for i in intervals if
 786                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day])
 787        if len(test_key) > 13:
 788            first_minute = min(
 789                [int(i[14:16]) for i in intervals if
 790                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int(
 791                     i[11:13]) == first_hour])
 792
 793    if last_interval:
 794        last_interval = str(last_interval)
 795        last_year = int(last_interval[0:4])
 796        if len(last_interval) > 4:
 797            last_month = int(last_interval[5:7])
 798        if len(last_interval) > 7:
 799            last_day = int(last_interval[8:10])
 800        if len(last_interval) > 10:
 801            last_hour = int(last_interval[11:13])
 802        if len(last_interval) > 13:
 803            last_minute = int(last_interval[14:16])
 804    else:
 805        last_year = max([int(i[0:4]) for i in intervals])
 806        if len(test_key) > 4:
 807            last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year])
 808        if len(test_key) > 7:
 809            last_day = max(
 810                [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month])
 811        if len(test_key) > 10:
 812            last_hour = max(
 813                [int(i[11:13]) for i in intervals if
 814                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day])
 815        if len(test_key) > 13:
 816            last_minute = max(
 817                [int(i[14:16]) for i in intervals if
 818                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int(
 819                     i[11:13]) == last_hour])
 820
 821    has_month = re.match(r"^[0-9]{4}-[0-9]", test_key)
 822    has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key)
 823    has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key)
 824    has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key)
 825
 826    all_intervals = []
 827    for year in range(first_year, last_year + 1):
 828        year_interval = str(year)
 829
 830        if not has_month:
 831            all_intervals.append(year_interval)
 832            continue
 833
 834        start_month = first_month if year == first_year else 1
 835        end_month = last_month if year == last_year else 12
 836        for month in range(start_month, end_month + 1):
 837            month_interval = year_interval + "-" + str(month).zfill(2)
 838
 839            if not has_day:
 840                all_intervals.append(month_interval)
 841                continue
 842
 843            start_day = first_day if all((year == first_year, month == first_month)) else 1
 844            end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1]
 845            for day in range(start_day, end_day + 1):
 846                day_interval = month_interval + "-" + str(day).zfill(2)
 847
 848                if not has_hour:
 849                    all_intervals.append(day_interval)
 850                    continue
 851
 852                start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0
 853                end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23
 854                for hour in range(start_hour, end_hour + 1):
 855                    hour_interval = day_interval + " " + str(hour).zfill(2)
 856
 857                    if not has_minute:
 858                        all_intervals.append(hour_interval)
 859                        continue
 860
 861                    start_minute = first_minute if all(
 862                        (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0
 863                    end_minute = last_minute if all(
 864                        (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59
 865
 866                    for minute in range(start_minute, end_minute + 1):
 867                        minute_interval = hour_interval + ":" + str(minute).zfill(2)
 868                        all_intervals.append(minute_interval)
 869
 870    for interval in all_intervals:
 871        if interval not in intervals:
 872            intervals[interval] = 0
 873            missing += 1
 874
 875    # sort while we're at it
 876    intervals = {key: intervals[key] for key in sorted(intervals)}
 877
 878    return missing, intervals
 879
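# Example (illustrative, not part of the module source): filling gaps in a monthly
# time series so every intermediate interval is present.
missing, padded = pad_interval({"2020-01": 5, "2020-03": 2})
assert missing == 1
assert padded == {"2020-01": 5, "2020-02": 0, "2020-03": 2}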
 880
 881def remove_nuls(value):
 882    """
 883    Remove \0 from a value
 884
 885    The CSV library cries about a null byte when it encounters one :( :( :(
 886    poor little csv cannot handle a tiny little null byte
 887
 888    So remove them from the data because they should not occur in utf-8 data
 889    anyway.
 890
 891    :param value:  Value to remove nulls from. For dictionaries, sets, tuples
 892    and lists all items are parsed recursively.
 893    :return value:  Cleaned value
 894    """
 895    if type(value) is dict:
 896        for field in value:
 897            value[field] = remove_nuls(value[field])
 898    elif type(value) is list:
 899        value = [remove_nuls(item) for item in value]
 900    elif type(value) is tuple:
 901        value = tuple([remove_nuls(item) for item in value])
 902    elif type(value) is set:
 903        value = set([remove_nuls(item) for item in value])
 904    elif type(value) is str:
 905        value = value.replace("\0", "")
 906
 907    return value
 908
 909
 910class NullAwareTextIOWrapper(io.TextIOWrapper):
 911    """
 912    TextIOWrapper that skips null bytes
 913
 914    This can be used as a file reader that silently discards any null bytes it
 915    encounters.
 916    """
 917
 918    def __next__(self):
 919        value = super().__next__()
 920        return remove_nuls(value)
 921
 922
 923class HashCache:
 924    """
 925    Simple cache handler to cache hashed values
 926
 927    Avoids having to calculate a hash for values that have been hashed before
 928    """
 929
 930    def __init__(self, hasher):
 931        self.hash_cache = {}
 932        self.hasher = hasher
 933
 934    def update_cache(self, value):
 935        """
 936        Checks the hash_cache to see if the value has been cached previously,
 937        updates the hash_cache if needed, and returns the hashed value.
 938        """
 939        # value = str(value)
 940        if value not in self.hash_cache:
 941            author_hasher = self.hasher.copy()
 942            author_hasher.update(str(value).encode("utf-8"))
 943            self.hash_cache[value] = author_hasher.hexdigest()
 944            del author_hasher
 945        return self.hash_cache[value]
 946
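# Example (illustrative, not part of the module source; `hashlib` is imported at the
# top of this module): pseudonymising a repeated value, computing its digest only once.
hash_cache = HashCache(hashlib.sha256())
pseudonym = hash_cache.update_cache("some_username")  # sha256 hex digest, now cached
assert hash_cache.update_cache("some_username") == pseudonym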
 947
 948def dict_search_and_update(item, keyword_matches, function):
 949    """
 950    Filter fields in an object recursively
 951
 952    Apply a function to every item and sub item of a dictionary if the key
 953    contains one of the provided match terms.
 954
 955    Function loops through a dictionary or list and compares dictionary keys to
 956    the strings defined by keyword_matches. It then applies the change_function
 957    to corresponding values.
 958
 959    Note: if a matching term is found, all nested values will have the function
 960    applied to them. e.g., all these values would be changed even those with
 961    not_key_match:
 962
 963    {'key_match' : 'changed',
 964    'also_key_match' : {'not_key_match' : 'but_value_still_changed'},
 965    'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}
 966
 967    This is a comprehensive (and expensive) approach to updating a dictionary.
 968    IF a dictionary structure is known, a better solution would be to update
 969    using specific keys.
 970
 971    :param Dict/List item:  dictionary/list/json to loop through
 972    :param list keyword_matches:  list of strings that will be matched to
 973    dictionary keys. Can contain wildcards which are matched using fnmatch.
 974    :param Function function:  function applied to all values of any items
 975    nested under a matching key
 976
 977    :return Dict/List: Copy of original item, but filtered
 978    """
 979
 980    def loop_helper_function(d_or_l, match_terms, change_function):
 981        """
 982        Recursive helper function that updates item in place
 983        """
 984        if isinstance(d_or_l, dict):
 985            # Iterate through dictionary
 986            for key, value in iter(d_or_l.items()):
 987                if match_terms == 'True' or any([fnmatch.fnmatch(key, match_term) for match_term in match_terms]):
 988                    # Match found; apply function to all items and sub-items
 989                    if isinstance(value, (list, dict)):
 990                        # Pass item through again with match_terms = True
 991                        loop_helper_function(value, 'True', change_function)
 992                    elif value is None:
 993                        pass
 994                    else:
 995                        # Update the value
 996                        d_or_l[key] = change_function(value)
 997                elif isinstance(value, (list, dict)):
 998                    # Continue search
 999                    loop_helper_function(value, match_terms, change_function)
1000        elif isinstance(d_or_l, list):
1001            # Iterate through list
1002            for n, value in enumerate(d_or_l):
1003                if isinstance(value, (list, dict)):
1004                    # Continue search
1005                    loop_helper_function(value, match_terms, change_function)
1006                elif match_terms == 'True':
1007                    # List item nested in matching
1008                    d_or_l[n] = change_function(value)
1009        else:
1010            raise Exception('Must pass list or dictionary')
1011
1012    # Lowercase keyword_matches
1013    keyword_matches = [keyword.lower() for keyword in keyword_matches]
1014
1015    # Create deepcopy and return new item
1016    temp_item = copy.deepcopy(item)
1017    loop_helper_function(temp_item, keyword_matches, function)
1018    return temp_item
1019
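# Example (illustrative, not part of the module source): pseudonymising all values
# whose keys match "author*", however deeply they are nested.
item = {"author": "alice", "body": "hello", "meta": {"author_id": 1234}}
cleaned = dict_search_and_update(item, ["author*"], lambda v: "REDACTED")
assert cleaned == {"author": "REDACTED", "body": "hello", "meta": {"author_id": "REDACTED"}}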
1020
1021def get_last_line(filepath):
1022    """
1023    Seeks from end of file for '\n' and returns that line
1024
1025    :param str filepath:  path to file
1026    :return str: last line of file
1027    """
1028    with open(filepath, "rb") as file:
1029        try:
1030            # start at the end of file
1031            file.seek(-2, os.SEEK_END)
1032            # check if NOT endline i.e. '\n'
1033            while file.read(1) != b'\n':
1034                # if not '\n', back up two characters and check again
1035                file.seek(-2, os.SEEK_CUR)
1036        except OSError:
1037            file.seek(0)
1038        last_line = file.readline().decode()
1039    return last_line
1040
1041
1042def add_notification(db, user, notification, expires=None, allow_dismiss=True):
1043    db.insert("users_notifications", {
1044        "username": user,
1045        "notification": notification,
1046        "timestamp_expires": expires,
1047        "allow_dismiss": allow_dismiss
1048    }, safe=True)
1049
1050
1051def send_email(recipient, message, mail_config):
1052    """
1053    Send an e-mail using the configured SMTP settings
1054
1055    Just a thin wrapper around smtplib, so we don't have to repeat ourselves.
1056    Exceptions are to be handled outside the function.
1057
1058    :param list recipient:  Recipient e-mail addresses
1059    :param MIMEMultipart message:  Message to send
1060    :param mail_config:  Configuration reader
1061    """
1062    # Create a secure SSL context
1063    context = ssl.create_default_context()
1064
1065    # Decide which connection type
1066    with smtplib.SMTP_SSL(mail_config.get('mail.server'), port=mail_config.get('mail.port', 0), context=context) if mail_config.get(
1067            'mail.ssl') == 'ssl' else smtplib.SMTP(mail_config.get('mail.server'),
1068                                                   port=mail_config.get('mail.port', 0)) as server:
1069        if mail_config.get('mail.ssl') == 'tls':
1070            # smtplib.SMTP adds TLS context here
1071            server.starttls(context=context)
1072
1073        # Log in
1074        if mail_config.get('mail.username') and mail_config.get('mail.password'):
1075            server.ehlo()
1076            server.login(mail_config.get('mail.username'), mail_config.get('mail.password'))
1077
1078        # Send message
1079        if type(message) is str:
1080            server.sendmail(mail_config.get('mail.noreply'), recipient, message)
1081        else:
1082            server.sendmail(mail_config.get('mail.noreply'), recipient, message.as_string())
1083
1084
1085def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'):
1086    """
1087    Return a flattened dictionary where nested dictionary objects are given new
1088    keys formed by joining the parent key and the child key with the separator.
1089
1090    Lists will be converted to json strings via json.dumps()
1091
1092    :param MutableMapping d:  Dictionary like object
1093    :param str parent_key: The original parent key prepending future nested keys
1094    :param str sep: A separator string used to combine parent and child keys
1095    :return dict:  A new dictionary with no nested values
1096    """
1097
1098    def _flatten_dict_gen(d, parent_key, sep):
1099        for k, v in d.items():
1100            new_key = parent_key + sep + k if parent_key else k
1101            if isinstance(v, MutableMapping):
1102                yield from flatten_dict(v, new_key, sep=sep).items()
1103            elif isinstance(v, (list, set)):
1104                yield new_key, json.dumps(
1105                    [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v])
1106            else:
1107                yield new_key, v
1108
1109    return dict(_flatten_dict_gen(d, parent_key, sep))
1110
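# Example (illustrative, not part of the module source): nested keys are joined with
# the separator; lists are serialised to JSON strings.
nested = {"user": {"name": "alice", "tags": ["a", "b"]}, "score": 3}
assert flatten_dict(nested) == {"user.name": "alice", "user.tags": '["a", "b"]', "score": 3}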
1111
1112def sets_to_lists(d: MutableMapping):
1113    """
1114    Return a dictionary where all nested sets have been converted to lists.
1115
1116    :param MutableMapping d:  Dictionary like object
1117    :return dict:  A new dictionary with no nested sets
1118    """
1119
1120    def _check_list(lst):
1121        return [sets_to_lists(item) if isinstance(item, MutableMapping) else _check_list(item) if isinstance(item, (
1122        set, list)) else item for item in lst]
1123
1124    def _sets_to_lists_gen(d):
1125        for k, v in d.items():
1126            if isinstance(v, MutableMapping):
1127                yield k, sets_to_lists(v)
1128            elif isinstance(v, (list, set)):
1129                yield k, _check_list(v)
1130            else:
1131                yield k, v
1132
1133    return dict(_sets_to_lists_gen(d))
1134
1135
1136def url_to_hash(url, remove_scheme=True, remove_www=True):
1137    """
1138    Convert a URL to a hash. Allows removing scheme and www prefix before hashing.
1139    
1140    :param url: URL to hash
1141    :param remove_scheme: If True, removes the scheme from URL before hashing
1142    :param remove_www: If True, removes the www. prefix from URL before hashing
1143    :return: Hash of the URL
1144    """
1145    parsed_url = urlparse(url.lower())
1146    if parsed_url:
1147        if remove_scheme:
1148            parsed_url = parsed_url._replace(scheme="")
1149        if remove_www:
1150            netloc = re.sub(r"^www\.", "", parsed_url.netloc)
1151            parsed_url = parsed_url._replace(netloc=netloc)
1152        
1153        # Hash the normalized URL directly
1154        normalized_url = urlunparse(parsed_url).strip("/")
1155    else:
1156        # Unable to parse URL; use regex normalization
1157        normalized_url = url.lower().strip("/")
1158        if remove_scheme:
1159            normalized_url = re.sub(r"^https?://", "", normalized_url)
1160        if remove_www:
1161            if not remove_scheme:
1162                scheme_match = re.match(r"^https?://", normalized_url)
1163                if scheme_match:
1164                    scheme = scheme_match.group()
1165                    temp_url = re.sub(r"^https?://", "", normalized_url)
1166                    normalized_url = scheme + re.sub(r"^www\.", "", temp_url)
1167            else:
1168                normalized_url = re.sub(r"^www\.", "", normalized_url)
1169
1170    return hashlib.blake2b(normalized_url.encode("utf-8"), digest_size=24).hexdigest()
1171
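# Example (illustrative, not part of the module source): the scheme and "www." prefix
# are stripped before hashing, so these URL variants map to the same hash.
assert url_to_hash("https://www.example.com/page/") == url_to_hash("http://example.com/page")
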
1172def url_to_filename(url, staging_area=None, default_name="file", default_ext=".png", max_bytes=255, existing_filenames=None):
1173        """
1174        Determine filenames for saved files
1175
1176        Prefer the original filename (extracted from the URL), but this may not
1177        always be possible or be an actual filename. Also, avoid using the same
1178        filename multiple times. Ensures filenames don't exceed max_bytes.
1179
1180        Note: collisions are possible if no staging area is given (the staging area
1181        is used to check for already existing filenames).
1182
1183        :param str url:  URL to determine a filename for
1184        :param Path staging_area:  Path to the staging area where files are saved
1185        (to avoid collisions); if None, no collision avoidance is done.
1186        :param str default_name:  Default name to use if no filename can be
1187        extracted from the URL
1188        :param str default_ext:  Default extension to use if no filename can be
1189        extracted from the URL
1190        :param int max_bytes:  Maximum number of bytes for the filename
1191        :return str:  Suitable file name
1192        """
1193        clean_filename = url.split("/")[-1].split("?")[0].split("#")[0]
1194        if re.match(r"[^.]+\.[a-zA-Z0-9]{1,10}", clean_filename):
1195            base_filename = clean_filename
1196        else:
1197            base_filename = default_name + default_ext
1198
1199        if not existing_filenames:
1200            existing_filenames = []
1201
1202        # Split base filename into name and extension
1203        if '.' in base_filename:
1204            name_part, ext_part = base_filename.rsplit('.', 1)
1205            ext_part = '.' + ext_part
1206        else:
1207            name_part = base_filename
1208            ext_part = ''
1209
1210        # Truncate base filename if it exceeds max_bytes
1211        if len(base_filename.encode('utf-8')) > max_bytes:
1212            # Reserve space for extension
1213            available_bytes = max_bytes - len(ext_part.encode('utf-8'))
1214            if available_bytes <= 0:
1215                # If extension is too long, use minimal name
1216                name_part = default_name
1217                ext_part = default_ext
1218                available_bytes = max_bytes - len(ext_part.encode('utf-8'))
1219            
1220            # Truncate name part to fit
1221            name_bytes = name_part.encode('utf-8')
1222            if len(name_bytes) > available_bytes:
1223                # Truncate byte by byte to ensure valid UTF-8
1224                while len(name_bytes) > available_bytes:
1225                    name_part = name_part[:-1]
1226                    name_bytes = name_part.encode('utf-8')
1227            
1228            base_filename = name_part + ext_part
1229
1230        filename = base_filename
1231
1232        if staging_area:
1233            # Ensure the filename is unique in the staging area
1234            file_path = staging_area.joinpath(filename)
1235            file_index = 1
1236            
1237            while file_path.exists() or filename in existing_filenames:
1238                # Calculate space needed for index suffix
1239                index_suffix = f"-{file_index}"
1240                
1241                # Check if filename with index would exceed max_bytes
1242                test_filename = name_part + index_suffix + ext_part
1243                if len(test_filename.encode('utf-8')) > max_bytes:
1244                    # Need to truncate name_part to make room for index
1245                    available_bytes = max_bytes - len((index_suffix + ext_part).encode('utf-8'))
1246                    if available_bytes <= 0:
1247                        # Extreme case - use minimal name
1248                        truncated_name = "f"
1249                    else:
1250                        # Truncate name_part to fit
1251                        truncated_name = name_part
1252                        name_bytes = truncated_name.encode('utf-8')
1253                        while len(name_bytes) > available_bytes:
1254                            truncated_name = truncated_name[:-1]
1255                            name_bytes = truncated_name.encode('utf-8')
1256                    
1257                    filename = truncated_name + index_suffix + ext_part
1258                else:
1259                    filename = test_filename
1260                
1261                file_index += 1
1262                file_path = staging_area.joinpath(filename)
1263
1264        return filename
1265
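# Example (illustrative, not part of the module source): the filename is taken from
# the URL where possible, with query strings and fragments removed; otherwise the
# defaults are used. Pass a staging_area Path to avoid collisions with existing files.
assert url_to_filename("https://example.com/images/photo.jpg?size=large") == "photo.jpg"
assert url_to_filename("https://example.com/gallery/") == "file.png"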
1266
1267def split_urls(url_string, allowed_schemes=None):
1268    """
1269    Split URL text by \n and commas.
1270
1271    4CAT allows users to input lists by separating items with either a newline or a comma. This function splits URLs
1272    accordingly, and also detects commas that are part of a URL by checking for URL schemes.
1273
1274    Note: some URLs may contain another URL with its own scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com);
1275    this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail).
1276    """
1277    if allowed_schemes is None:
1278        allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://')
1279    potential_urls = []
1280    # Split the text by \n
1281    for line in url_string.split('\n'):
1282        # Handle commas that may exist within URLs
1283        parts = line.split(',')
1284        recombined_url = ""
1285        for part in parts:
1286            if part.startswith(allowed_schemes):  # Other schemes exist
1287                # New URL start detected
1288                if recombined_url:
1289                    # Already have a URL, add to list
1290                    potential_urls.append(recombined_url)
1291                # Start new URL
1292                recombined_url = part
1293            elif part:
1294                if recombined_url:
1295                    # Add to existing URL
1296                    recombined_url += "," + part
1297                else:
1298                    # No existing URL, start new
1299                    recombined_url = part
1300            else:
1301                # Ignore empty strings
1302                pass
1303        if recombined_url:
1304            # Add any remaining URL
1305            potential_urls.append(recombined_url)
1306    return potential_urls
1307
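# Example (illustrative, not part of the module source): URLs separated by newlines
# or commas are split, while commas inside a URL are preserved.
urls = split_urls("https://a.example/x,y\nhttps://b.example,https://c.example")
assert urls == ["https://a.example/x,y", "https://b.example", "https://c.example"]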
1308
1309def folder_size(path='.'):
1310    """
1311    Get the size of a folder using os.scandir for efficiency
1312    """
1313    total = 0
1314    for entry in os.scandir(path):
1315        if entry.is_file():
1316            total += entry.stat().st_size
1317        elif entry.is_dir():
1318            total += folder_size(entry.path)
1319    return total
1320
1321def hash_to_md5(string: str) -> str:
1322    """
1323    Hash a string with an md5 hash.
1324    """
1325    return hashlib.md5(string.encode("utf-8")).hexdigest()
class UserInput:
 16class UserInput:
 17    """
 18    Class for handling user input
 19
 20    It is important to sanitise user input, as carelessly entered parameters
 21    may result in e.g. requesting far more data than needed, or lead to undefined
 22    behaviour. This class offers a set of pre-defined value types that can be
 23    consistently rendered as form elements in an interface and parsed.
 24    """
 25    OPTION_TOGGLE = "toggle"  # boolean toggle (checkbox)
 26    OPTION_CHOICE = "choice"  # one choice out of a list (select)
 27    OPTION_TEXT = "string"  # simple string or integer (input text)
 28    OPTION_MULTI = "multi"  # multiple values out of a list (select multiple)
 29    OPTION_MULTI_SELECT = "multi_select"  # multiple values out of a dropdown list (select multiple)
 30    OPTION_INFO = "info"  # just a bit of text, not actual input
 31    OPTION_TEXT_LARGE = "textarea"  # longer text
 32    OPTION_TEXT_JSON = "json"  # text, but should be valid JSON
 33    OPTION_DATE = "date"  # a single date
 34    OPTION_DATERANGE = "daterange"  # a beginning and end date
 35    OPTION_DIVIDER = "divider"  # meta-option, divides related sets of options
 36    OPTION_FILE = "file"  # file upload
 37    OPTION_HUE = "hue"  # colour hue
 38    OPTION_DATASOURCES = "datasources"  # data source toggling
 39    OPTION_EXTENSIONS = "extensions"  # extension toggling
 40    OPTION_DATASOURCES_TABLE = "datasources_table"  # a table with settings per data source
 41    OPTION_ANNOTATION = "annotation"  # checkbox for whether to write an annotation
 42    OPTION_ANNOTATIONS = "annotations"  # table for whether to write multiple annotations
 43
 44    OPTIONS_COSMETIC = (OPTION_INFO, OPTION_DIVIDER)
 45
 46    @staticmethod
 47    def parse_all(options, input, silently_correct=True):
 48        """
 49        Parse form input for the provided options
 50
 51        Ignores all input not belonging to any of the defined options: parses
 52        and sanitises the rest, and returns a dictionary with the sanitised
 53        options. If an option is *not* present in the input, the default value
 54        is used, and if that is absent, `None`.
 55
 56        In other words, this ensures a dictionary with 1) only white-listed
 57        keys, 2) a value of an expected type for each key.
 58
 59        :param dict options:  Options, as a name -> settings dictionary
 60        :param dict input:  Input, as a form field -> value dictionary
 61        :param bool silently_correct:  If true, replace invalid values with the
 62        given default value; else, raise a QueryParametersException if a value
 63        is invalid.
 64
 65        :return dict:  Sanitised form input
 66        """
 67
 68        from common.lib.helpers import convert_to_int
 69        parsed_input = {}
 70
 71        if type(input) is not dict and type(input) is not ImmutableMultiDict:
 72            raise TypeError("input must be a dictionary or ImmutableMultiDict")
 73
 74        if type(input) is ImmutableMultiDict:
 75            # we are not using to_dict, because that messes up multi-selects
 76            input = {key: input.getlist(key) for key in input}
 77            for key, value in input.items():
 78                if type(value) is list and len(value) == 1:
 79                    input[key] = value[0]
 80
 81        # all parameters are submitted as option-[parameter ID]; this is an
 82        # artifact of how the web interface works and we can simply remove the
 83        # prefix
 84        input = {re.sub(r"^option-", "", field): input[field] for field in input}
 85
 86        # re-order input so that the fields relying on the value of other
 87        # fields are parsed last
 88        options = {k: options[k] for k in sorted(options, key=lambda k: options[k].get("requires") is not None)}
 89
 90        for option, settings in options.items():
 91            if settings.get("indirect"):
 92                # these are settings that are derived from and set by other
 93                # settings
 94                continue
 95
 96            if settings.get("type") in UserInput.OPTIONS_COSMETIC:
 97                # these are structural form elements and never have a value
 98                continue
 99
100            elif settings.get("type") == UserInput.OPTION_DATERANGE:
101                # special case, since it combines two inputs
102                option_min = option + "-min"
103                option_max = option + "-max"
104
105                # normally this is taken care of client-side, but in case this
106                # didn't work, try to salvage it server-side
107                if option_min not in input or input.get(option_min) == "-1":
108                    option_min += "_proxy"
109
110                if option_max not in input or input.get(option_max) == "-1":
111                    option_max += "_proxy"
112
113                # save as a tuple of unix timestamps (or None)
114                try:
115                    after, before = (UserInput.parse_value(settings, input.get(option_min), parsed_input, silently_correct), UserInput.parse_value(settings, input.get(option_max), parsed_input, silently_correct))
116
117                    if before and after and after > before:
118                        if not silently_correct:
119                            raise QueryParametersException("End of date range must be after beginning of date range.")
120                        else:
121                            before = after
122
123                    parsed_input[option] = (after, before)
124                except RequirementsNotMetException:
125                    pass
126
127            elif settings.get("type") in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
128                # special case too, since if a checkbox is unchecked, it simply
129                # does not show up in the input
130                try:
131                    if option in input:
132                        # Toggle needs to be parsed
133                        parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct)
134                    else:
135                        # Toggle was left blank
136                        parsed_input[option] = False
137                except RequirementsNotMetException:
138                    pass
139
140            elif settings.get("type") == UserInput.OPTION_DATASOURCES:
141                # special case, because this combines multiple inputs to
142                # configure data source availability and expiration
143                datasources = {datasource: {
144                    "enabled": f"{option}-enable-{datasource}" in input,
145                    "allow_optout": f"{option}-optout-{datasource}" in input,
146                    "timeout": convert_to_int(input[f"{option}-timeout-{datasource}"], 0)
147                } for datasource in input[option].split(",")}
148
149                parsed_input[option] = [datasource for datasource, v in datasources.items() if v["enabled"]]
150                parsed_input[option.split(".")[0] + ".expiration"] = datasources
151
152            elif settings.get("type") == UserInput.OPTION_EXTENSIONS:
153                # also a special case
154                parsed_input[option] = {extension: {
155                    "enabled": f"{option}-enable-{extension}" in input
156                } for extension in input[option].split(",")}
157
158            elif settings.get("type") == UserInput.OPTION_DATASOURCES_TABLE:
159                # special case, parse table values to generate a dict
160                columns = list(settings["columns"].keys())
161                table_input = {}
162
163                for datasource in list(settings["default"].keys()):
164                    table_input[datasource] = {}
165                    for column in columns:
166
167                        choice = input.get(option + "-" + datasource + "-" + column, False)
168                        column_settings = settings["columns"][column]  # sub-settings per column
169                        table_input[datasource][column] = UserInput.parse_value(column_settings, choice, table_input, silently_correct=True)
170
171                parsed_input[option] = table_input
172
173            elif option not in input:
174                # not provided? use default
175                parsed_input[option] = settings.get("default", None)
176
177            else:
178                # normal parsing and sanitisation
179                try:
180                    parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct)
181                except RequirementsNotMetException:
182                    pass
183
184        return parsed_input
185
186    @staticmethod
187    def parse_value(settings, choice, other_input=None, silently_correct=True):
188        """
189        Filter user input
190
191        Makes sure user input for post-processors is valid and within the
192        parameters specified by the post-processor
193
194        :param obj settings:  Settings, including defaults and valid options
195        :param choice:  The chosen option, to be parsed
196        :param dict other_input:  Other input, as parsed so far
197        :param bool silently_correct:  If true, replace invalid values with the
198        given default value; else, raise a QueryParametersException if a value
199        is invalid.
200
201        :return:  Validated and parsed input
202        """
203        # short-circuit if there is a requirement for the field to be parsed
204        # and the requirement isn't met
205        if settings.get("requires"):
206            try:
207                field, operator, value = re.findall(r"([a-zA-Z0-9_-]+)([!=$~^]+)(.*)", settings.get("requires"))[0]
208            except IndexError:
209                # invalid condition, interpret as 'does the field with this name have a value'
210                field, operator, value = (choice, "!=", "")
211
212            if field not in other_input:
213                raise RequirementsNotMetException()
214
215            other_value = other_input.get(field)
216            if type(other_value) is bool:
217                # evaluates to a boolean, i.e. checkboxes etc.
218                if operator == "!=":
219                    if (other_value and value in ("", "false")) or (not other_value and value in ("true", "checked")):
220                        raise RequirementsNotMetException()
221                else:
222                    if (other_value and value not in ("true", "checked")) or (not other_value and value not in ("", "false")):
223                        raise RequirementsNotMetException()
224
225            else:
226                if type(other_value) in (tuple, list):
227                    # iterables are a bit special
228                    if len(other_value) == 1:
229                        # treat one-item lists as "normal" values
230                        other_value = other_value[0]
231                    elif operator == "~=":  # interpret as 'is in list?'
232                        if value not in other_value:
233                            raise RequirementsNotMetException()
234                    else:
235                        # condition doesn't make sense for a list, so assume it's not True
236                        raise RequirementsNotMetException()
237
238                if operator == "^=" and not str(other_value).startswith(value):
239                    raise RequirementsNotMetException()
240                elif operator == "$=" and not str(other_value).endswith(value):
241                    raise RequirementsNotMetException()
242                elif operator == "~=" and value not in str(other_value):
243                    raise RequirementsNotMetException()
244                elif operator == "!=" and value == other_value:
245                    raise RequirementsNotMetException()
246                elif operator in ("==", "=") and value != other_value:
247                    raise RequirementsNotMetException()
248
249        input_type = settings.get("type", "")
250        if input_type in UserInput.OPTIONS_COSMETIC:
251            # these are structural form elements and can never return a value
252            return None
253
254        elif input_type in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
255            # simple boolean toggle
256            if type(choice) is bool:
257                return choice
258            elif choice in ['false', 'False']:
259                # Sanitised options passed back via Flask may arrive as the string 'false'
260                return False
261            elif choice in ['true', 'True', 'on']:
262                # Toggle will have value 'on', but may also become the string 'true'
263                return True
264            else:
265                raise QueryParametersException("Toggle invalid input")
266
267        elif input_type in (UserInput.OPTION_DATE, UserInput.OPTION_DATERANGE):
268            # parse either integers (unix timestamps) or try to guess the date
269            # format (the latter may be used for input if JavaScript is turned
270            # off in the front-end and the input comes from there)
271            value = None
272            try:
273                value = int(choice)
274            except ValueError:
275                parsed_choice = parse_datetime(choice)
276                value = int(parsed_choice.timestamp())
277            finally:
278                return value
279
280        elif input_type in (UserInput.OPTION_MULTI, UserInput.OPTION_ANNOTATIONS):
281            # any number of values out of a list of possible values
282            # comma-separated during input, returned as a list of valid options
283            if not choice:
284                return settings.get("default", [])
285
286            chosen = choice.split(",")
287            return [item for item in chosen if item in settings.get("options", [])]
288
289        elif input_type == UserInput.OPTION_MULTI_SELECT:
290            # any number of values out of a dropdown list of possible values
291            # comma-separated during input, returned as a list of valid options
292            if not choice:
293                return settings.get("default", [])
294
295            if type(choice) is str:
296                # should be a list if the form control was actually a multiselect
297                # but we have some client side UI helpers that may produce a string
298                # instead
299                choice = choice.split(",")
300
301            return [item for item in choice if item in settings.get("options", [])]
302
303        elif input_type == UserInput.OPTION_CHOICE:
304            # select box
305            # one out of multiple options
306            # return option if valid, or default
307            if choice not in settings.get("options"):
308                if not silently_correct:
309                    raise QueryParametersException(f"Invalid value selected; must be one of {', '.join(settings.get('options', {}).keys())}. {settings}")
310                else:
311                    return settings.get("default", "")
312            else:
313                return choice
314
315        elif input_type == UserInput.OPTION_TEXT_JSON:
316            # verify that this is actually json
317            try:
318                json.dumps(json.loads(choice))
319            except json.JSONDecodeError:
320                raise QueryParametersException("Invalid JSON value '%s'" % choice)
321
322            return json.loads(choice)
323
324        elif input_type in (UserInput.OPTION_TEXT, UserInput.OPTION_TEXT_LARGE, UserInput.OPTION_HUE):
325            # text string
326            # optionally clamp it as an integer; return default if not a valid
327            # integer (or float; inferred from default or made explicit via the
328            # coerce_type setting)
329            if settings.get("coerce_type"):
330                value_type = settings["coerce_type"]
331            else:
332                value_type = type(settings.get("default"))
333                if value_type not in (int, float):
334                    value_type = int
335
336            if "max" in settings:
337                try:
338                    choice = min(settings["max"], value_type(choice))
339                except (ValueError, TypeError):
340                    if not silently_correct:
341                        raise QueryParametersException("Provide a value of %s or lower." % str(settings["max"]))
342
343                    choice = settings.get("default")
344
345            if "min" in settings:
346                try:
347                    choice = max(settings["min"], value_type(choice))
348                except (ValueError, TypeError):
349                    if not silently_correct:
350                        raise QueryParametersException("Provide a value of %s or more." % str(settings["min"]))
351
352                    choice = settings.get("default")
353
354            if choice is None or choice == "":
355                choice = settings.get("default")
356
357            if choice is None:
358                choice = 0 if "min" in settings or "max" in settings else ""
359
360            if settings.get("coerce_type"):
361                try:
362                    return value_type(choice)
363                except (ValueError, TypeError):
364                    return settings.get("default")
365            else:
366                return choice
367
368        else:
369            # no filtering
370            return choice

Class for handling user input

It is important to sanitise user input, as carelessly entered parameters may e.g. result in requesting far more data than needed, or lead to undefined behaviour. This class offers a set of pre-defined value types that can be consistently rendered as form elements in an interface and parsed.

OPTION_TOGGLE = 'toggle'
OPTION_CHOICE = 'choice'
OPTION_TEXT = 'string'
OPTION_MULTI = 'multi'
OPTION_MULTI_SELECT = 'multi_select'
OPTION_INFO = 'info'
OPTION_TEXT_LARGE = 'textarea'
OPTION_TEXT_JSON = 'json'
OPTION_DATE = 'date'
OPTION_DATERANGE = 'daterange'
OPTION_DIVIDER = 'divider'
OPTION_FILE = 'file'
OPTION_HUE = 'hue'
OPTION_DATASOURCES = 'datasources'
OPTION_EXTENSIONS = 'extensions'
OPTION_DATASOURCES_TABLE = 'datasources_table'
OPTION_ANNOTATION = 'annotation'
OPTION_ANNOTATIONS = 'annotations'
OPTIONS_COSMETIC = ('info', 'divider')
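
As a rough illustration of how these option types fit together, the sketch below defines a small options dictionary and parses a plain dict of submitted form values with parse_all. The option names and values are made up for this example.

    from common.lib.user_input import UserInput

    options = {
        "strip-urls": {"type": UserInput.OPTION_TOGGLE, "default": True},
        "amount": {"type": UserInput.OPTION_TEXT, "default": 100, "min": 1, "max": 500},
        "language": {"type": UserInput.OPTION_CHOICE, "options": {"en": "English", "nl": "Dutch"}, "default": "en"},
    }

    # Form fields arrive prefixed with "option-"; an unchecked toggle is simply absent.
    form = {"option-amount": "9999", "option-language": "nl"}

    print(UserInput.parse_all(options, form))
    # {'strip-urls': False, 'amount': 500, 'language': 'nl'}
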
@staticmethod
def parse_all(options, input, silently_correct=True):

Parse form input for the provided options

Ignores all input not belonging to any of the defined options: parses and sanitises the rest, and returns a dictionary with the sanitised options. If an option is not present in the input, the default value is used, and if that is absent, None.

In other words, this ensures a dictionary with 1) only white-listed keys, 2) a value of an expected type for each key.

Parameters
  • dict options: Options, as a name -> settings dictionary
  • dict input: Input, as a form field -> value dictionary
  • bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns

Sanitised form input
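
A daterange option is the one case where parse_all combines two submitted form fields ("<name>-min" and "<name>-max") into a single value. A minimal sketch, assuming an illustrative option name and unix timestamps submitted as strings:

    from common.lib.user_input import UserInput

    options = {"daterange": {"type": UserInput.OPTION_DATERANGE}}
    form = {"option-daterange-min": "1609459200", "option-daterange-max": "1612137600"}

    print(UserInput.parse_all(options, form))
    # {'daterange': (1609459200, 1612137600)} - a (after, before) tuple of unix timestamps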

@staticmethod
def parse_value(settings, choice, other_input=None, silently_correct=True):

Filter user input

Makes sure user input for post-processors is valid and within the parameters specified by the post-processor

Parameters
  • obj settings: Settings, including defaults and valid options
  • choice: The chosen option, to be parsed
  • dict other_input: Other input, as parsed so far
  • bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns

Validated and parsed input
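
A minimal sketch of parse_value on its own, showing integer clamping for text options, option filtering for multi-selects, and a "requires" condition that depends on another, already parsed field. The settings dictionaries are made up for illustration.

    from common.lib.user_input import UserInput

    # The default's type (int here) determines coercion unless "coerce_type" is set.
    settings = {"type": UserInput.OPTION_TEXT, "default": 10, "min": 1, "max": 100}
    print(UserInput.parse_value(settings, "250"))           # 100 (clamped to "max")
    print(UserInput.parse_value(settings, "not a number"))  # 10 (falls back to the default)

    # Multi-selects split comma-separated input and keep only defined options.
    multi = {"type": UserInput.OPTION_MULTI_SELECT, "options": {"a": "A", "b": "B"}, "default": []}
    print(UserInput.parse_value(multi, "a,c"))              # ['a']

    # A "requires" condition short-circuits parsing when another field does not have
    # the required value; parse_all catches the resulting RequirementsNotMetException
    # and skips the option.
    dependent = {"type": UserInput.OPTION_TEXT, "default": "", "requires": "mode==advanced"}
    print(UserInput.parse_value(dependent, "extra", other_input={"mode": "advanced"}))  # 'extra'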