
common.lib.helpers

Miscellaneous helper functions for the 4CAT backend
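For orientation, here is a minimal usage sketch of a few of the simpler helpers defined below. It assumes a configured 4CAT environment so that `common.lib.helpers` imports cleanly; the sample inputs are made up for illustration.

    from common.lib.helpers import strip_tags, convert_to_int, timify, andify

    strip_tags("<p>Hello <b>world</b></p>")     # "Hello world\n" (block tags become newlines)
    convert_to_int("42")                        # 42
    convert_to_int("not a number", default=5)   # 5 (fallback instead of a ValueError)
    timify(3660)                                # "1 hour and 1 minute"
    andify(["one", "two", "three"])             # "one, two and three"

The full module source follows.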

   1"""
   2Miscellaneous helper functions for the 4CAT backend
   3"""
   4import subprocess
   5import imagehash
   6import hashlib
   7import requests
   8import datetime
   9import smtplib
  10import fnmatch
  11import socket
  12import oslex
  13import copy
  14import time
  15import json
  16import math
  17import ural
  18import csv
  19import ssl
  20import re
  21import os
  22import io
  23
  24from pathlib import Path
  25from collections.abc import MutableMapping
  26from html.parser import HTMLParser
  27from urllib.parse import urlparse, urlunparse
  28from calendar import monthrange
  29from packaging import version
  30from PIL import Image
  31
  32from common.config_manager import CoreConfigManager
  33from common.lib.user_input import UserInput
  34__all__ = ("UserInput",)
  35
  36core_config = CoreConfigManager()
  37
  38def init_datasource(database, logger, queue, name, config):
  39    """
  40    Initialize data source
  41
  42    Queues jobs to scrape the boards that were configured to be scraped in the
  43    4CAT configuration file. If none were configured, nothing happens.
  44
  45    :param Database database:  Database connection instance
  46    :param Logger logger:  Log handler
  47    :param JobQueue queue:  Job Queue instance
  48    :param string name:  ID of datasource that is being initialised
  49    :param config:  Configuration reader
  50    """
  51    pass
  52
  53def get_datasource_example_keys(db, modules, dataset_type):
  54    """
  55    Get example keys for a datasource
  56    """
  57    from common.lib.dataset import DataSet
  58    example_dataset_key = db.fetchone("SELECT key from datasets WHERE type = %s and is_finished = True and num_rows > 0 ORDER BY timestamp_finished DESC LIMIT 1", (dataset_type,))
  59    if example_dataset_key:
  60        example_dataset = DataSet(db=db, key=example_dataset_key["key"], modules=modules)
  61        return example_dataset.get_columns()
  62    return []
  63
  64def strip_tags(html, convert_newlines=True):
  65    """
  66    Strip HTML from a string
  67
  68    :param html: HTML to strip
  69    :param convert_newlines: Convert <br> and </p> tags to \n before stripping
  70    :return: Stripped HTML
  71    """
  72    if not html:
  73        return ""
  74
  75    deduplicate_newlines = re.compile(r"\n+")
  76
  77    if convert_newlines:
  78        html = html.replace("<br>", "\n").replace("</p>", "</p>\n")
  79        html = deduplicate_newlines.sub("\n", html)
  80
  81    class HTMLStripper(HTMLParser):
  82        def __init__(self):
  83            super().__init__()
  84            self.reset()
  85            self.strict = False
  86            self.convert_charrefs = True
  87            self.fed = []
  88
  89        def handle_data(self, data):
  90            self.fed.append(data)
  91
  92        def get_data(self):
  93            return "".join(self.fed)
  94
  95    stripper = HTMLStripper()
  96    stripper.feed(html)
  97    return stripper.get_data()
  98
  99
 100def sniff_encoding(file):
 101    """
 102    Determine encoding from raw file bytes
 103
 104    Currently only distinguishes UTF-8 and UTF-8 with BOM
 105
 106    :param file:
 107    :return:
 108    """
 109    if type(file) is bytearray:
 110        maybe_bom = file[:3]
 111    elif hasattr(file, "getbuffer"):
 112        buffer = file.getbuffer()
 113        maybe_bom = buffer[:3].tobytes()
 114    elif hasattr(file, "peek"):
 115        buffer = file.peek(32)
 116        maybe_bom = buffer[:3]
 117    else:
 118        maybe_bom = False
 119
 120    return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8"
 121
 122def sniff_csv_dialect(csv_input):
 123    """
 124    Determine CSV dialect for an input stream
 125
 126    :param csv_input:  Input stream
 127    :return tuple:  Dialect object and a boolean representing whether
 128    the CSV file seems to have a header
 129    """
 130    encoding = sniff_encoding(csv_input)
 131    if type(csv_input) is io.TextIOWrapper:
 132        wrapped_input = csv_input
 133    else:
 134        wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding)
 135    wrapped_input.seek(0)
 136    sample = wrapped_input.read(1024 * 1024)
 137    wrapped_input.seek(0)
 138    has_header = csv.Sniffer().has_header(sample)
 139    dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))
 140
 141    return dialect, has_header
 142
 143
 144def get_git_branch():
 145    """
 146    Get current git branch
 147
 148    If the 4CAT root folder is a git repository, this function will return the
 149    name of the currently checked-out branch. If the folder is not a git
 150    repository or git is not installed an empty string is returned.
 151    """
 152    try:
 153        root_dir = str(core_config.get('PATH_ROOT').resolve())
 154        branch = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE)
 155        if branch.returncode != 0:
 156            raise ValueError()
 157        branch_name = branch.stdout.decode("utf-8").strip()
 158        if not branch_name:
 159            # Check for detached HEAD state
 160            # Most likely occurring because of checking out release tags (which are not branches) or commits
 161            head_status = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} status"), stdout=subprocess.PIPE)
 162            if head_status.returncode == 0:
 163                for line in head_status.stdout.decode("utf-8").split("\n"):
 164                    if any([detached_message in line for detached_message in ("HEAD detached from", "HEAD detached at")]):
 165                        branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1]
 166                        return branch_name.strip()
            return branch_name
 167    except (subprocess.SubprocessError, ValueError, FileNotFoundError):
 168        return ""
 169
 170
 171def get_software_commit(worker=None):
 172    """
 173    Get current 4CAT git commit hash
 174
 175    Use `get_software_version()` instead if you need the release version
 176    number rather than the precise commit hash.
 177
 178    If no version file is available, run `git show` to test if there is a git
 179    repository in the 4CAT root folder, and if so, what commit is currently
 180    checked out in it.
 181
 182    For extensions, get the repository information for that extension, or if
 183    the extension is not a git repository, return empty data.
 184
 185    :param BasicWorker worker:  Worker to get commit for. If not given, get
 186    version information for the main 4CAT installation.
 187
 188    :return tuple:  4CAT git commit hash, repository name
 189    """
 190    # try git command line within the 4CAT root folder
 191    # if it is a checked-out git repository, it will tell us the hash of
 192    # the currently checked-out commit
 193
 194    # path has no Path.relative()...
 195    try:
 196        # if extension, go to the extension file's path
 197        # we will run git here - if it is not its own repository, we have no
 198        # useful version info (since the extension is by definition not in the
 199        # main 4CAT repository) and will return an empty value
 200        if worker and worker.is_extension:
 201            relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent
 202            working_dir = str(core_config.get("PATH_ROOT").joinpath(relative_filepath).resolve())
 203            # check if we are in the extensions' own repo or 4CAT's
 204            git_cmd = f"git -C {oslex.quote(working_dir)} rev-parse --show-toplevel"
 205            repo_level = subprocess.run(oslex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 206            if Path(repo_level.stdout.decode("utf-8").strip()) == core_config.get("PATH_ROOT"):
 207                # not its own repository
 208                return ("", "")
 209
 210        else:
 211            working_dir = str(core_config.get("PATH_ROOT").resolve())
 212
 213        show = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 214        if show.returncode != 0:
 215            raise ValueError()
 216        commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1]
 217
 218        # now get the repository the commit belongs to, if we can
 219        origin = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 220        if origin.returncode != 0 or not origin.stdout:
 221            raise ValueError()
 222        repository = origin.stdout.decode("utf-8").strip()
 223        if repository.endswith(".git"):
 224            repository = repository[:-4]
 225
 226    except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError):
 227        return ("", "")
 228
 229    return (commit, repository)
 230
 231def get_software_version():
 232    """
 233    Get current 4CAT version
 234
 235    This is the actual software version, i.e. not the commit hash (see
 236    `get_software_commit()` for that). The current version is stored in a file
 237    with a canonical location: if the file doesn't exist, an empty string is
 238    returned.
 239
 240    :return str:  Software version, for example `1.37`.
 241    """
 242    current_version_file = core_config.get("PATH_ROOT").joinpath("config/.current-version")
 243    if not current_version_file.exists():
 244        return ""
 245
 246    with current_version_file.open() as infile:
 247        return infile.readline().strip()
 248
 249def get_github_version(repo_url, timeout=5):
 250    """
 251    Get latest release tag version from GitHub
 252
 253    Will raise a ValueError if it cannot retrieve information from GitHub.
 254
 255    :param str repo_url:  GitHub repository URL
 256    :param int timeout:  Timeout in seconds for HTTP request
 257
 258    :return tuple:  Version, e.g. `1.26`, and release URL.
 259    """
 260    if not repo_url.endswith("/"):
 261        repo_url += "/"
 262
 263    repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url))
 264
 265    api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id
 266    response = requests.get(api_url, timeout=timeout)
 267    response = response.json()
 268    if response.get("message") == "Not Found":
 269        raise ValueError("Invalid GitHub URL or repository name")
 270
 271    latest_tag = response.get("tag_name", "unknown")
 272    if latest_tag.startswith("v"):
 273        latest_tag = re.sub(r"^v", "", latest_tag)
 274
 275    return (latest_tag, response.get("html_url"))
 276
 277def get_ffmpeg_version(ffmpeg_path):
 278    """
 279    Determine ffmpeg version
 280
 281    This can be necessary when using commands that change name between versions.
 282
 283    :param ffmpeg_path: ffmpeg executable path
 284    :return packaging.version:  Comparable version
 285    """
 286    command = [ffmpeg_path, "-version"]
 287    ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
 288                                    stderr=subprocess.PIPE)
 289
 290    ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1]
 291    ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0]
 292
 293    return version.parse(ffmpeg_version)
 294
 295
 296def find_extensions():
 297    """
 298    Find 4CAT extensions and load their metadata
 299
 300    Looks for subfolders of the extension folder, and loads additional metadata
 301    where available.
 302
 303    :return tuple:  A tuple with two items; the extensions, as an ID -> metadata
 304    dictionary, and a list of (str) errors encountered while loading
 305    """
 306    extension_path = core_config.get("PATH_ROOT").joinpath("extensions")
 307    errors = []
 308    if not extension_path.exists() or not extension_path.is_dir():
 309        return {}, errors
 310
 311    # each folder in the extensions folder is an extension
 312    extensions = {
 313        extension.name: {
 314            "name": extension.name,
 315            "version": "",
 316            "url": "",
 317            "git_url": "",
 318            "is_git": False
 319        } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir()
 320    }
 321
 322    # collect metadata for extensions
 323    allowed_metadata_keys = ("name", "version", "url")
 324    for extension in extensions:
 325        extension_folder = extension_path.joinpath(extension)
 326        metadata_file = extension_folder.joinpath("metadata.json")
 327        if metadata_file.exists():
 328            with metadata_file.open() as infile:
 329                try:
 330                    metadata = json.load(infile)
 331                    extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys})
 332                except (TypeError, ValueError) as e:
 333                    errors.append(f"Error reading metadata file for extension '{extension}' ({e})")
 334                    continue
 335
 336        extensions[extension]["is_git"] = extension_folder.joinpath(".git/HEAD").exists()
 337        if extensions[extension]["is_git"]:
 338            # try to get remote URL
 339            try:
 340                extension_root = str(extension_folder.resolve())
 341                origin = subprocess.run(oslex.split(f"git -C {oslex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE,
 342                                        stdout=subprocess.PIPE)
 343                if origin.returncode != 0 or not origin.stdout:
 344                    raise ValueError()
 345                repository = origin.stdout.decode("utf-8").strip()
 346                if repository.endswith(".git") and "github.com" in repository:
 347                    # use repo URL
 348                    repository = repository[:-4]
 349                extensions[extension]["git_url"] = repository
 350            except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e:
 351                print(e)
 352                pass
 353
 354    return extensions, errors
 355
 356
 357def convert_to_int(value, default=0):
 358    """
 359    Convert a value to an integer, with a fallback
 360
 361    The fallback is used if an Error is thrown during conversion to int.
 362    This is a convenience function, but beats putting try-catches everywhere
 363    we're using user input as an integer.
 364
 365    :param value:  Value to convert
 366    :param int default:  Default value, if conversion not possible
 367    :return int:  Converted value
 368    """
 369    try:
 370        return int(value)
 371    except (ValueError, TypeError):
 372        return default
 373
 374def convert_to_float(value, default=0, force=False) -> float:
 375    """
 376    Convert a value to a floating point, with a fallback
 377
 378    The fallback is used if an Error is thrown during conversion to float.
 379    This is a convenience function, but beats putting try-catches everywhere
 380    we're using user input as a floating point number.
 381
 382    :param value:  Value to convert
 383    :param int default:  Default value, if conversion not possible
 384    :param force:   Whether to force the value into a float if it is not empty or None.
 385    :return float:  Converted value
 386    """
 387    if force:
 388        return float(value) if value else default
 389    try:
 390        return float(value)
 391    except (ValueError, TypeError):
 392        return default
 393
 394
 395def timify(number, short=False):
 396    """
 397    Make a number look like an indication of time
 398
 399    :param number:  Number to convert. If the number is larger than the current
 400    UNIX timestamp, decrease by that amount
 401    :return str: A nice string, for example `1 month, 3 weeks, 4 hours and 2 minutes`
 402    """
 403    number = int(number)
 404
 405    components = []
 406    if number > time.time():
 407        number = number - time.time()
 408
 409    month_length = 30.42 * 86400
 410    months = math.floor(number / month_length)
 411    if months:
 412        components.append(f"{months}{'mt' if short else ' month'}{'s' if months != 1 and not short else ''}")
 413        number -= (months * month_length)
 414
 415    week_length = 7 * 86400
 416    weeks = math.floor(number / week_length)
 417    if weeks:
 418        components.append(f"{weeks}{'w' if short else ' week'}{'s' if weeks != 1 and not short else ''}")
 419        number -= (weeks * week_length)
 420
 421    day_length = 86400
 422    days = math.floor(number / day_length)
 423    if days:
 424        components.append(f"{days}{'d' if short else ' day'}{'s' if days != 1 and not short else ''}")
 425        number -= (days * day_length)
 426
 427    hour_length = 3600
 428    hours = math.floor(number / hour_length)
 429    if hours:
 430        components.append(f"{hours}{'h' if short else ' hour'}{'s' if hours != 1 and not short else ''}")
 431        number -= (hours * hour_length)
 432
 433    minute_length = 60
 434    minutes = math.floor(number / minute_length)
 435    if minutes:
 436        components.append(f"{minutes}{'m' if short else ' minute'}{'s' if minutes != 1 and not short else ''}")
 437
 438    if not components:
 439        components.append("less than a minute")
 440
 441    last_str = components.pop()
 442    time_str = ""
 443    if components:
 444        time_str = ", ".join(components)
 445        time_str += " and "
 446
 447    return time_str + last_str
 448
 449def andify(items):
 450    """
 451    Format a list of items for use in text
 452
 453    Returns a comma-separated list, the last item preceded by "and"
 454
 455    :param items:  Iterable list
 456    :return str:  Formatted string
 457    """
 458    if len(items) == 0:
 459        return ""
 460    elif len(items) == 1:
 461        return str(items[0])
 462
 463    result = f" and {items.pop()}"
 464    return ", ".join([str(item) for item in items]) + result
 465
 466def ellipsiate(text, length, inside=False, ellipsis_str="&hellip;"):
 467    if len(text) <= length:
 468        return text
 469
 470    elif not inside:
 471        return text[:length] + ellipsis_str
 472
 473    else:
 474        # two cases: URLs and normal text
 475        # for URLs, try to only ellipsiate after the domain name
 476        # this makes the URLs easier to read when shortened
 477        if ural.is_url(text):
 478            pre_part = "/".join(text.split("/")[:3])
 479            if len(pre_part) < length - 6:  # kind of arbitrary
 480                before = len(pre_part) + 1
 481            else:
 482                before = math.floor(length / 2)
 483        else:
 484            before = math.floor(length / 2)
 485
 486        after = len(text) - before
 487        return text[:before] + ellipsis_str + text[after:]
 488
 489def hash_file(image_file, hash_type="file-hash"):
 490    """
 491    Generate an image hash
 492
 493    :param Path image_file:  Image file to hash
 494    :param str hash_type:  Hash type, one of `file-hash`, `colorhash`,
 495    `phash`, `average_hash`, `dhash`
 496    :return str:  Hexadecimal hash value
 497    """
 498    if not image_file.exists():
 499        raise FileNotFoundError()
 500
 501    if hash_type == "file-hash":
 502        hasher = hashlib.sha1()
 503
 504        # Open the file in binary mode
 505        with image_file.open("rb") as infile:
 506            # Read and update hash in chunks to handle large files
 507            while chunk := infile.read(1024):
 508                hasher.update(chunk)
 509
 510        return hasher.hexdigest()
 511
 512    elif hash_type in ("colorhash", "phash", "average_hash", "dhash"):
 513        image = Image.open(image_file)
 514
 515        return str(getattr(imagehash, hash_type)(image))
 516
 517    else:
 518        raise NotImplementedError(f"Unknown hash type '{hash_type}'")
 519
 520def get_yt_compatible_ids(yt_ids):
 521    """
 522    :param list yt_ids:  List of YouTube video IDs
 523    :return list:  List of comma-joined ID strings, in groups of fifty
 524
 525    Takes a list of IDs and returns a list of comma-joined strings
 526    in groups of fifty. This is needed for the YouTube API, which
 527    expects a comma-separated string of IDs and can return at most
 528    fifty results per request.
 529    """
 530
 531    # If there's only one item, return a single list item
 532    if isinstance(yt_ids, str):
 533        return [yt_ids]
 534
 535    ids = []
 536    last_i = 0
 537    for i, yt_id in enumerate(yt_ids):
 538
 539        # Add a joined string per fifty videos
 540        if i % 50 == 0 and i != 0:
 541            ids_string = ",".join(yt_ids[last_i:i])
 542            ids.append(ids_string)
 543            last_i = i
 544
 545        # If the end of the list is reached, add the last data
 546        elif i == (len(yt_ids) - 1):
 547            ids_string = ",".join(yt_ids[last_i:i])
 548            ids.append(ids_string)
 549
 550    return ids
 551
 552
 553def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None,
 554                    fontsize_small=None, fontsize_large=None):
 555    """
 556    Get a standard SVG canvas to draw 4CAT graphs to
 557
 558    Adds a border, footer, header, and some basic text styling
 559
 560    :param path:  The path where the SVG graph will be saved
 561    :param width:  Width of the canvas
 562    :param height:  Height of the canvas
 563    :param header:  Header, if necessary to draw
 564    :param footer:  Footer text, if necessary to draw. Defaults to shameless
 565    4CAT advertisement.
 566    :param fontsize_normal:  Font size of normal text
 567    :param fontsize_small:  Font size of small text (e.g. footer)
 568    :param fontsize_large:  Font size of large text (e.g. header)
 569    :return SVG:  SVG canvas (via svgwrite) that can be drawn to
 570    """
 571    from svgwrite.container import SVG, Hyperlink
 572    from svgwrite.drawing import Drawing
 573    from svgwrite.shapes import Rect
 574    from svgwrite.text import Text
 575
 576    if fontsize_normal is None:
 577        fontsize_normal = width / 75
 578
 579    if fontsize_small is None:
 580        fontsize_small = width / 100
 581
 582    if fontsize_large is None:
 583        fontsize_large = width / 50
 584
 585    # instantiate with border and white background
 586    canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal)
 587    canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF"))
 588
 589    # header
 590    if header:
 591        header_shape = SVG(insert=(0, 0), size=("100%", fontsize_large * 2))
 592        header_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
 593        header_shape.add(
 594            Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle", fill="#FFF",
 595                 style="font-size:%ipx" % fontsize_large))
 596        canvas.add(header_shape)
 597
 598    # footer (i.e. 4cat banner)
 599    if footer:
 600        footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2)
 601        footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize)
 602        footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
 603        link = Hyperlink(href="https://4cat.nl")
 604        link.add(
 605            Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle", fill="#FFF",
 606                 style="font-size:%ipx" % fontsize_small))
 607        footer_shape.add(link)
 608        canvas.add(footer_shape)
 609
 610    return canvas
 611
 612
 613def call_api(action, payload=None, wait_for_response=True):
 614    """
 615    Send message to server
 616
 617    Calls the internal API and returns interpreted response. "status" is always 
 618    None if wait_for_response is False.
 619
 620    :param str action: API action
 621    :param payload: API payload
 622    :param bool wait_for_response:  Wait for response? If not close connection
 623    immediately after sending data.
 624
 625    :return: API response {"status": "success"|"error", "response": response, "error": error}
 626    """
 627    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 628    connection.settimeout(15)
 629    config = CoreConfigManager()
 630    try:
 631        connection.connect((config.get('API_HOST'), config.get('API_PORT')))
 632    except ConnectionRefusedError:
 633        return {"status": "error", "error": "Connection refused"}
 634
 635    msg = json.dumps({"request": action, "payload": payload})
 636    connection.sendall(msg.encode("ascii", "ignore"))
 637
 638    response_data = {
 639        "status": None,
 640        "response": None,
 641        "error": None
 642    }
 643
 644    if wait_for_response:
 645        try:
 646            response = ""
 647            while True:
 648                bytes = connection.recv(2048)
 649                if not bytes:
 650                    break
 651
 652                response += bytes.decode("ascii", "ignore")
 653        except (socket.timeout, TimeoutError):
 654            response_data["status"] = "error"
 655            response_data["error"] = "Connection timed out"
 656
 657    try:
 658        connection.shutdown(socket.SHUT_RDWR)
 659    except OSError:
 660        # already shut down automatically
 661        pass
 662    connection.close()
 663
 664    if wait_for_response:
 665        try:
 666            json_response = json.loads(response)
 667            response_data["response"] = json_response["response"]
 668            response_data["error"] = json_response.get("error", None)
 669            response_data["status"] = "error" if json_response.get("error") else "success"
 670        except json.JSONDecodeError:
 671            response_data["status"] = "error"
 672            response_data["error"] = "Invalid JSON response"
 673            response_data["response"] = response
 674    
 675    return response_data
 676
 677def get_interval_descriptor(item, interval, item_column="timestamp"):
 678    """
 679    Get interval descriptor based on timestamp
 680
 681    :param dict item:  Item to generate descriptor for, should have a
 682    "timestamp" key
 683    :param str interval:  Interval, one of "all", "overall", "year",
 684    "month", "week", "day"
 685    :param str item_column:  Column name in the item dictionary that contains
 686    the timestamp. Defaults to "timestamp".
 687    :return str:  Interval descriptor, e.g. "overall", "unknown_date", "2020", "2020-08",
 688    "2020-43", "2020-08-01"
 689    """
 690    if interval in ("all", "overall"):
 691        return interval
 692    
 693    if not item.get(item_column, None):
 694        return "unknown_date"
 695
 696    # Catch cases where a custom timestamp has an epoch integer as value.
 697    try:
 698        timestamp = int(item[item_column])
 699        try:
 700            timestamp = datetime.datetime.fromtimestamp(timestamp)
 701        except (ValueError, TypeError):
 702            raise ValueError("Invalid timestamp '%s'" % str(item["timestamp"]))
 703    except (TypeError, ValueError):
 704        try:
 705            timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S")
 706        except (ValueError, TypeError):
 707            raise ValueError("Invalid date '%s'" % str(item["timestamp"]))
 708
 709    if interval == "year":
 710        return str(timestamp.year)
 711    elif interval == "month":
 712        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2)
 713    elif interval == "week":
 714        return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2)
 715    elif interval == "hour":
 716        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
 717            2) + " " + str(timestamp.hour).zfill(2)
 718    elif interval == "minute":
 719        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
 720            2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2)
 721    else:
 722        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2)
 723
 724
 725def pad_interval(intervals, first_interval=None, last_interval=None):
 726    """
 727    Pad an interval so all intermediate intervals are filled
 728
 729    Warning, ugly code (PRs very welcome)
 730
 731    :param dict intervals:  A dictionary, with dates (YYYY{-MM}{-DD}) as keys
 732    and a numerical value.
 733    :param first_interval:
 734    :param last_interval:
 735    :return:
 736    """
 737    missing = 0
 738    try:
 739        test_key = list(intervals.keys())[0]
 740    except IndexError:
 741        return 0, {}
 742
 743    # first determine the boundaries of the interval
 744    # these may be passed as parameters, or they can be inferred from the
 745    # interval given
 746    if first_interval:
 747        first_interval = str(first_interval)
 748        first_year = int(first_interval[0:4])
 749        if len(first_interval) > 4:
 750            first_month = int(first_interval[5:7])
 751        if len(first_interval) > 7:
 752            first_day = int(first_interval[8:10])
 753        if len(first_interval) > 10:
 754            first_hour = int(first_interval[11:13])
 755        if len(first_interval) > 13:
 756            first_minute = int(first_interval[14:16])
 757
 758    else:
 759        first_year = min([int(i[0:4]) for i in intervals])
 760        if len(test_key) > 4:
 761            first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year])
 762        if len(test_key) > 7:
 763            first_day = min(
 764                [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month])
 765        if len(test_key) > 10:
 766            first_hour = min(
 767                [int(i[11:13]) for i in intervals if
 768                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day])
 769        if len(test_key) > 13:
 770            first_minute = min(
 771                [int(i[14:16]) for i in intervals if
 772                 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int(
 773                     i[11:13]) == first_hour])
 774
 775    if last_interval:
 776        last_interval = str(last_interval)
 777        last_year = int(last_interval[0:4])
 778        if len(last_interval) > 4:
 779            last_month = int(last_interval[5:7])
 780        if len(last_interval) > 7:
 781            last_day = int(last_interval[8:10])
 782        if len(last_interval) > 10:
 783            last_hour = int(last_interval[11:13])
 784        if len(last_interval) > 13:
 785            last_minute = int(last_interval[14:16])
 786    else:
 787        last_year = max([int(i[0:4]) for i in intervals])
 788        if len(test_key) > 4:
 789            last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year])
 790        if len(test_key) > 7:
 791            last_day = max(
 792                [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month])
 793        if len(test_key) > 10:
 794            last_hour = max(
 795                [int(i[11:13]) for i in intervals if
 796                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day])
 797        if len(test_key) > 13:
 798            last_minute = max(
 799                [int(i[14:16]) for i in intervals if
 800                 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int(
 801                     i[11:13]) == last_hour])
 802
 803    has_month = re.match(r"^[0-9]{4}-[0-9]", test_key)
 804    has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key)
 805    has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key)
 806    has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key)
 807
 808    all_intervals = []
 809    for year in range(first_year, last_year + 1):
 810        year_interval = str(year)
 811
 812        if not has_month:
 813            all_intervals.append(year_interval)
 814            continue
 815
 816        start_month = first_month if year == first_year else 1
 817        end_month = last_month if year == last_year else 12
 818        for month in range(start_month, end_month + 1):
 819            month_interval = year_interval + "-" + str(month).zfill(2)
 820
 821            if not has_day:
 822                all_intervals.append(month_interval)
 823                continue
 824
 825            start_day = first_day if all((year == first_year, month == first_month)) else 1
 826            end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1]
 827            for day in range(start_day, end_day + 1):
 828                day_interval = month_interval + "-" + str(day).zfill(2)
 829
 830                if not has_hour:
 831                    all_intervals.append(day_interval)
 832                    continue
 833
 834                start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0
 835                end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23
 836                for hour in range(start_hour, end_hour + 1):
 837                    hour_interval = day_interval + " " + str(hour).zfill(2)
 838
 839                    if not has_minute:
 840                        all_intervals.append(hour_interval)
 841                        continue
 842
 843                    start_minute = first_minute if all(
 844                        (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0
 845                    end_minute = last_minute if all(
 846                        (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59
 847
 848                    for minute in range(start_minute, end_minute + 1):
 849                        minute_interval = hour_interval + ":" + str(minute).zfill(2)
 850                        all_intervals.append(minute_interval)
 851
 852    for interval in all_intervals:
 853        if interval not in intervals:
 854            intervals[interval] = 0
 855            missing += 1
 856
 857    # sort while we're at it
 858    intervals = {key: intervals[key] for key in sorted(intervals)}
 859
 860    return missing, intervals
 861
 862
 863def remove_nuls(value):
 864    """
 865    Remove \0 from a value
 866
 867    The CSV library cries about a null byte when it encounters one :( :( :(
 868    poor little csv cannot handle a tiny little null byte
 869
 870    So remove them from the data because they should not occur in utf-8 data
 871    anyway.
 872
 873    :param value:  Value to remove nulls from. For dictionaries, sets, tuples
 874    and lists all items are parsed recursively.
 875    :return value:  Cleaned value
 876    """
 877    if type(value) is dict:
 878        for field in value:
 879            value[field] = remove_nuls(value[field])
 880    elif type(value) is list:
 881        value = [remove_nuls(item) for item in value]
 882    elif type(value) is tuple:
 883        value = tuple([remove_nuls(item) for item in value])
 884    elif type(value) is set:
 885        value = set([remove_nuls(item) for item in value])
 886    elif type(value) is str:
 887        value = value.replace("\0", "")
 888
 889    return value
 890
 891
 892class NullAwareTextIOWrapper(io.TextIOWrapper):
 893    """
 894    TextIOWrapper that skips null bytes
 895
 896    This can be used as a file reader that silently discards any null bytes it
 897    encounters.
 898    """
 899
 900    def __next__(self):
 901        value = super().__next__()
 902        return remove_nuls(value)
 903
 904
 905class HashCache:
 906    """
 907    Simple cache handler to cache hashed values
 908
 909    Avoids having to calculate a hash for values that have been hashed before
 910    """
 911
 912    def __init__(self, hasher):
 913        self.hash_cache = {}
 914        self.hasher = hasher
 915
 916    def update_cache(self, value):
 917        """
 918        Checks the hash_cache to see if the value has been cached previously,
 919        updates the hash_cache if needed, and returns the hashed value.
 920        """
 921        # value = str(value)
 922        if value not in self.hash_cache:
 923            author_hasher = self.hasher.copy()
 924            author_hasher.update(str(value).encode("utf-8"))
 925            self.hash_cache[value] = author_hasher.hexdigest()
 926            del author_hasher
 927        return self.hash_cache[value]
 928
 929
 930def dict_search_and_update(item, keyword_matches, function):
 931    """
 932    Filter fields in an object recursively
 933
 934    Apply a function to every item and sub item of a dictionary if the key
 935    contains one of the provided match terms.
 936
 937    Function loops through a dictionary or list and compares dictionary keys to
 938    the strings defined by keyword_matches. It then applies the change_function
 939    to corresponding values.
 940
 941    Note: if a matching term is found, all nested values will have the function
 942    applied to them. e.g., all these values would be changed even those with
 943    not_key_match:
 944
 945    {'key_match' : 'changed',
 946    'also_key_match' : {'not_key_match' : 'but_value_still_changed'},
 947    'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}
 948
 949    This is a comprehensive (and expensive) approach to updating a dictionary.
 950    If a dictionary structure is known, a better solution would be to update
 951    using specific keys.
 952
 953    :param Dict/List item:  dictionary/list/json to loop through
 954    :param String keyword_matches:  list of strings that will be matched to
 955    dictionary keys. Can contain wildcards which are matched using fnmatch.
 956    :param Function function:  function applied to all values of any items
 957    nested under a matching key
 958
 959    :return Dict/List: Copy of original item, but filtered
 960    """
 961
 962    def loop_helper_function(d_or_l, match_terms, change_function):
 963        """
 964        Recursive helper function that updates item in place
 965        """
 966        if isinstance(d_or_l, dict):
 967            # Iterate through dictionary
 968            for key, value in iter(d_or_l.items()):
 969                if match_terms == 'True' or any([fnmatch.fnmatch(key, match_term) for match_term in match_terms]):
 970                    # Match found; apply function to all items and sub-items
 971                    if isinstance(value, (list, dict)):
 972                        # Pass item through again with match_terms = True
 973                        loop_helper_function(value, 'True', change_function)
 974                    elif value is None:
 975                        pass
 976                    else:
 977                        # Update the value
 978                        d_or_l[key] = change_function(value)
 979                elif isinstance(value, (list, dict)):
 980                    # Continue search
 981                    loop_helper_function(value, match_terms, change_function)
 982        elif isinstance(d_or_l, list):
 983            # Iterate through list
 984            for n, value in enumerate(d_or_l):
 985                if isinstance(value, (list, dict)):
 986                    # Continue search
 987                    loop_helper_function(value, match_terms, change_function)
 988                elif match_terms == 'True':
 989                    # List item nested in matching
 990                    d_or_l[n] = change_function(value)
 991        else:
 992            raise Exception('Must pass list or dictionary')
 993
 994    # Lowercase keyword_matches
 995    keyword_matches = [keyword.lower() for keyword in keyword_matches]
 996
 997    # Create deepcopy and return new item
 998    temp_item = copy.deepcopy(item)
 999    loop_helper_function(temp_item, keyword_matches, function)
1000    return temp_item
1001
1002
1003def get_last_line(filepath):
1004    """
1005    Seeks from end of file for '\n' and returns that line
1006
1007    :param str filepath:  path to file
1008    :return str: last line of file
1009    """
1010    with open(filepath, "rb") as file:
1011        try:
1012            # start at the end of file
1013            file.seek(-2, os.SEEK_END)
1014            # check if NOT endline i.e. '\n'
1015            while file.read(1) != b'\n':
1016                # if not '\n', back up two characters and check again
1017                file.seek(-2, os.SEEK_CUR)
1018        except OSError:
1019            file.seek(0)
1020        last_line = file.readline().decode()
1021    return last_line
1022
1023
1024def add_notification(db, user, notification, expires=None, allow_dismiss=True):
1025    db.insert("users_notifications", {
1026        "username": user,
1027        "notification": notification,
1028        "timestamp_expires": expires,
1029        "allow_dismiss": allow_dismiss
1030    }, safe=True)
1031
1032
1033def send_email(recipient, message, mail_config):
1034    """
1035    Send an e-mail using the configured SMTP settings
1036
1037    Just a thin wrapper around smtplib, so we don't have to repeat ourselves.
1038    Exceptions are to be handled outside the function.
1039
1040    :param list recipient:  Recipient e-mail addresses
1041    :param MIMEMultipart message:  Message to send
1042    :param mail_config:  Configuration reader
1043    """
1044    # Create a secure SSL context
1045    context = ssl.create_default_context()
1046
1047    # Decide which connection type
1048    with smtplib.SMTP_SSL(mail_config.get('mail.server'), port=mail_config.get('mail.port', 0), context=context) if mail_config.get(
1049            'mail.ssl') == 'ssl' else smtplib.SMTP(mail_config.get('mail.server'),
1050                                                   port=mail_config.get('mail.port', 0)) as server:
1051        if mail_config.get('mail.ssl') == 'tls':
1052            # smtplib.SMTP adds TLS context here
1053            server.starttls(context=context)
1054
1055        # Log in
1056        if mail_config.get('mail.username') and mail_config.get('mail.password'):
1057            server.ehlo()
1058            server.login(mail_config.get('mail.username'), mail_config.get('mail.password'))
1059
1060        # Send message
1061        if type(message) is str:
1062            server.sendmail(mail_config.get('mail.noreply'), recipient, message)
1063        else:
1064            server.sendmail(mail_config.get('mail.noreply'), recipient, message.as_string())
1065
1066
1067def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'):
1068    """
1069    Return a flattened dictionary where nested dictionary objects are given new
1070    keys using the parent key combined with the child key using the separator.
1071
1072    Lists will be converted to json strings via json.dumps()
1073
1074    :param MutableMapping d:  Dictionary like object
1075    :param str parent_key: The original parent key prepending future nested keys
1076    :param str sep: A separator string used to combine parent and child keys
1077    :return dict:  A new dictionary with no nested values
1078    """
1079
1080    def _flatten_dict_gen(d, parent_key, sep):
1081        for k, v in d.items():
1082            new_key = parent_key + sep + k if parent_key else k
1083            if isinstance(v, MutableMapping):
1084                yield from flatten_dict(v, new_key, sep=sep).items()
1085            elif isinstance(v, (list, set)):
1086                yield new_key, json.dumps(
1087                    [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v])
1088            else:
1089                yield new_key, v
1090
1091    return dict(_flatten_dict_gen(d, parent_key, sep))
1092
1093
1094def sets_to_lists(d: MutableMapping):
1095    """
1096    Return a dictionary where all nested sets have been converted to lists.
1097
1098    :param MutableMapping d:  Dictionary like object
1099    :return dict:  A new dictionary with no nested sets
1100    """
1101
1102    def _check_list(lst):
1103        return [sets_to_lists(item) if isinstance(item, MutableMapping) else _check_list(item) if isinstance(item, (
1104        set, list)) else item for item in lst]
1105
1106    def _sets_to_lists_gen(d):
1107        for k, v in d.items():
1108            if isinstance(v, MutableMapping):
1109                yield k, sets_to_lists(v)
1110            elif isinstance(v, (list, set)):
1111                yield k, _check_list(v)
1112            else:
1113                yield k, v
1114
1115    return dict(_sets_to_lists_gen(d))
1116
1117
1118def url_to_hash(url, remove_scheme=True, remove_www=True):
1119    """
1120    Convert a URL to a hash that can be used as a filename; some URLs are too long to be used as filenames, so the
1121    normalised URL is hashed instead.
1122    """
1123    parsed_url = urlparse(url.lower())
1124    if parsed_url:
1125        if remove_scheme:
1126            parsed_url = parsed_url._replace(scheme="")
1127        if remove_www:
1128            netloc = re.sub(r"^www\.", "", parsed_url.netloc)
1129            parsed_url = parsed_url._replace(netloc=netloc)
1130
1131        url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/"))
1132    else:
1133        # Unable to parse URL; use regex
1134        if remove_scheme:
1135            url = re.sub(r"^https?://", "", url)
1136        if remove_www:
1137            if not remove_scheme:
1138                scheme = re.match(r"^https?://", url).group()
1139                temp_url = re.sub(r"^https?://", "", url)
1140                url = scheme + re.sub(r"^www\.", "", temp_url)
1141            else:
1142                url = re.sub(r"^www\.", "", url)
1143
1144        url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/"))
1145
1146    return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest()
1147
1148def url_to_filename(url, staging_area=None, default_name="file", default_ext=".png", max_bytes=255, existing_filenames=None):
1149        """
1150        Determine filenames for saved files
1151
1152        Prefer the original filename (extracted from the URL), but this may not
1153        always be possible or be an actual filename. Also, avoid using the same
1154        filename multiple times. Ensures filenames don't exceed max_bytes.
1155
1156        :param str url:  URLs to determine filenames for
1157        :param Path staging_area:  Path to the staging area where files are saved
1158        (to avoid collisions); if None, no collision avoidance is done.
1159        :param str default_name:  Default name to use if no filename can be
1160        extracted from the URL
1161        :param str default_ext:  Default extension to use if no filename can be
1162        extracted from the URL
1163        :param int max_bytes:  Maximum number of bytes for the filename
1164        :return str:  Suitable file name
1165        """
1166        clean_filename = url.split("/")[-1].split("?")[0].split("#")[0]
1167        if re.match(r"[^.]+\.[a-zA-Z0-9]{1,10}", clean_filename):
1168            base_filename = clean_filename
1169        else:
1170            base_filename = default_name + default_ext
1171
1172        if not existing_filenames:
1173            existing_filenames = []
1174
1175        # Split base filename into name and extension
1176        if '.' in base_filename:
1177            name_part, ext_part = base_filename.rsplit('.', 1)
1178            ext_part = '.' + ext_part
1179        else:
1180            name_part = base_filename
1181            ext_part = ''
1182
1183        # Truncate base filename if it exceeds max_bytes
1184        if len(base_filename.encode('utf-8')) > max_bytes:
1185            # Reserve space for extension
1186            available_bytes = max_bytes - len(ext_part.encode('utf-8'))
1187            if available_bytes <= 0:
1188                # If extension is too long, use minimal name
1189                name_part = default_name
1190                ext_part = default_ext
1191                available_bytes = max_bytes - len(ext_part.encode('utf-8'))
1192            
1193            # Truncate name part to fit
1194            name_bytes = name_part.encode('utf-8')
1195            if len(name_bytes) > available_bytes:
1196                # Truncate byte by byte to ensure valid UTF-8
1197                while len(name_bytes) > available_bytes:
1198                    name_part = name_part[:-1]
1199                    name_bytes = name_part.encode('utf-8')
1200            
1201            base_filename = name_part + ext_part
1202
1203        filename = base_filename
1204
1205        if staging_area:
1206            # Ensure the filename is unique in the staging area
1207            file_path = staging_area.joinpath(filename)
1208            file_index = 1
1209            
1210            while file_path.exists() or filename in existing_filenames:
1211                # Calculate space needed for index suffix
1212                index_suffix = f"-{file_index}"
1213                
1214                # Check if filename with index would exceed max_bytes
1215                test_filename = name_part + index_suffix + ext_part
1216                if len(test_filename.encode('utf-8')) > max_bytes:
1217                    # Need to truncate name_part to make room for index
1218                    available_bytes = max_bytes - len((index_suffix + ext_part).encode('utf-8'))
1219                    if available_bytes <= 0:
1220                        # Extreme case - use minimal name
1221                        truncated_name = "f"
1222                    else:
1223                        # Truncate name_part to fit
1224                        truncated_name = name_part
1225                        name_bytes = truncated_name.encode('utf-8')
1226                        while len(name_bytes) > available_bytes:
1227                            truncated_name = truncated_name[:-1]
1228                            name_bytes = truncated_name.encode('utf-8')
1229                    
1230                    filename = truncated_name + index_suffix + ext_part
1231                else:
1232                    filename = test_filename
1233                
1234                file_index += 1
1235                file_path = staging_area.joinpath(filename)
1236
1237        return filename
1238
1239
1240def split_urls(url_string, allowed_schemes=None):
1241    """
1242    Split URL text by \n and commas.
1243
1244    4CAT allows users to input lists by separating items with either a newline or a comma. This function splits the
1245    input on both, using URL schemes to recognise commas that are part of a URL rather than separators.
1246
1247    Note: some URLs contain another URL, and thus a second scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com);
1248    these are handled correctly as long as the embedded scheme does not directly follow a comma (e.g., "http://,https://" would fail).
1249    """
1250    if allowed_schemes is None:
1251        allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://')
1252    potential_urls = []
1253    # Split the text by \n
1254    for line in url_string.split('\n'):
1255        # Handle commas that may exist within URLs
1256        parts = line.split(',')
1257        recombined_url = ""
1258        for part in parts:
1259            if part.startswith(allowed_schemes):  # Other schemes exist
1260                # New URL start detected
1261                if recombined_url:
1262                    # Already have a URL, add to list
1263                    potential_urls.append(recombined_url)
1264                # Start new URL
1265                recombined_url = part
1266            elif part:
1267                if recombined_url:
1268                    # Add to existing URL
1269                    recombined_url += "," + part
1270                else:
1271                    # No existing URL, start new
1272                    recombined_url = part
1273            else:
1274                # Ignore empty strings
1275                pass
1276        if recombined_url:
1277            # Add any remaining URL
1278            potential_urls.append(recombined_url)
1279    return potential_urls
1280
1281
1282def folder_size(path='.'):
1283    """
1284    Get the size of a folder using os.scandir for efficiency
1285    """
1286    total = 0
1287    for entry in os.scandir(path):
1288        if entry.is_file():
1289            total += entry.stat().st_size
1290        elif entry.is_dir():
1291            total += folder_size(entry.path)
1292    return total
1293
1294def hash_to_md5(string: str) -> str:
1295    """
1296    Hash a string with an md5 hash.
1297    """
1298    return hashlib.md5(string.encode("utf-8")).hexdigest()
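To make the dictionary and URL helpers above a little more concrete, here is a rough usage sketch. It is illustrative only: the sample record and the placeholder redact function are made up, and stand in for whatever change function you would normally pass (for instance a hashing function built on the HashCache class above).

    from common.lib.helpers import flatten_dict, dict_search_and_update, url_to_filename

    post = {"author": "alice", "body": "hi", "replies": [{"author_id": 7}]}

    # apply a function to every value nested under a key matching "author*"
    dict_search_and_update(post, ["author*"], lambda value: "REDACTED")
    # {'author': 'REDACTED', 'body': 'hi', 'replies': [{'author_id': 'REDACTED'}]}

    # nested dictionaries become dotted keys; lists are serialised to JSON strings
    flatten_dict({"post": {"id": 1, "tags": ["a", "b"]}})
    # {'post.id': 1, 'post.tags': '["a", "b"]'}

    # derive a safe filename from a URL (no staging area given, so no collision handling)
    url_to_filename("https://example.com/images/cat.jpg?size=big")
    # 'cat.jpg'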
class UserInput:
 16class UserInput:
 17    """
 18    Class for handling user input
 19
 20    It is important to sanitise user input, as carelessly entered parameters
 21    may e.g. result in requesting far more data than needed, or lead to undefined
 22    behaviour. This class offers a set of pre-defined value types that can be
 23    consistently rendered as form elements in an interface and parsed.
 24    """
 25    OPTION_TOGGLE = "toggle"  # boolean toggle (checkbox)
 26    OPTION_CHOICE = "choice"  # one choice out of a list (select)
 27    OPTION_TEXT = "string"  # simple string or integer (input text)
 28    OPTION_MULTI = "multi"  # multiple values out of a list (select multiple)
 29    OPTION_MULTI_SELECT = "multi_select"  # multiple values out of a dropdown list (select multiple)
 30    OPTION_INFO = "info"  # just a bit of text, not actual input
 31    OPTION_TEXT_LARGE = "textarea"  # longer text
 32    OPTION_TEXT_JSON = "json"  # text, but should be valid JSON
 33    OPTION_DATE = "date"  # a single date
 34    OPTION_DATERANGE = "daterange"  # a beginning and end date
 35    OPTION_DIVIDER = "divider"  # meta-option, divides related sets of options
 36    OPTION_FILE = "file"  # file upload
 37    OPTION_HUE = "hue"  # colour hue
 38    OPTION_DATASOURCES = "datasources"  # data source toggling
 39    OPTION_DATASOURCES_TABLE = "datasources_table"  # a table with settings per data source
 40    OPTION_ANNOTATION = "annotation"  # checkbox for whether to an annotation
 41    OPTION_ANNOTATIONS = "annotations"  # table for whether to write multiple annotations
 42
 43    OPTIONS_COSMETIC = (OPTION_INFO, OPTION_DIVIDER)
 44
 45    @staticmethod
 46    def parse_all(options, input, silently_correct=True):
 47        """
 48        Parse form input for the provided options
 49
 50        Ignores all input not belonging to any of the defined options: parses
 51        and sanitises the rest, and returns a dictionary with the sanitised
 52        options. If an option is *not* present in the input, the default value
 53        is used, and if that is absent, `None`.
 54
 55        In other words, this ensures a dictionary with 1) only white-listed
 56        keys, 2) a value of an expected type for each key.
 57
 58        :param dict options:  Options, as a name -> settings dictionary
 59        :param dict input:  Input, as a form field -> value dictionary
 60        :param bool silently_correct:  If true, replace invalid values with the
 61        given default value; else, raise a QueryParametersException if a value
 62        is invalid.
 63
 64        :return dict:  Sanitised form input
 65        """
 66
 67        from common.lib.helpers import convert_to_int
 68        parsed_input = {}
 69
 70        if type(input) is not dict and type(input) is not ImmutableMultiDict:
 71            raise TypeError("input must be a dictionary or ImmutableMultiDict")
 72
 73        if type(input) is ImmutableMultiDict:
 74            # we are not using to_dict, because that messes up multi-selects
 75            input = {key: input.getlist(key) for key in input}
 76            for key, value in input.items():
 77                if type(value) is list and len(value) == 1:
 78                    input[key] = value[0]
 79
 80        # all parameters are submitted as option-[parameter ID]; this is an
 81        # artifact of how the web interface works and we can simply remove the
 82        # prefix
 83        input = {re.sub(r"^option-", "", field): input[field] for field in input}
 84
 85        # re-order input so that the fields relying on the value of other
 86        # fields are parsed last
 87        options = {k: options[k] for k in sorted(options, key=lambda k: options[k].get("requires") is not None)}
 88
 89        for option, settings in options.items():
 90            if settings.get("indirect"):
 91                # these are settings that are derived from and set by other
 92                # settings
 93                continue
 94
 95            if settings.get("type") in UserInput.OPTIONS_COSMETIC:
 96                # these are structural form elements and never have a value
 97                continue
 98
 99            elif settings.get("type") == UserInput.OPTION_DATERANGE:
100                # special case, since it combines two inputs
101                option_min = option + "-min"
102                option_max = option + "-max"
103
104                # normally this is taken care of client-side, but in case this
105                # didn't work, try to salvage it server-side
106                if option_min not in input or input.get(option_min) == "-1":
107                    option_min += "_proxy"
108
109                if option_max not in input or input.get(option_max) == "-1":
110                    option_max += "_proxy"
111
112                # save as a tuple of unix timestamps (or None)
113                try:
114                    after, before = (UserInput.parse_value(settings, input.get(option_min), parsed_input, silently_correct), UserInput.parse_value(settings, input.get(option_max), parsed_input, silently_correct))
115
116                    if before and after and after > before:
117                        if not silently_correct:
118                            raise QueryParametersException("End of date range must be after beginning of date range.")
119                        else:
120                            before = after
121
122                    parsed_input[option] = (after, before)
123                except RequirementsNotMetException:
124                    pass
125
126            elif settings.get("type") in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
127                # special case too, since if a checkbox is unchecked, it simply
128                # does not show up in the input
129                try:
130                    if option in input:
131                        # Toggle needs to be parsed
132                        parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct)
133                    else:
134                        # Toggle was left blank
135                        parsed_input[option] = False
136                except RequirementsNotMetException:
137                    pass
138
139            elif settings.get("type") == UserInput.OPTION_DATASOURCES:
140                # special case, because this combines multiple inputs to
141                # configure data source availability and expiration
142                datasources = {datasource: {
143                    "enabled": f"{option}-enable-{datasource}" in input,
144                    "allow_optout": f"{option}-optout-{datasource}" in input,
145                    "timeout": convert_to_int(input[f"{option}-timeout-{datasource}"], 0)
146                } for datasource in input[option].split(",")}
147
148                parsed_input[option] = [datasource for datasource, v in datasources.items() if v["enabled"]]
149                parsed_input[option.split(".")[0] + ".expiration"] = datasources
150
151            elif settings.get("type") == UserInput.OPTION_DATASOURCES_TABLE:
152                # special case, parse table values to generate a dict
153                columns = list(settings["columns"].keys())
154                table_input = {}
155
156                for datasource in list(settings["default"].keys()):
157                    table_input[datasource] = {}
158                    for column in columns:
159
160                        choice = input.get(option + "-" + datasource + "-" + column, False)
161                        column_settings = settings["columns"][column]  # sub-settings per column
162                        table_input[datasource][column] = UserInput.parse_value(column_settings, choice, table_input, silently_correct=True)
163
164                parsed_input[option] = table_input
165
166            elif option not in input:
167                # not provided? use default
168                parsed_input[option] = settings.get("default", None)
169
170            else:
171                # normal parsing and sanitisation
172                try:
173                    parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct)
174                except RequirementsNotMetException:
175                    pass
176
177        return parsed_input
178
179    @staticmethod
180    def parse_value(settings, choice, other_input=None, silently_correct=True):
181        """
182        Filter user input
183
184        Makes sure user input for post-processors is valid and within the
185        parameters specified by the post-processor
186
187        :param obj settings:  Settings, including defaults and valid options
188        :param choice:  The chosen option, to be parsed
189        :param dict other_input:  Other input, as parsed so far
190        :param bool silently_correct:  If true, replace invalid values with the
191        given default value; else, raise a QueryParametersException if a value
192        is invalid.
193
194        :return:  Validated and parsed input
195        """
196        # short-circuit if there is a requirement for the field to be parsed
197        # and the requirement isn't met
198        if settings.get("requires"):
199            try:
200                field, operator, value = re.findall(r"([a-zA-Z0-9_-]+)([!=$~^]+)(.*)", settings.get("requires"))[0]
201            except IndexError:
202                # invalid condition, interpret as 'does the field with this name have a value'
203                field, operator, value = (choice, "!=", "")
204
205            if field not in other_input:
206                raise RequirementsNotMetException()
207
208            other_value = other_input.get(field)
209            if type(other_value) is bool:
210                # evaluates to a boolean, i.e. checkboxes etc.
211                if operator == "!=":
212                    if (other_value and value in ("", "false")) or (not other_value and value in ("true", "checked")):
213                        raise RequirementsNotMetException()
214                else:
215                    if (other_value and value not in ("true", "checked")) or (not other_value and value not in ("", "false")):
216                        raise RequirementsNotMetException()
217
218            else:
219                if type(other_value) in (tuple, list):
220                    # iterables are a bit special
221                    if len(other_value) == 1:
222                        # treat one-item lists as "normal" values
223                        other_value = other_value[0]
224                    elif operator == "~=":  # interpret as 'is in list?'
225                        if value not in other_value:
226                            raise RequirementsNotMetException()
227                    else:
228                        # condition doesn't make sense for a list, so assume it's not True
229                        raise RequirementsNotMetException()
230
231                if operator == "^=" and not str(other_value).startswith(value):
232                    raise RequirementsNotMetException()
233                elif operator == "$=" and not str(other_value).endswith(value):
234                    raise RequirementsNotMetException()
235                elif operator == "~=" and value not in str(other_value):
236                    raise RequirementsNotMetException()
237                elif operator == "!=" and value == other_value:
238                    raise RequirementsNotMetException()
239                elif operator in ("==", "=") and value != other_value:
240                    raise RequirementsNotMetException()
241
242        input_type = settings.get("type", "")
243        if input_type in UserInput.OPTIONS_COSMETIC:
244            # these are structural form elements and can never return a value
245            return None
246
247        elif input_type in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
248            # simple boolean toggle
249            if type(choice) is bool:
250                return choice
251            elif choice in ['false', 'False']:
252                # Sanitized options passed back to Flask can be converted to strings as 'false'
253                return False
254            elif choice in ['true', 'True', 'on']:
255                # Toggle will have value 'on', but may also become a string 'true'
256                return True
257            else:
258                raise QueryParametersException("Toggle invalid input")
259
260        elif input_type in (UserInput.OPTION_DATE, UserInput.OPTION_DATERANGE):
261            # parse either integers (unix timestamps) or try to guess the date
262            # format (the latter may be used for input if JavaScript is turned
263            # off in the front-end and the input comes from there)
264            value = None
265            try:
266                value = int(choice)
267            except ValueError:
268                parsed_choice = parse_datetime(choice)
269                value = int(parsed_choice.timestamp())
270            finally:
271                return value
272
273        elif input_type in (UserInput.OPTION_MULTI, UserInput.OPTION_ANNOTATIONS):
274            # any number of values out of a list of possible values
275            # comma-separated during input, returned as a list of valid options
276            if not choice:
277                return settings.get("default", [])
278
279            chosen = choice.split(",")
280            return [item for item in chosen if item in settings.get("options", [])]
281
282        elif input_type == UserInput.OPTION_MULTI_SELECT:
283            # any number of values out of a dropdown list of possible values
284            # comma-separated during input, returned as a list of valid options
285            if not choice:
286                return settings.get("default", [])
287
288            if type(choice) is str:
289                # should be a list if the form control was actually a multiselect
290                # but we have some client side UI helpers that may produce a string
291                # instead
292                choice = choice.split(",")
293
294            return [item for item in choice if item in settings.get("options", [])]
295
296        elif input_type == UserInput.OPTION_CHOICE:
297            # select box
298            # one out of multiple options
299            # return option if valid, or default
300            if choice not in settings.get("options"):
301                if not silently_correct:
302                    raise QueryParametersException(f"Invalid value selected; must be one of {', '.join(settings.get('options', {}).keys())}. {settings}")
303                else:
304                    return settings.get("default", "")
305            else:
306                return choice
307
308        elif input_type == UserInput.OPTION_TEXT_JSON:
309            # verify that this is actually json
310            try:
311                json.dumps(json.loads(choice))
312            except json.JSONDecodeError:
313                raise QueryParametersException("Invalid JSON value '%s'" % choice)
314
315            return json.loads(choice)
316
317        elif input_type in (UserInput.OPTION_TEXT, UserInput.OPTION_TEXT_LARGE, UserInput.OPTION_HUE):
318            # text string
319            # optionally clamp it as an integer; return default if not a valid
320            # integer (or float; inferred from default or made explicit via the
321            # coerce_type setting)
322            if settings.get("coerce_type"):
323                value_type = settings["coerce_type"]
324            else:
325                value_type = type(settings.get("default"))
326                if value_type not in (int, float):
327                    value_type = int
328
329            if "max" in settings:
330                try:
331                    choice = min(settings["max"], value_type(choice))
332                except (ValueError, TypeError):
333                    if not silently_correct:
334                        raise QueryParametersException("Provide a value of %s or lower." % str(settings["max"]))
335
336                    choice = settings.get("default")
337
338            if "min" in settings:
339                try:
340                    choice = max(settings["min"], value_type(choice))
341                except (ValueError, TypeError):
342                    if not silently_correct:
343                        raise QueryParametersException("Provide a value of %s or more." % str(settings["min"]))
344
345                    choice = settings.get("default")
346
347            if choice is None or choice == "":
348                choice = settings.get("default")
349
350            if choice is None:
351                choice = 0 if "min" in settings or "max" in settings else ""
352
353            if settings.get("coerce_type"):
354                try:
355                    return value_type(choice)
356                except (ValueError, TypeError):
357                    return settings.get("default")
358            else:
359                return choice
360
361        else:
362            # no filtering
363            return choice

Class for handling user input

It is important to sanitise user input, as carelessly entered parameters may e.g. result in requesting far more data than needed, or lead to undefined behaviour. This class offers a set of pre-defined value types that can be consistently rendered as form elements in an interface and parsed.

OPTION_TOGGLE = 'toggle'
OPTION_CHOICE = 'choice'
OPTION_TEXT = 'string'
OPTION_MULTI = 'multi'
OPTION_MULTI_SELECT = 'multi_select'
OPTION_INFO = 'info'
OPTION_TEXT_LARGE = 'textarea'
OPTION_TEXT_JSON = 'json'
OPTION_DATE = 'date'
OPTION_DATERANGE = 'daterange'
OPTION_DIVIDER = 'divider'
OPTION_FILE = 'file'
OPTION_HUE = 'hue'
OPTION_DATASOURCES = 'datasources'
OPTION_DATASOURCES_TABLE = 'datasources_table'
OPTION_ANNOTATION = 'annotation'
OPTION_ANNOTATIONS = 'annotations'
OPTIONS_COSMETIC = ('info', 'divider')
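
For illustration only (these option names are invented, and the "help" key is assumed here as a display label rather than something UserInput itself interprets), an options definition might combine several of these value types:

    from common.lib.user_input import UserInput

    options = {
        "intro": {"type": UserInput.OPTION_INFO, "help": "Filter settings"},
        "daterange": {"type": UserInput.OPTION_DATERANGE, "help": "Date range"},
        "columns": {"type": UserInput.OPTION_MULTI, "help": "Columns",
                    "options": {"id": "ID", "body": "Body"}, "default": ["body"]},
        "strict": {"type": UserInput.OPTION_TOGGLE, "help": "Strict matching", "default": False},
    }
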
@staticmethod
def parse_all(options, input, silently_correct=True):

Parse form input for the provided options

Ignores all input not belonging to any of the defined options: parses and sanitises the rest, and returns a dictionary with the sanitised options. If an option is not present in the input, the default value is used, and if that is absent, None.

In other words, this ensures a dictionary with 1) only white-listed keys, 2) a value of an expected type for each key.

Parameters
  • dict options: Options, as a name -> settings dictionary
  • dict input: Input, as a form field -> value dictionary
  • bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns

Sanitised form input
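
A sketch of how raw form input might be run through parse_all; the option names and values are invented for illustration:

    from common.lib.user_input import UserInput

    options = {
        "amount": {"type": UserInput.OPTION_TEXT, "default": 100, "min": 1, "max": 1000},
        "overwrite": {"type": UserInput.OPTION_TOGGLE, "default": False},
        "method": {"type": UserInput.OPTION_CHOICE, "default": "fast",
                   "options": {"fast": "Fast", "slow": "Thorough"}},
    }

    # raw form input, as it might arrive from the web interface
    form = {
        "option-amount": "250",       # converted to an integer and clamped to min/max
        "option-overwrite": "on",     # checked toggle; unchecked toggles are simply absent
        "option-method": "bogus",     # not a valid choice, silently replaced by the default
        "option-unknown": "ignored",  # not white-listed, dropped entirely
    }

    print(UserInput.parse_all(options, form))
    # {'amount': 250, 'overwrite': True, 'method': 'fast'}

With silently_correct=False, the invalid choice for "method" would instead raise a QueryParametersException.
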

@staticmethod
def parse_value(settings, choice, other_input=None, silently_correct=True):

Filter user input

Makes sure user input for post-processors is valid and within the parameters specified by the post-processor

Parameters
  • obj settings: Settings, including defaults and valid options
  • choice: The chosen option, to be parsed
  • dict other_input: Other input, as parsed so far
  • bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns

Validated and parsed input
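
A sketch of parse_value on its own, again with invented settings, showing clamping, fallback to the default, and a requires condition:

    from common.lib.user_input import UserInput

    settings = {"type": UserInput.OPTION_TEXT, "default": 10, "min": 1, "max": 50}

    print(UserInput.parse_value(settings, "30"))    # 30
    print(UserInput.parse_value(settings, "200"))   # clamped to the maximum: 50
    print(UserInput.parse_value(settings, "oops"))  # not a number, falls back to the default: 10

    # a field that only applies when another, already-parsed field is toggled on
    dependent = {"type": UserInput.OPTION_TEXT, "default": 5, "min": 1,
                 "requires": "overwrite==true"}
    print(UserInput.parse_value(dependent, "3", other_input={"overwrite": True}))  # 3
    # with other_input={"overwrite": False} this raises RequirementsNotMetException instead

With silently_correct=False, the non-numeric value above would raise a QueryParametersException rather than falling back to the default.
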