common.lib.helpers
Miscellaneous helper functions for the 4CAT backend
1""" 2Miscellaneous helper functions for the 4CAT backend 3""" 4import subprocess 5import imagehash 6import hashlib 7import requests 8import datetime 9import smtplib 10import fnmatch 11import socket 12import oslex 13import copy 14import time 15import json 16import math 17import ural 18import csv 19import ssl 20import re 21import os 22import io 23 24from pathlib import Path 25from collections.abc import MutableMapping 26from html.parser import HTMLParser 27from urllib.parse import urlparse, urlunparse 28from calendar import monthrange 29from packaging import version 30from PIL import Image 31 32from common.config_manager import CoreConfigManager 33from common.lib.user_input import UserInput 34__all__ = ("UserInput",) 35 36core_config = CoreConfigManager() 37 38def init_datasource(database, logger, queue, name, config): 39 """ 40 Initialize data source 41 42 Queues jobs to scrape the boards that were configured to be scraped in the 43 4CAT configuration file. If none were configured, nothing happens. 
44 45 :param Database database: Database connection instance 46 :param Logger logger: Log handler 47 :param JobQueue queue: Job Queue instance 48 :param string name: ID of datasource that is being initialised 49 :param config: Configuration reader 50 """ 51 pass 52 53def get_datasource_example_keys(db, modules, dataset_type): 54 """ 55 Get example keys for a datasource 56 """ 57 from common.lib.dataset import DataSet 58 example_dataset_key = db.fetchone("SELECT key from datasets WHERE type = %s and is_finished = True and num_rows > 0 ORDER BY timestamp_finished DESC LIMIT 1", (dataset_type,)) 59 if example_dataset_key: 60 example_dataset = DataSet(db=db, key=example_dataset_key["key"], modules=modules) 61 return example_dataset.get_columns() 62 return [] 63 64def strip_tags(html, convert_newlines=True): 65 """ 66 Strip HTML from a string 67 68 :param html: HTML to strip 69 :param convert_newlines: Convert <br> and </p> tags to \n before stripping 70 :return: Stripped HTML 71 """ 72 if not html: 73 return "" 74 75 deduplicate_newlines = re.compile(r"\n+") 76 77 if convert_newlines: 78 html = html.replace("<br>", "\n").replace("</p>", "</p>\n") 79 html = deduplicate_newlines.sub("\n", html) 80 81 class HTMLStripper(HTMLParser): 82 def __init__(self): 83 super().__init__() 84 self.reset() 85 self.strict = False 86 self.convert_charrefs = True 87 self.fed = [] 88 89 def handle_data(self, data): 90 self.fed.append(data) 91 92 def get_data(self): 93 return "".join(self.fed) 94 95 stripper = HTMLStripper() 96 stripper.feed(html) 97 return stripper.get_data() 98 99 100def sniff_encoding(file): 101 """ 102 Determine encoding from raw file bytes 103 104 Currently only distinguishes UTF-8 and UTF-8 with BOM 105 106 :param file: 107 :return: 108 """ 109 if type(file) is bytearray: 110 maybe_bom = file[:3] 111 elif hasattr(file, "getbuffer"): 112 buffer = file.getbuffer() 113 maybe_bom = buffer[:3].tobytes() 114 elif hasattr(file, "peek"): 115 buffer = file.peek(32) 116 
maybe_bom = buffer[:3] 117 else: 118 maybe_bom = False 119 120 return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8" 121 122def sniff_csv_dialect(csv_input): 123 """ 124 Determine CSV dialect for an input stream 125 126 :param csv_input: Input stream 127 :return tuple: Tuple: Dialect object and a boolean representing whether 128 the CSV file seems to have a header 129 """ 130 encoding = sniff_encoding(csv_input) 131 if type(csv_input) is io.TextIOWrapper: 132 wrapped_input = csv_input 133 else: 134 wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding) 135 wrapped_input.seek(0) 136 sample = wrapped_input.read(1024 * 1024) 137 wrapped_input.seek(0) 138 has_header = csv.Sniffer().has_header(sample) 139 dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t")) 140 141 return dialect, has_header 142 143 144def get_git_branch(): 145 """ 146 Get current git branch 147 148 If the 4CAT root folder is a git repository, this function will return the 149 name of the currently checked-out branch. If the folder is not a git 150 repository or git is not installed an empty string is returned. 
151 """ 152 try: 153 root_dir = str(core_config.get('PATH_ROOT').resolve()) 154 branch = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE) 155 if branch.returncode != 0: 156 raise ValueError() 157 branch_name = branch.stdout.decode("utf-8").strip() 158 if not branch_name: 159 # Check for detached HEAD state 160 # Most likely occuring because of checking out release tags (which are not branches) or commits 161 head_status = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} status"), stdout=subprocess.PIPE) 162 if head_status.returncode == 0: 163 for line in head_status.stdout.decode("utf-8").split("\n"): 164 if any([detached_message in line for detached_message in ("HEAD detached from", "HEAD detached at")]): 165 branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1] 166 return branch_name.strip() 167 except (subprocess.SubprocessError, ValueError, FileNotFoundError): 168 return "" 169 170 171def get_software_commit(worker=None): 172 """ 173 Get current 4CAT git commit hash 174 175 Use `get_software_version()` instead if you need the release version 176 number rather than the precise commit hash. 177 178 If no version file is available, run `git show` to test if there is a git 179 repository in the 4CAT root folder, and if so, what commit is currently 180 checked out in it. 181 182 For extensions, get the repository information for that extension, or if 183 the extension is not a git repository, return empty data. 184 185 :param BasicWorker processor: Worker to get commit for. If not given, get 186 version information for the main 4CAT installation. 187 188 :return tuple: 4CAT git commit hash, repository name 189 """ 190 # try git command line within the 4CAT root folder 191 # if it is a checked-out git repository, it will tell us the hash of 192 # the currently checked-out commit 193 194 # path has no Path.relative()... 
195 try: 196 # if extension, go to the extension file's path 197 # we will run git here - if it is not its own repository, we have no 198 # useful version info (since the extension is by definition not in the 199 # main 4CAT repository) and will return an empty value 200 if worker and worker.is_extension: 201 relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent 202 working_dir = str(core_config.get("PATH_ROOT").joinpath(relative_filepath).resolve()) 203 # check if we are in the extensions' own repo or 4CAT's 204 git_cmd = f"git -C {oslex.quote(working_dir)} rev-parse --show-toplevel" 205 repo_level = subprocess.run(oslex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 206 if Path(repo_level.stdout.decode("utf-8")) == core_config.get("PATH_ROOT"): 207 # not its own repository 208 return ("", "") 209 210 else: 211 working_dir = str(core_config.get("PATH_ROOT").resolve()) 212 213 show = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 214 if show.returncode != 0: 215 raise ValueError() 216 commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1] 217 218 # now get the repository the commit belongs to, if we can 219 origin = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 220 if origin.returncode != 0 or not origin.stdout: 221 raise ValueError() 222 repository = origin.stdout.decode("utf-8").strip() 223 if repository.endswith(".git"): 224 repository = repository[:-4] 225 226 except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError): 227 return ("", "") 228 229 return (commit, repository) 230 231def get_software_version(): 232 """ 233 Get current 4CAT version 234 235 This is the actual software version, i.e. not the commit hash (see 236 `get_software_hash()` for that). 
The current version is stored in a file 237 with a canonical location: if the file doesn't exist, an empty string is 238 returned. 239 240 :return str: Software version, for example `1.37`. 241 """ 242 current_version_file = core_config.get("PATH_ROOT").joinpath("config/.current-version") 243 if not current_version_file.exists(): 244 return "" 245 246 with current_version_file.open() as infile: 247 return infile.readline().strip() 248 249def get_github_version(repo_url, timeout=5): 250 """ 251 Get latest release tag version from GitHub 252 253 Will raise a ValueError if it cannot retrieve information from GitHub. 254 255 :param str repo_url: GitHub repository URL 256 :param int timeout: Timeout in seconds for HTTP request 257 258 :return tuple: Version, e.g. `1.26`, and release URL. 259 """ 260 if not repo_url.endswith("/"): 261 repo_url += "/" 262 263 repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url)) 264 265 api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id 266 response = requests.get(api_url, timeout=timeout) 267 response = response.json() 268 if response.get("message") == "Not Found": 269 raise ValueError("Invalid GitHub URL or repository name") 270 271 latest_tag = response.get("tag_name", "unknown") 272 if latest_tag.startswith("v"): 273 latest_tag = re.sub(r"^v", "", latest_tag) 274 275 return (latest_tag, response.get("html_url")) 276 277def get_ffmpeg_version(ffmpeg_path): 278 """ 279 Determine ffmpeg version 280 281 This can be necessary when using commands that change name between versions. 
282 283 :param ffmpeg_path: ffmpeg executable path 284 :return packaging.version: Comparable ersion 285 """ 286 command = [ffmpeg_path, "-version"] 287 ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, 288 stderr=subprocess.PIPE) 289 290 ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1] 291 ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0] 292 293 return version.parse(ffmpeg_version) 294 295 296def find_extensions(): 297 """ 298 Find 4CAT extensions and load their metadata 299 300 Looks for subfolders of the extension folder, and loads additional metadata 301 where available. 302 303 :return tuple: A tuple with two items; the extensions, as an ID -> metadata 304 dictionary, and a list of (str) errors encountered while loading 305 """ 306 extension_path = core_config.get("PATH_ROOT").joinpath("extensions") 307 errors = [] 308 if not extension_path.exists() or not extension_path.is_dir(): 309 return [], None 310 311 # each folder in the extensions folder is an extension 312 extensions = { 313 extension.name: { 314 "name": extension.name, 315 "version": "", 316 "url": "", 317 "git_url": "", 318 "is_git": False 319 } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir() 320 } 321 322 # collect metadata for extensions 323 allowed_metadata_keys = ("name", "version", "url") 324 for extension in extensions: 325 extension_folder = extension_path.joinpath(extension) 326 metadata_file = extension_folder.joinpath("metadata.json") 327 if metadata_file.exists(): 328 with metadata_file.open() as infile: 329 try: 330 metadata = json.load(infile) 331 extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys}) 332 except (TypeError, ValueError) as e: 333 errors.append(f"Error reading metadata file for extension '{extension}' ({e})") 334 continue 335 336 extensions[extension]["is_git"] = 
extension_folder.joinpath(".git/HEAD").exists() 337 if extensions[extension]["is_git"]: 338 # try to get remote URL 339 try: 340 extension_root = str(extension_folder.resolve()) 341 origin = subprocess.run(oslex.split(f"git -C {oslex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE, 342 stdout=subprocess.PIPE) 343 if origin.returncode != 0 or not origin.stdout: 344 raise ValueError() 345 repository = origin.stdout.decode("utf-8").strip() 346 if repository.endswith(".git") and "github.com" in repository: 347 # use repo URL 348 repository = repository[:-4] 349 extensions[extension]["git_url"] = repository 350 except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e: 351 print(e) 352 pass 353 354 return extensions, errors 355 356 357def convert_to_int(value, default=0): 358 """ 359 Convert a value to an integer, with a fallback 360 361 The fallback is used if an Error is thrown during converstion to int. 362 This is a convenience function, but beats putting try-catches everywhere 363 we're using user input as an integer. 364 365 :param value: Value to convert 366 :param int default: Default value, if conversion not possible 367 :return int: Converted value 368 """ 369 try: 370 return int(value) 371 except (ValueError, TypeError): 372 return default 373 374def convert_to_float(value, default=0, force=False) -> float: 375 """ 376 Convert a value to a floating point, with a fallback 377 378 The fallback is used if an Error is thrown during converstion to float. 379 This is a convenience function, but beats putting try-catches everywhere 380 we're using user input as a floating point number. 381 382 :param value: Value to convert 383 :param int default: Default value, if conversion not possible 384 :param force: Whether to force the value into a float if it is not empty or None. 
385 :return float: Converted value 386 """ 387 if force: 388 return float(value) if value else default 389 try: 390 return float(value) 391 except (ValueError, TypeError): 392 return default 393 394 395def timify(number, short=False): 396 """ 397 Make a number look like an indication of time 398 399 :param number: Number to convert. If the number is larger than the current 400 UNIX timestamp, decrease by that amount 401 :return str: A nice, string, for example `1 month, 3 weeks, 4 hours and 2 minutes` 402 """ 403 number = int(number) 404 405 components = [] 406 if number > time.time(): 407 number = time.time() - number 408 409 month_length = 30.42 * 86400 410 months = math.floor(number / month_length) 411 if months: 412 components.append(f"{months}{'mt' if short else ' month'}{'s' if months != 1 and not short else ''}") 413 number -= (months * month_length) 414 415 week_length = 7 * 86400 416 weeks = math.floor(number / week_length) 417 if weeks: 418 components.append(f"{weeks}{'w' if short else ' week'}{'s' if weeks != 1 and not short else ''}") 419 number -= (weeks * week_length) 420 421 day_length = 86400 422 days = math.floor(number / day_length) 423 if days: 424 components.append(f"{days}{'d' if short else ' day'}{'s' if days != 1 and not short else ''}") 425 number -= (days * day_length) 426 427 hour_length = 3600 428 hours = math.floor(number / hour_length) 429 if hours: 430 components.append(f"{hours}{'h' if short else ' hour'}{'s' if hours != 1 and not short else ''}") 431 number -= (hours * hour_length) 432 433 minute_length = 60 434 minutes = math.floor(number / minute_length) 435 if minutes: 436 components.append(f"{minutes}{'m' if short else ' minute'}{'s' if minutes != 1 and not short else ''}") 437 438 if not components: 439 components.append("less than a minute") 440 441 last_str = components.pop() 442 time_str = "" 443 if components: 444 time_str = ", ".join(components) 445 time_str += " and " 446 447 return time_str + last_str 448 449def 
andify(items): 450 """ 451 Format a list of items for use in text 452 453 Returns a comma-separated list, the last item preceded by "and" 454 455 :param items: Iterable list 456 :return str: Formatted string 457 """ 458 if len(items) == 0: 459 return "" 460 elif len(items) == 1: 461 return str(items[1]) 462 463 result = f" and {items.pop()}" 464 return ", ".join([str(item) for item in items]) + result 465 466def ellipsiate(text, length, inside=False, ellipsis_str="…"): 467 if len(text) <= length: 468 return text 469 470 elif not inside: 471 return text[:length] + ellipsis_str 472 473 else: 474 # two cases: URLs and normal text 475 # for URLs, try to only ellipsiate after the domain name 476 # this makes the URLs easier to read when shortened 477 if ural.is_url(text): 478 pre_part = "/".join(text.split("/")[:3]) 479 if len(pre_part) < length - 6: # kind of arbitrary 480 before = len(pre_part) + 1 481 else: 482 before = math.floor(length / 2) 483 else: 484 before = math.floor(length / 2) 485 486 after = len(text) - before 487 return text[:before] + ellipsis_str + text[after:] 488 489def hash_file(image_file, hash_type="file-hash"): 490 """ 491 Generate an image hash 492 493 :param Path image_file: Image file to hash 494 :param str hash_type: Hash type, one of `file-hash`, `colorhash`, 495 `phash`, `average_hash`, `dhash` 496 :return str: Hexadecimal hash value 497 """ 498 if not image_file.exists(): 499 raise FileNotFoundError() 500 501 if hash_type == "file-hash": 502 hasher = hashlib.sha1() 503 504 # Open the file in binary mode 505 with image_file.open("rb") as infile: 506 # Read and update hash in chunks to handle large files 507 while chunk := infile.read(1024): 508 hasher.update(chunk) 509 510 return hasher.hexdigest() 511 512 elif hash_type in ("colorhash", "phash", "average_hash", "dhash"): 513 image = Image.open(image_file) 514 515 return str(getattr(imagehash, hash_type)(image)) 516 517 else: 518 raise NotImplementedError(f"Unknown hash type '{hash_type}'") 
519 520def get_yt_compatible_ids(yt_ids): 521 """ 522 :param yt_ids list, a list of strings 523 :returns list, a ist of joined strings in pairs of 50 524 525 Takes a list of IDs and returns list of joined strings 526 in pairs of fifty. This should be done for the YouTube API 527 that requires a comma-separated string and can only return 528 max fifty results. 529 """ 530 531 # If there's only one item, return a single list item 532 if isinstance(yt_ids, str): 533 return [yt_ids] 534 535 ids = [] 536 last_i = 0 537 for i, yt_id in enumerate(yt_ids): 538 539 # Add a joined string per fifty videos 540 if i % 50 == 0 and i != 0: 541 ids_string = ",".join(yt_ids[last_i:i]) 542 ids.append(ids_string) 543 last_i = i 544 545 # If the end of the list is reached, add the last data 546 elif i == (len(yt_ids) - 1): 547 ids_string = ",".join(yt_ids[last_i:i]) 548 ids.append(ids_string) 549 550 return ids 551 552 553def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None, 554 fontsize_small=None, fontsize_large=None): 555 """ 556 Get a standard SVG canvas to draw 4CAT graphs to 557 558 Adds a border, footer, header, and some basic text styling 559 560 :param path: The path where the SVG graph will be saved 561 :param width: Width of the canvas 562 :param height: Height of the canvas 563 :param header: Header, if necessary to draw 564 :param footer: Footer text, if necessary to draw. Defaults to shameless 565 4CAT advertisement. 566 :param fontsize_normal: Font size of normal text 567 :param fontsize_small: Font size of small text (e.g. footer) 568 :param fontsize_large: Font size of large text (e.g. 
header) 569 :return SVG: SVG canvas (via svgwrite) that can be drawn to 570 """ 571 from svgwrite.container import SVG, Hyperlink 572 from svgwrite.drawing import Drawing 573 from svgwrite.shapes import Rect 574 from svgwrite.text import Text 575 576 if fontsize_normal is None: 577 fontsize_normal = width / 75 578 579 if fontsize_small is None: 580 fontsize_small = width / 100 581 582 if fontsize_large is None: 583 fontsize_large = width / 50 584 585 # instantiate with border and white background 586 canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal) 587 canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF")) 588 589 # header 590 if header: 591 header_shape = SVG(insert=(0, 0), size=("100%", fontsize_large * 2)) 592 header_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) 593 header_shape.add( 594 Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle", fill="#FFF", 595 style="font-size:%ipx" % fontsize_large)) 596 canvas.add(header_shape) 597 598 # footer (i.e. 4cat banner) 599 if footer: 600 footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2) 601 footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize) 602 footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) 603 link = Hyperlink(href="https://4cat.nl") 604 link.add( 605 Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle", fill="#FFF", 606 style="font-size:%ipx" % fontsize_small)) 607 footer_shape.add(link) 608 canvas.add(footer_shape) 609 610 return canvas 611 612 613def call_api(action, payload=None, wait_for_response=True): 614 """ 615 Send message to server 616 617 Calls the internal API and returns interpreted response. "status" is always 618 None if wait_for_response is False. 
619 620 :param str action: API action 621 :param payload: API payload 622 :param bool wait_for_response: Wait for response? If not close connection 623 immediately after sending data. 624 625 :return: API response {"status": "success"|"error", "response": response, "error": error} 626 """ 627 connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 628 connection.settimeout(15) 629 config = CoreConfigManager() 630 try: 631 connection.connect((config.get('API_HOST'), config.get('API_PORT'))) 632 except ConnectionRefusedError: 633 return {"status": "error", "error": "Connection refused"} 634 635 msg = json.dumps({"request": action, "payload": payload}) 636 connection.sendall(msg.encode("ascii", "ignore")) 637 638 response_data = { 639 "status": None, 640 "response": None, 641 "error": None 642 } 643 644 if wait_for_response: 645 try: 646 response = "" 647 while True: 648 bytes = connection.recv(2048) 649 if not bytes: 650 break 651 652 response += bytes.decode("ascii", "ignore") 653 except (socket.timeout, TimeoutError): 654 response_data["status"] = "error" 655 response_data["error"] = "Connection timed out" 656 657 try: 658 connection.shutdown(socket.SHUT_RDWR) 659 except OSError: 660 # already shut down automatically 661 pass 662 connection.close() 663 664 if wait_for_response: 665 try: 666 json_response = json.loads(response) 667 response_data["response"] = json_response["response"] 668 response_data["error"] = json_response.get("error", None) 669 response_data["status"] = "error" if json_response.get("error") else "success" 670 except json.JSONDecodeError: 671 response_data["status"] = "error" 672 response_data["error"] = "Invalid JSON response" 673 response_data["response"] = response 674 675 return response_data 676 677def get_interval_descriptor(item, interval, item_column="timestamp"): 678 """ 679 Get interval descriptor based on timestamp 680 681 :param dict item: Item to generate descriptor for, should have a 682 "timestamp" key 683 :param str 
interval: Interval, one of "all", "overall", "year", 684 "month", "week", "day" 685 :param str item_column: Column name in the item dictionary that contains 686 the timestamp. Defaults to "timestamp". 687 :return str: Interval descriptor, e.g. "overall", "unknown_date", "2020", "2020-08", 688 "2020-43", "2020-08-01" 689 """ 690 if interval in ("all", "overall"): 691 return interval 692 693 if not item.get(item_column, None): 694 return "unknown_date" 695 696 # Catch cases where a custom timestamp has an epoch integer as value. 697 try: 698 timestamp = int(item[item_column]) 699 try: 700 timestamp = datetime.datetime.fromtimestamp(timestamp) 701 except (ValueError, TypeError): 702 raise ValueError("Invalid timestamp '%s'" % str(item["timestamp"])) 703 except (TypeError, ValueError): 704 try: 705 timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S") 706 except (ValueError, TypeError): 707 raise ValueError("Invalid date '%s'" % str(item["timestamp"])) 708 709 if interval == "year": 710 return str(timestamp.year) 711 elif interval == "month": 712 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) 713 elif interval == "week": 714 return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2) 715 elif interval == "hour": 716 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill( 717 2) + " " + str(timestamp.hour).zfill(2) 718 elif interval == "minute": 719 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill( 720 2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2) 721 else: 722 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2) 723 724 725def pad_interval(intervals, first_interval=None, last_interval=None): 726 """ 727 Pad an interval so all intermediate intervals are filled 728 729 Warning, ugly code (PRs very welcome) 730 731 :param dict 
intervals: A dictionary, with dates (YYYY{-MM}{-DD}) as keys 732 and a numerical value. 733 :param first_interval: 734 :param last_interval: 735 :return: 736 """ 737 missing = 0 738 try: 739 test_key = list(intervals.keys())[0] 740 except IndexError: 741 return 0, {} 742 743 # first determine the boundaries of the interval 744 # these may be passed as parameters, or they can be inferred from the 745 # interval given 746 if first_interval: 747 first_interval = str(first_interval) 748 first_year = int(first_interval[0:4]) 749 if len(first_interval) > 4: 750 first_month = int(first_interval[5:7]) 751 if len(first_interval) > 7: 752 first_day = int(first_interval[8:10]) 753 if len(first_interval) > 10: 754 first_hour = int(first_interval[11:13]) 755 if len(first_interval) > 13: 756 first_minute = int(first_interval[14:16]) 757 758 else: 759 first_year = min([int(i[0:4]) for i in intervals]) 760 if len(test_key) > 4: 761 first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year]) 762 if len(test_key) > 7: 763 first_day = min( 764 [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month]) 765 if len(test_key) > 10: 766 first_hour = min( 767 [int(i[11:13]) for i in intervals if 768 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day]) 769 if len(test_key) > 13: 770 first_minute = min( 771 [int(i[14:16]) for i in intervals if 772 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int( 773 i[11:13]) == first_hour]) 774 775 if last_interval: 776 last_interval = str(last_interval) 777 last_year = int(last_interval[0:4]) 778 if len(last_interval) > 4: 779 last_month = int(last_interval[5:7]) 780 if len(last_interval) > 7: 781 last_day = int(last_interval[8:10]) 782 if len(last_interval) > 10: 783 last_hour = int(last_interval[11:13]) 784 if len(last_interval) > 13: 785 last_minute = int(last_interval[14:16]) 786 else: 787 last_year = 
max([int(i[0:4]) for i in intervals]) 788 if len(test_key) > 4: 789 last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year]) 790 if len(test_key) > 7: 791 last_day = max( 792 [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month]) 793 if len(test_key) > 10: 794 last_hour = max( 795 [int(i[11:13]) for i in intervals if 796 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day]) 797 if len(test_key) > 13: 798 last_minute = max( 799 [int(i[14:16]) for i in intervals if 800 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int( 801 i[11:13]) == last_hour]) 802 803 has_month = re.match(r"^[0-9]{4}-[0-9]", test_key) 804 has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key) 805 has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key) 806 has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key) 807 808 all_intervals = [] 809 for year in range(first_year, last_year + 1): 810 year_interval = str(year) 811 812 if not has_month: 813 all_intervals.append(year_interval) 814 continue 815 816 start_month = first_month if year == first_year else 1 817 end_month = last_month if year == last_year else 12 818 for month in range(start_month, end_month + 1): 819 month_interval = year_interval + "-" + str(month).zfill(2) 820 821 if not has_day: 822 all_intervals.append(month_interval) 823 continue 824 825 start_day = first_day if all((year == first_year, month == first_month)) else 1 826 end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1] 827 for day in range(start_day, end_day + 1): 828 day_interval = month_interval + "-" + str(day).zfill(2) 829 830 if not has_hour: 831 all_intervals.append(day_interval) 832 continue 833 834 start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0 835 end_hour = last_hour if all((year == 
def remove_nuls(value):
    """
    Remove \0 from a value

    The CSV library cannot handle null bytes, and they should not occur in
    utf-8 data anyway, so remove them from the data.

    :param value: Value to remove nulls from. For dictionaries, sets, tuples
    and lists all items are parsed recursively.
    :return value: Cleaned value
    """
    if type(value) is dict:
        # cleaned in place; keys are left untouched
        for field in value:
            value[field] = remove_nuls(value[field])
    elif type(value) is list:
        value = [remove_nuls(item) for item in value]
    elif type(value) is tuple:
        value = tuple(remove_nuls(item) for item in value)
    elif type(value) is set:
        value = {remove_nuls(item) for item in value}
    elif type(value) is str:
        value = value.replace("\0", "")

    return value


class NullAwareTextIOWrapper(io.TextIOWrapper):
    """
    TextIOWrapper that skips null bytes

    This can be used as a file reader that silently discards any null bytes it
    encounters while iterating line by line.
    """

    def __next__(self):
        value = super().__next__()
        return remove_nuls(value)


class HashCache:
    """
    Simple cache handler to cache hashed values

    Avoids having to calculate a hash for values that have been hashed before.
    """

    def __init__(self, hasher):
        # hasher: a hashlib-style hash object; it is copied per value so the
        # original object's internal state is never advanced
        self.hash_cache = {}
        self.hasher = hasher

    def update_cache(self, value):
        """
        Return the hash for a value, hashing it first if needed

        Checks the hash_cache to see if the value has been cached previously,
        updates the hash_cache if needed, and returns the hashed value.

        :param value: Value to hash; cast to `str` before hashing
        :return str: Hexadecimal digest for the value
        """
        if value not in self.hash_cache:
            value_hasher = self.hasher.copy()
            value_hasher.update(str(value).encode("utf-8"))
            self.hash_cache[value] = value_hasher.hexdigest()
        return self.hash_cache[value]


def dict_search_and_update(item, keyword_matches, function):
    """
    Filter fields in an object recursively

    Apply a function to every item and sub item of a dictionary if the key
    contains one of the provided match terms.

    Function loops through a dictionary or list and compares dictionary keys to
    the strings defined by keyword_matches. It then applies the function to
    corresponding values.

    Note: if a matching term is found, all nested values will have the function
    applied to them. e.g., all these values would be changed even those with
    not_key_match:

    {'key_match' : 'changed',
    'also_key_match' : {'not_key_match' : 'but_value_still_changed'},
    'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}

    This is a comprehensive (and expensive) approach to updating a dictionary.
    IF a dictionary structure is known, a better solution would be to update
    using specific keys.

    :param Dict/List item: dictionary/list/json to loop through
    :param String keyword_matches: list of strings that will be matched to
    dictionary keys. Can contain wildcards which are matched using fnmatch.
    Matching is case-insensitive: keys are lowercased before comparison.
    :param Function function: function applied to all values of any items
    nested under a matching key

    :return Dict/List: Copy of original item, but filtered
    """

    def loop_helper_function(d_or_l, match_terms, change_function):
        """
        Recursive helper function that updates item in place
        """
        if isinstance(d_or_l, dict):
            # Iterate through dictionary
            for key, value in iter(d_or_l.items()):
                # keys are lowercased before matching since the match terms
                # are lowercased too; otherwise e.g. "Author" would never
                # match "author*"
                if match_terms == 'True' or any(fnmatch.fnmatch(key.lower(), match_term) for match_term in match_terms):
                    # Match found; apply function to all items and sub-items
                    if isinstance(value, (list, dict)):
                        # Pass item through again with match_terms = True
                        loop_helper_function(value, 'True', change_function)
                    elif value is None:
                        pass
                    else:
                        # Update the value
                        d_or_l[key] = change_function(value)
                elif isinstance(value, (list, dict)):
                    # Continue search
                    loop_helper_function(value, match_terms, change_function)
        elif isinstance(d_or_l, list):
            # Iterate through list
            for n, value in enumerate(d_or_l):
                if isinstance(value, (list, dict)):
                    # Continue search
                    loop_helper_function(value, match_terms, change_function)
                elif match_terms == 'True':
                    # List item nested in matching key
                    d_or_l[n] = change_function(value)
        else:
            raise Exception('Must pass list or dictionary')

    # Lowercase keyword_matches
    keyword_matches = [keyword.lower() for keyword in keyword_matches]

    # Create deepcopy and return new item
    temp_item = copy.deepcopy(item)
    loop_helper_function(temp_item, keyword_matches, function)
    return temp_item


def get_last_line(filepath):
    """
    Seeks from end of file for '\n' and returns that line

    :param str filepath: path to file
    :return str: last line of file
    """
    with open(filepath, "rb") as file:
        try:
            # start at the end of file
            file.seek(-2, os.SEEK_END)
            # check if NOT endline i.e. '\n'
            while file.read(1) != b'\n':
                # if not '\n', back up two characters and check again
                file.seek(-2, os.SEEK_CUR)
        except OSError:
            # file too short to seek back; read from the start instead
            file.seek(0)
        last_line = file.readline().decode()
    return last_line


def add_notification(db, user, notification, expires=None, allow_dismiss=True):
    """
    Add a notification for a user

    :param Database db: Database connection instance
    :param str user: Username the notification is for
    :param str notification: Notification text
    :param expires: Expiration timestamp, or None for no expiration
    :param bool allow_dismiss: Whether the user may dismiss the notification
    """
    db.insert("users_notifications", {
        "username": user,
        "notification": notification,
        "timestamp_expires": expires,
        "allow_dismiss": allow_dismiss
    }, safe=True)


def send_email(recipient, message, mail_config):
    """
    Send an e-mail using the configured SMTP settings

    Just a thin wrapper around smtplib, so we don't have to repeat ourselves.
    Exceptions are to be handled outside the function.

    :param list recipient: Recipient e-mail addresses
    :param MIMEMultipart message: Message to send; may also be a plain string
    :param mail_config: Configuration reader
    """
    # Create a secure SSL context
    context = ssl.create_default_context()

    # Decide which connection type
    if mail_config.get('mail.ssl') == 'ssl':
        server = smtplib.SMTP_SSL(mail_config.get('mail.server'), port=mail_config.get('mail.port', 0),
                                  context=context)
    else:
        server = smtplib.SMTP(mail_config.get('mail.server'), port=mail_config.get('mail.port', 0))

    with server:
        if mail_config.get('mail.ssl') == 'tls':
            # smtplib.SMTP adds TLS context here
            server.starttls(context=context)

        # Log in
        if mail_config.get('mail.username') and mail_config.get('mail.password'):
            server.ehlo()
            server.login(mail_config.get('mail.username'), mail_config.get('mail.password'))

        # Send message
        if type(message) is str:
            server.sendmail(mail_config.get('mail.noreply'), recipient, message)
        else:
            server.sendmail(mail_config.get('mail.noreply'), recipient, message.as_string())


def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'):
    """
    Return a flattened dictionary where nested dictionary objects are given new
    keys using the parent key combined using the separator with the child key.

    Lists will be converted to json strings via json.dumps()

    :param MutableMapping d: Dictionary like object
    :param str parent_key: The original parent key prepending future nested keys
    :param str sep: A separator string used to combine parent and child keys
    :return dict: A new dictionary with no nested values
    """

    def _flatten_dict_gen(d, parent_key, sep):
        for k, v in d.items():
            new_key = parent_key + sep + k if parent_key else k
            if isinstance(v, MutableMapping):
                yield from flatten_dict(v, new_key, sep=sep).items()
            elif isinstance(v, (list, set)):
                yield new_key, json.dumps(
                    [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v])
            else:
                yield new_key, v

    return dict(_flatten_dict_gen(d, parent_key, sep))


def sets_to_lists(d: MutableMapping):
    """
    Return a dictionary where all nested sets have been converted to lists.

    :param MutableMapping d: Dictionary like object
    :return dict: A new dictionary with no nested sets
    """

    def _check_list(lst):
        return [sets_to_lists(item) if isinstance(item, MutableMapping) else _check_list(item) if isinstance(item, (
            set, list)) else item for item in lst]

    def _sets_to_lists_gen(d):
        for k, v in d.items():
            if isinstance(v, MutableMapping):
                yield k, sets_to_lists(v)
            elif isinstance(v, (list, set)):
                yield k, _check_list(v)
            else:
                yield k, v

    return dict(_sets_to_lists_gen(d))


def url_to_hash(url, remove_scheme=True, remove_www=True):
    """
    Convert a URL to a short hash

    Some URLs are too long to be used as filenames; this normalises the URL
    (optionally dropping scheme and leading "www.") and hashes it.

    :param str url: URL to hash
    :param bool remove_scheme: Strip the scheme (http://, etc.) before hashing
    :param bool remove_www: Strip a leading "www." before hashing
    :return str: blake2b hex digest (48 characters)
    """
    parsed_url = urlparse(url.lower())
    if parsed_url:
        if remove_scheme:
            parsed_url = parsed_url._replace(scheme="")
        if remove_www:
            netloc = re.sub(r"^www\.", "", parsed_url.netloc)
            parsed_url = parsed_url._replace(netloc=netloc)

        url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/"))
    else:
        # Unable to parse URL; use regex
        if remove_scheme:
            url = re.sub(r"^https?://", "", url)
        if remove_www:
            if not remove_scheme:
                # keep the scheme (if any) while stripping "www."; the match
                # may be None when the URL has no http(s) scheme at all
                scheme_match = re.match(r"^https?://", url)
                scheme = scheme_match.group() if scheme_match else ""
                temp_url = re.sub(r"^https?://", "", url)
                url = scheme + re.sub(r"^www\.", "", temp_url)
            else:
                url = re.sub(r"^www\.", "", url)

        url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/"))

    return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest()


def url_to_filename(url, staging_area=None, default_name="file", default_ext=".png", max_bytes=255, existing_filenames=None):
    """
    Determine filenames for saved files

    Prefer the original filename (extracted from the URL), but this may not
    always be possible or be an actual filename. Also, avoid using the same
    filename multiple times. Ensures filenames don't exceed max_bytes.

    :param str url: URL to determine a filename for
    :param Path staging_area: Path to the staging area where files are saved
    (to avoid collisions); if None, no collision avoidance is done.
    :param str default_name: Default name to use if no filename can be
    extracted from the URL
    :param str default_ext: Default extension to use if no filename can be
    extracted from the URL
    :param int max_bytes: Maximum number of bytes for the filename
    :param existing_filenames: Filenames to additionally avoid colliding with
    :return str: Suitable file name
    """
    clean_filename = url.split("/")[-1].split("?")[0].split("#")[0]
    if re.match(r"[^.]+\.[a-zA-Z0-9]{1,10}", clean_filename):
        base_filename = clean_filename
    else:
        base_filename = default_name + default_ext

    if not existing_filenames:
        existing_filenames = []

    # Split base filename into name and extension
    if '.' in base_filename:
        name_part, ext_part = base_filename.rsplit('.', 1)
        ext_part = '.' + ext_part
    else:
        name_part = base_filename
        ext_part = ''

    # Truncate base filename if it exceeds max_bytes
    if len(base_filename.encode('utf-8')) > max_bytes:
        # Reserve space for extension
        available_bytes = max_bytes - len(ext_part.encode('utf-8'))
        if available_bytes <= 0:
            # If extension is too long, use minimal name
            name_part = default_name
            ext_part = default_ext
            available_bytes = max_bytes - len(ext_part.encode('utf-8'))

        # Truncate name part to fit; character by character so the result
        # stays valid UTF-8 (no multi-byte sequence is cut in half)
        name_bytes = name_part.encode('utf-8')
        while len(name_bytes) > available_bytes:
            name_part = name_part[:-1]
            name_bytes = name_part.encode('utf-8')

        base_filename = name_part + ext_part

    filename = base_filename

    if staging_area:
        # Ensure the filename is unique in the staging area
        file_path = staging_area.joinpath(filename)
        file_index = 1

        while file_path.exists() or filename in existing_filenames:
            # Calculate space needed for index suffix
            index_suffix = f"-{file_index}"

            # Check if filename with index would exceed max_bytes
            test_filename = name_part + index_suffix + ext_part
            if len(test_filename.encode('utf-8')) > max_bytes:
                # Need to truncate name_part to make room for index
                available_bytes = max_bytes - len((index_suffix + ext_part).encode('utf-8'))
                if available_bytes <= 0:
                    # Extreme case - use minimal name
                    truncated_name = "f"
                else:
                    # Truncate name_part to fit
                    truncated_name = name_part
                    name_bytes = truncated_name.encode('utf-8')
                    while len(name_bytes) > available_bytes:
                        truncated_name = truncated_name[:-1]
                        name_bytes = truncated_name.encode('utf-8')

                filename = truncated_name + index_suffix + ext_part
            else:
                filename = test_filename

            file_index += 1
            file_path = staging_area.joinpath(filename)

    return filename


def split_urls(url_string, allowed_schemes=None):
    """
    Split URL text by \n and commas.

    4CAT allows users to input lists by either separating items with a newline
    or a comma. This function will split URLs and also check for commas within
    URLs using schemes.

    Note: some urls may contain an inner scheme (e.g.,
    https://web.archive.org/web/20250000000000*/http://economist.com);
    this function will work so long as the inner scheme does not follow a
    comma (e.g., "http://,https://" would fail).

    :param str url_string: Raw user input, URLs separated by newlines/commas
    :param allowed_schemes: Tuple of scheme prefixes that mark a URL start
    :return list: List of URL strings
    """
    if allowed_schemes is None:
        allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://')
    potential_urls = []
    # Split the text by \n
    for line in url_string.split('\n'):
        # Handle commas that may exist within URLs
        parts = line.split(',')
        recombined_url = ""
        for part in parts:
            if part.startswith(allowed_schemes):  # Other schemes exist
                # New URL start detected
                if recombined_url:
                    # Already have a URL, add to list
                    potential_urls.append(recombined_url)
                # Start new URL
                recombined_url = part
            elif part:
                if recombined_url:
                    # Add to existing URL
                    recombined_url += "," + part
                else:
                    # No existing URL, start new
                    recombined_url = part
            else:
                # Ignore empty strings
                pass
        if recombined_url:
            # Add any remaining URL
            potential_urls.append(recombined_url)
    return potential_urls


def folder_size(path='.'):
    """
    Get the size of a folder using os.scandir for efficiency

    :param str path: Folder to compute the total size of, recursively
    :return int: Total size in bytes
    """
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += folder_size(entry.path)
    return total


def hash_to_md5(string: str) -> str:
    """
    Hash a string with an md5 hash.

    :param str string: String to hash
    :return str: md5 hex digest
    """
    return hashlib.md5(string.encode("utf-8")).hexdigest()
class UserInput:
    """
    Class for handling user input

    It is important to sanitise user input, as carelessly entered parameters
    may result in e.g. requesting far more data than needed, or lead to
    undefined behaviour. This class offers a set of pre-defined value types
    that can be consistently rendered as form elements in an interface and
    parsed.
    """
    OPTION_TOGGLE = "toggle"  # boolean toggle (checkbox)
    OPTION_CHOICE = "choice"  # one choice out of a list (select)
    OPTION_TEXT = "string"  # simple string or integer (input text)
    OPTION_MULTI = "multi"  # multiple values out of a list (select multiple)
    OPTION_MULTI_SELECT = "multi_select"  # multiple values out of a dropdown list (select multiple)
    OPTION_INFO = "info"  # just a bit of text, not actual input
    OPTION_TEXT_LARGE = "textarea"  # longer text
    OPTION_TEXT_JSON = "json"  # text, but should be valid JSON
    OPTION_DATE = "date"  # a single date
    OPTION_DATERANGE = "daterange"  # a beginning and end date
    OPTION_DIVIDER = "divider"  # meta-option, divides related sets of options
    OPTION_FILE = "file"  # file upload
    OPTION_HUE = "hue"  # colour hue
    OPTION_DATASOURCES = "datasources"  # data source toggling
    OPTION_DATASOURCES_TABLE = "datasources_table"  # a table with settings per data source
    OPTION_ANNOTATION = "annotation"  # checkbox for whether to write an annotation
    OPTION_ANNOTATIONS = "annotations"  # table for whether to write multiple annotations

    # options that are rendered in forms but never carry a value
    OPTIONS_COSMETIC = (OPTION_INFO, OPTION_DIVIDER)

    @staticmethod
    def parse_all(options, input, silently_correct=True):
        """
        Parse form input for the provided options

        Ignores all input not belonging to any of the defined options: parses
        and sanitises the rest, and returns a dictionary with the sanitised
        options. If an option is *not* present in the input, the default value
        is used, and if that is absent, `None`.

        In other words, this ensures a dictionary with 1) only white-listed
        keys, 2) a value of an expected type for each key.

        :param dict options: Options, as a name -> settings dictionary
        :param dict input: Input, as a form field -> value dictionary
        :param bool silently_correct: If true, replace invalid values with the
        given default value; else, raise a QueryParametersException if a value
        is invalid.

        :return dict: Sanitised form input
        """

        from common.lib.helpers import convert_to_int
        parsed_input = {}

        if type(input) is not dict and type(input) is not ImmutableMultiDict:
            raise TypeError("input must be a dictionary or ImmutableMultiDict")

        if type(input) is ImmutableMultiDict:
            # we are not using to_dict, because that messes up multi-selects
            input = {key: input.getlist(key) for key in input}
            for key, value in input.items():
                if type(value) is list and len(value) == 1:
                    input[key] = value[0]

        # all parameters are submitted as option-[parameter ID], this is an
        # artifact of how the web interface works and we can simply remove the
        # prefix
        input = {re.sub(r"^option-", "", field): input[field] for field in input}

        # re-order input so that the fields relying on the value of other
        # fields are parsed last
        options = {k: options[k] for k in sorted(options, key=lambda k: options[k].get("requires") is not None)}

        for option, settings in options.items():
            if settings.get("indirect"):
                # these are settings that are derived from and set by other
                # settings
                continue

            if settings.get("type") in UserInput.OPTIONS_COSMETIC:
                # these are structural form elements and never have a value
                continue

            elif settings.get("type") == UserInput.OPTION_DATERANGE:
                # special case, since it combines two inputs
                option_min = option + "-min"
                option_max = option + "-max"

                # normally this is taken care of client-side, but in case this
                # didn't work, try to salvage it server-side
                if option_min not in input or input.get(option_min) == "-1":
                    option_min += "_proxy"

                if option_max not in input or input.get(option_max) == "-1":
                    option_max += "_proxy"

                # save as a tuple of unix timestamps (or None)
                try:
                    after, before = (UserInput.parse_value(settings, input.get(option_min), parsed_input, silently_correct),
                                     UserInput.parse_value(settings, input.get(option_max), parsed_input, silently_correct))

                    if before and after and after > before:
                        if not silently_correct:
                            raise QueryParametersException("End of date range must be after beginning of date range.")
                        else:
                            before = after

                    parsed_input[option] = (after, before)
                except RequirementsNotMetException:
                    pass

            elif settings.get("type") in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
                # special case too, since if a checkbox is unchecked, it simply
                # does not show up in the input
                try:
                    if option in input:
                        # Toggle needs to be parsed
                        parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct)
                    else:
                        # Toggle was left blank
                        parsed_input[option] = False
                except RequirementsNotMetException:
                    pass

            elif settings.get("type") == UserInput.OPTION_DATASOURCES:
                # special case, because this combines multiple inputs to
                # configure data source availability and expiration
                datasources = {datasource: {
                    "enabled": f"{option}-enable-{datasource}" in input,
                    "allow_optout": f"{option}-optout-{datasource}" in input,
                    "timeout": convert_to_int(input[f"{option}-timeout-{datasource}"], 0)
                } for datasource in input[option].split(",")}

                parsed_input[option] = [datasource for datasource, v in datasources.items() if v["enabled"]]
                parsed_input[option.split(".")[0] + ".expiration"] = datasources

            elif settings.get("type") == UserInput.OPTION_DATASOURCES_TABLE:
                # special case, parse table values to generate a dict
                columns = list(settings["columns"].keys())
                table_input = {}

                for datasource in list(settings["default"].keys()):
                    table_input[datasource] = {}
                    for column in columns:
                        choice = input.get(option + "-" + datasource + "-" + column, False)
                        column_settings = settings["columns"][column]  # sub-settings per column
                        table_input[datasource][column] = UserInput.parse_value(column_settings, choice, table_input, silently_correct=True)

                parsed_input[option] = table_input

            elif option not in input:
                # not provided? use default
                parsed_input[option] = settings.get("default", None)

            else:
                # normal parsing and sanitisation
                try:
                    parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct)
                except RequirementsNotMetException:
                    pass

        return parsed_input

    @staticmethod
    def parse_value(settings, choice, other_input=None, silently_correct=True):
        """
        Filter user input

        Makes sure user input for post-processors is valid and within the
        parameters specified by the post-processor

        :param obj settings: Settings, including defaults and valid options
        :param choice: The chosen option, to be parsed
        :param dict other_input: Other input, as parsed so far
        :param bool silently_correct: If true, replace invalid values with the
        given default value; else, raise a QueryParametersException if a value
        is invalid.

        :return: Validated and parsed input
        """
        # short-circuit if there is a requirement for the field to be parsed
        # and the requirement isn't met
        if settings.get("requires"):
            try:
                field, operator, value = re.findall(r"([a-zA-Z0-9_-]+)([!=$~^]+)(.*)", settings.get("requires"))[0]
            except IndexError:
                # invalid condition, interpret as 'does the field with this name have a value'
                field, operator, value = (choice, "!=", "")

            if field not in other_input:
                raise RequirementsNotMetException()

            other_value = other_input.get(field)
            if type(other_value) is bool:
                # evaluates to a boolean, i.e. checkboxes etc
                if operator == "!=":
                    if (other_value and value in ("", "false")) or (not other_value and value in ("true", "checked")):
                        raise RequirementsNotMetException()
                else:
                    if (other_value and value not in ("true", "checked")) or (not other_value and value not in ("", "false")):
                        raise RequirementsNotMetException()

            else:
                if type(other_value) in (tuple, list):
                    # iterables are a bit special
                    if len(other_value) == 1:
                        # treat one-item lists as "normal" values
                        other_value = other_value[0]
                    elif operator == "~=":  # interpret as 'is in list?'
                        if value not in other_value:
                            raise RequirementsNotMetException()
                    else:
                        # condition doesn't make sense for a list, so assume it's not True
                        raise RequirementsNotMetException()

                if operator == "^=" and not str(other_value).startswith(value):
                    raise RequirementsNotMetException()
                elif operator == "$=" and not str(other_value).endswith(value):
                    raise RequirementsNotMetException()
                elif operator == "~=" and value not in str(other_value):
                    raise RequirementsNotMetException()
                elif operator == "!=" and value == other_value:
                    raise RequirementsNotMetException()
                elif operator in ("==", "=") and value != other_value:
                    raise RequirementsNotMetException()

        input_type = settings.get("type", "")
        if input_type in UserInput.OPTIONS_COSMETIC:
            # these are structural form elements and can never return a value
            return None

        elif input_type in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
            # simple boolean toggle
            if type(choice) is bool:
                return choice
            elif choice in ['false', 'False']:
                # Sanitized options passed back to Flask can be converted to strings as 'false'
                return False
            elif choice in ['true', 'True', 'on']:
                # Toggle will have value 'on', but may also becomes a string 'true'
                return True
            else:
                raise QueryParametersException("Toggle invalid input")

        elif input_type in (UserInput.OPTION_DATE, UserInput.OPTION_DATERANGE):
            # parse either integers (unix timestamps) or try to guess the date
            # format (the latter may be used for input if JavaScript is turned
            # off in the front-end and the input comes from there)
            #
            # previously this used `return` inside a `finally` block, which
            # silently swallowed *every* exception raised while parsing;
            # handle the failure modes explicitly instead, preserving the
            # "unparseable input yields None" behaviour
            try:
                return int(choice)
            except (ValueError, TypeError):
                try:
                    return int(parse_datetime(choice).timestamp())
                except Exception:
                    # unparseable or missing date
                    return None

        elif input_type in (UserInput.OPTION_MULTI, UserInput.OPTION_ANNOTATIONS):
            # any number of values out of a list of possible values
            # comma-separated during input, returned as a list of valid options
            if not choice:
                return settings.get("default", [])

            chosen = choice.split(",")
            return [item for item in chosen if item in settings.get("options", [])]

        elif input_type == UserInput.OPTION_MULTI_SELECT:
            # multiple number of values out of a dropdown list of possible values
            # comma-separated during input, returned as a list of valid options
            if not choice:
                return settings.get("default", [])

            if type(choice) is str:
                # should be a list if the form control was actually a multiselect
                # but we have some client side UI helpers that may produce a string
                # instead
                choice = choice.split(",")

            return [item for item in choice if item in settings.get("options", [])]

        elif input_type == UserInput.OPTION_CHOICE:
            # select box
            # one out of multiple options
            # return option if valid, or default
            if choice not in settings.get("options"):
                if not silently_correct:
                    raise QueryParametersException(f"Invalid value selected; must be one of {', '.join(settings.get('options', {}).keys())}. {settings}")
                else:
                    return settings.get("default", "")
            else:
                return choice

        elif input_type == UserInput.OPTION_TEXT_JSON:
            # verify that this is actually json
            try:
                json.dumps(json.loads(choice))
            except json.JSONDecodeError:
                raise QueryParametersException("Invalid JSON value '%s'" % choice)

            return json.loads(choice)

        elif input_type in (UserInput.OPTION_TEXT, UserInput.OPTION_TEXT_LARGE, UserInput.OPTION_HUE):
            # text string
            # optionally clamp it as an integer; return default if not a valid
            # integer (or float; inferred from default or made explicit via the
            # coerce_type setting)
            if settings.get("coerce_type"):
                value_type = settings["coerce_type"]
            else:
                value_type = type(settings.get("default"))
                if value_type not in (int, float):
                    value_type = int

            if "max" in settings:
                try:
                    choice = min(settings["max"], value_type(choice))
                except (ValueError, TypeError):
                    if not silently_correct:
                        raise QueryParametersException("Provide a value of %s or lower." % str(settings["max"]))

                    choice = settings.get("default")

            if "min" in settings:
                try:
                    choice = max(settings["min"], value_type(choice))
                except (ValueError, TypeError):
                    if not silently_correct:
                        raise QueryParametersException("Provide a value of %s or more." % str(settings["min"]))

                    choice = settings.get("default")

            if choice is None or choice == "":
                choice = settings.get("default")

            if choice is None:
                choice = 0 if "min" in settings or "max" in settings else ""

            if settings.get("coerce_type"):
                try:
                    return value_type(choice)
                except (ValueError, TypeError):
                    return settings.get("default")
            else:
                return choice

        else:
            # no filtering
            return choice
Class for handling user input
It is important to sanitise user input, as carelessly entered parameters may result in e.g. requesting far more data than needed, or lead to undefined behaviour. This class offers a set of pre-defined value types that can be consistently rendered as form elements in an interface and parsed.
45 @staticmethod 46 def parse_all(options, input, silently_correct=True): 47 """ 48 Parse form input for the provided options 49 50 Ignores all input not belonging to any of the defined options: parses 51 and sanitises the rest, and returns a dictionary with the sanitised 52 options. If an option is *not* present in the input, the default value 53 is used, and if that is absent, `None`. 54 55 In other words, this ensures a dictionary with 1) only white-listed 56 keys, 2) a value of an expected type for each key. 57 58 :param dict options: Options, as a name -> settings dictionary 59 :param dict input: Input, as a form field -> value dictionary 60 :param bool silently_correct: If true, replace invalid values with the 61 given default value; else, raise a QueryParametersException if a value 62 is invalid. 63 64 :return dict: Sanitised form input 65 """ 66 67 from common.lib.helpers import convert_to_int 68 parsed_input = {} 69 70 if type(input) is not dict and type(input) is not ImmutableMultiDict: 71 raise TypeError("input must be a dictionary or ImmutableMultiDict") 72 73 if type(input) is ImmutableMultiDict: 74 # we are not using to_dict, because that messes up multi-selects 75 input = {key: input.getlist(key) for key in input} 76 for key, value in input.items(): 77 if type(value) is list and len(value) == 1: 78 input[key] = value[0] 79 80 # all parameters are submitted as option-[parameter ID], this is an 81 # artifact of how the web interface works and we can simply remove the 82 # prefix 83 input = {re.sub(r"^option-", "", field): input[field] for field in input} 84 85 # re-order input so that the fields relying on the value of other 86 # fields are parsed last 87 options = {k: options[k] for k in sorted(options, key=lambda k: options[k].get("requires") is not None)} 88 89 for option, settings in options.items(): 90 if settings.get("indirect"): 91 # these are settings that are derived from and set by other 92 # settings 93 continue 94 95 if settings.get("type") 
in UserInput.OPTIONS_COSMETIC: 96 # these are structural form elements and never have a value 97 continue 98 99 elif settings.get("type") == UserInput.OPTION_DATERANGE: 100 # special case, since it combines two inputs 101 option_min = option + "-min" 102 option_max = option + "-max" 103 104 # normally this is taken care of client-side, but in case this 105 # didn't work, try to salvage it server-side 106 if option_min not in input or input.get(option_min) == "-1": 107 option_min += "_proxy" 108 109 if option_max not in input or input.get(option_max) == "-1": 110 option_max += "_proxy" 111 112 # save as a tuple of unix timestamps (or None) 113 try: 114 after, before = (UserInput.parse_value(settings, input.get(option_min), parsed_input, silently_correct), UserInput.parse_value(settings, input.get(option_max), parsed_input, silently_correct)) 115 116 if before and after and after > before: 117 if not silently_correct: 118 raise QueryParametersException("End of date range must be after beginning of date range.") 119 else: 120 before = after 121 122 parsed_input[option] = (after, before) 123 except RequirementsNotMetException: 124 pass 125 126 elif settings.get("type") in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION): 127 # special case too, since if a checkbox is unchecked, it simply 128 # does not show up in the input 129 try: 130 if option in input: 131 # Toggle needs to be parsed 132 parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct) 133 else: 134 # Toggle was left blank 135 parsed_input[option] = False 136 except RequirementsNotMetException: 137 pass 138 139 elif settings.get("type") == UserInput.OPTION_DATASOURCES: 140 # special case, because this combines multiple inputs to 141 # configure data source availability and expiration 142 datasources = {datasource: { 143 "enabled": f"{option}-enable-{datasource}" in input, 144 "allow_optout": f"{option}-optout-{datasource}" in input, 145 "timeout": 
convert_to_int(input[f"{option}-timeout-{datasource}"], 0) 146 } for datasource in input[option].split(",")} 147 148 parsed_input[option] = [datasource for datasource, v in datasources.items() if v["enabled"]] 149 parsed_input[option.split(".")[0] + ".expiration"] = datasources 150 151 elif settings.get("type") == UserInput.OPTION_DATASOURCES_TABLE: 152 # special case, parse table values to generate a dict 153 columns = list(settings["columns"].keys()) 154 table_input = {} 155 156 for datasource in list(settings["default"].keys()): 157 table_input[datasource] = {} 158 for column in columns: 159 160 choice = input.get(option + "-" + datasource + "-" + column, False) 161 column_settings = settings["columns"][column] # sub-settings per column 162 table_input[datasource][column] = UserInput.parse_value(column_settings, choice, table_input, silently_correct=True) 163 164 parsed_input[option] = table_input 165 166 elif option not in input: 167 # not provided? use default 168 parsed_input[option] = settings.get("default", None) 169 170 else: 171 # normal parsing and sanitisation 172 try: 173 parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct) 174 except RequirementsNotMetException: 175 pass 176 177 return parsed_input
Parse form input for the provided options
Ignores all input not belonging to any of the defined options: parses
and sanitises the rest, and returns a dictionary with the sanitised
options. If an option is not present in the input, the default value
is used, and if that is absent, None.
In other words, this ensures a dictionary with 1) only white-listed keys, 2) a value of an expected type for each key.
Parameters
- dict options: Options, as a name -> settings dictionary
- dict input: Input, as a form field -> value dictionary
- bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns
Sanitised form input
@staticmethod
def parse_value(settings, choice, other_input=None, silently_correct=True):
    """
    Filter user input

    Makes sure user input for post-processors is valid and within the
    parameters specified by the post-processor.

    :param obj settings:  Settings for this option, including defaults and
    valid options (e.g. the `type`, `default`, `options`, `min`/`max` and
    `requires` keys)
    :param choice:  The chosen option, to be parsed
    :param dict other_input:  Other input, as parsed so far; used to evaluate
    the `requires` condition, if present
    :param bool silently_correct:  If true, replace invalid values with the
    given default value; else, raise a QueryParametersException if a value
    is invalid.

    :return: Validated and parsed input
    :raises RequirementsNotMetException:  If the option's `requires` condition
    is not satisfied by `other_input`
    :raises QueryParametersException:  If the value is invalid and
    `silently_correct` is False (or, for toggles/JSON, always on bad input)
    """
    # short-circuit if there is a requirement for the field to be parsed
    # and the requirement isn't met
    if settings.get("requires"):
        try:
            # condition syntax is "<field><operator><value>", e.g. "mode==simple"
            field, operator, value = re.findall(r"([a-zA-Z0-9_-]+)([!=$~^]+)(.*)", settings.get("requires"))[0]
        except IndexError:
            # invalid condition, interpret as 'does the field with this name have a value'
            field, operator, value = (choice, "!=", "")

        if field not in other_input:
            raise RequirementsNotMetException()

        other_value = other_input.get(field)
        if type(other_value) is bool:
            # evaluates to a boolean, i.e. checkboxes etc
            if operator == "!=":
                if (other_value and value in ("", "false")) or (not other_value and value in ("true", "checked")):
                    raise RequirementsNotMetException()
            else:
                # any other operator is treated as equality for booleans
                if (other_value and value not in ("true", "checked")) or (not other_value and value not in ("", "false")):
                    raise RequirementsNotMetException()

        else:
            if type(other_value) in (tuple, list):
                # iterables are a bit special
                if len(other_value) == 1:
                    # treat one-item lists as "normal" values
                    other_value = other_value[0]
                elif operator == "~=":  # interpret as 'is in list?'
                    if value not in other_value:
                        raise RequirementsNotMetException()
                else:
                    # condition doesn't make sense for a list, so assume it's not True
                    raise RequirementsNotMetException()

            # operators: ^= starts-with, $= ends-with, ~= contains,
            # != not-equal, ==/= equal
            if operator == "^=" and not str(other_value).startswith(value):
                raise RequirementsNotMetException()
            elif operator == "$=" and not str(other_value).endswith(value):
                raise RequirementsNotMetException()
            elif operator == "~=" and value not in str(other_value):
                raise RequirementsNotMetException()
            elif operator == "!=" and value == other_value:
                raise RequirementsNotMetException()
            elif operator in ("==", "=") and value != other_value:
                raise RequirementsNotMetException()

    input_type = settings.get("type", "")
    if input_type in UserInput.OPTIONS_COSMETIC:
        # these are structural form elements and can never return a value
        return None

    elif input_type in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION):
        # simple boolean toggle
        if type(choice) is bool:
            return choice
        elif choice in ['false', 'False']:
            # Sanitized options passed back to Flask can be converted to strings as 'false'
            return False
        elif choice in ['true', 'True', 'on']:
            # Toggle will have value 'on', but may also arrive as the string 'true'
            return True
        else:
            raise QueryParametersException("Toggle invalid input")

    elif input_type in (UserInput.OPTION_DATE, UserInput.OPTION_DATERANGE):
        # parse either integers (unix timestamps) or try to guess the date
        # format (the latter may be used for input if JavaScript is turned
        # off in the front-end and the input comes from there)
        value = None
        try:
            value = int(choice)
        except ValueError:
            parsed_choice = parse_datetime(choice)
            value = int(parsed_choice.timestamp())
        finally:
            # NOTE(review): `return` inside `finally` also suppresses any
            # exception raised above (e.g. an unparseable date or a non-numeric
            # non-string `choice`), silently returning None instead — confirm
            # this best-effort behaviour is intended
            return value

    elif input_type in (UserInput.OPTION_MULTI, UserInput.OPTION_ANNOTATIONS):
        # any number of values out of a list of possible values
        # comma-separated during input, returned as a list of valid options
        if not choice:
            return settings.get("default", [])

        chosen = choice.split(",")
        # silently drop anything not in the configured options
        return [item for item in chosen if item in settings.get("options", [])]

    elif input_type == UserInput.OPTION_MULTI_SELECT:
        # multiple number of values out of a dropdown list of possible values
        # comma-separated during input, returned as a list of valid options
        if not choice:
            return settings.get("default", [])

        if type(choice) is str:
            # should be a list if the form control was actually a multiselect
            # but we have some client side UI helpers that may produce a string
            # instead
            choice = choice.split(",")

        return [item for item in choice if item in settings.get("options", [])]

    elif input_type == UserInput.OPTION_CHOICE:
        # select box
        # one out of multiple options
        # return option if valid, or default
        if choice not in settings.get("options"):
            if not silently_correct:
                raise QueryParametersException(f"Invalid value selected; must be one of {', '.join(settings.get('options', {}).keys())}. {settings}")
            else:
                return settings.get("default", "")
        else:
            return choice

    elif input_type == UserInput.OPTION_TEXT_JSON:
        # verify that this is actually json
        try:
            json.dumps(json.loads(choice))
        except json.JSONDecodeError:
            raise QueryParametersException("Invalid JSON value '%s'" % choice)

        return json.loads(choice)

    elif input_type in (UserInput.OPTION_TEXT, UserInput.OPTION_TEXT_LARGE, UserInput.OPTION_HUE):
        # text string
        # optionally clamp it as an integer; return default if not a valid
        # integer (or float; inferred from default or made explicit via the
        # coerce_type setting)
        if settings.get("coerce_type"):
            value_type = settings["coerce_type"]
        else:
            value_type = type(settings.get("default"))
            if value_type not in (int, float):
                value_type = int

        if "max" in settings:
            try:
                choice = min(settings["max"], value_type(choice))
            except (ValueError, TypeError):
                # not a valid number: raise, or silently fall back to default
                if not silently_correct:
                    raise QueryParametersException("Provide a value of %s or lower." % str(settings["max"]))

                choice = settings.get("default")

        if "min" in settings:
            try:
                choice = max(settings["min"], value_type(choice))
            except (ValueError, TypeError):
                if not silently_correct:
                    raise QueryParametersException("Provide a value of %s or more." % str(settings["min"]))

                choice = settings.get("default")

        if choice is None or choice == "":
            choice = settings.get("default")

        if choice is None:
            # no default either: clamped fields become 0, free text becomes ""
            choice = 0 if "min" in settings or "max" in settings else ""

        if settings.get("coerce_type"):
            try:
                return value_type(choice)
            except (ValueError, TypeError):
                return settings.get("default")
        else:
            return choice

    else:
        # no filtering
        return choice
Filter user input
Makes sure user input for post-processors is valid and within the parameters specified by the post-processor
Parameters
- obj settings: Settings, including defaults and valid options
- choice: The chosen option, to be parsed
- dict other_input: Other input, as parsed so far
- bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns
Validated and parsed input