common.lib.helpers
Miscellaneous helper functions for the 4CAT backend
1""" 2Miscellaneous helper functions for the 4CAT backend 3""" 4import subprocess 5import imagehash 6import hashlib 7import requests 8import datetime 9import smtplib 10import fnmatch 11import socket 12import oslex 13import copy 14import time 15import json 16import math 17import ural 18import csv 19import ssl 20import re 21import os 22import io 23 24from pathlib import Path 25from collections.abc import MutableMapping 26from html.parser import HTMLParser 27from urllib.parse import urlparse, urlunparse 28from calendar import monthrange 29from packaging import version 30from PIL import Image 31 32from common.config_manager import CoreConfigManager 33from common.lib.user_input import UserInput 34__all__ = ("UserInput",) 35 36core_config = CoreConfigManager() 37 38def init_datasource(database, logger, queue, name, config): 39 """ 40 Initialize data source 41 42 Queues jobs to scrape the boards that were configured to be scraped in the 43 4CAT configuration file. If none were configured, nothing happens. 44 45 :param Database database: Database connection instance 46 :param Logger logger: Log handler 47 :param JobQueue queue: Job Queue instance 48 :param string name: ID of datasource that is being initialised 49 :param config: Configuration reader 50 """ 51 pass 52 53def get_datasource_example_keys(db, modules, dataset_type): 54 """ 55 Get example keys for a datasource 56 """ 57 from common.lib.dataset import DataSet 58 example_dataset_key = db.fetchone("SELECT key from datasets WHERE type = %s and is_finished = True and num_rows > 0 ORDER BY timestamp_finished DESC LIMIT 1", (dataset_type,)) 59 if example_dataset_key: 60 example_dataset = DataSet(db=db, key=example_dataset_key["key"], modules=modules) 61 return example_dataset.get_columns() 62 return [] 63 64def strip_tags(html, convert_newlines=True): 65 """ 66 Strip HTML from a string 67 68 :param html: HTML to strip 69 :param convert_newlines: Convert <br> and </p> tags to \n before stripping 70 :return: Stripped HTML 71 """ 72 if not html: 73 return "" 74 75 deduplicate_newlines = re.compile(r"\n+") 76 77 if convert_newlines: 78 html = html.replace("<br>", "\n").replace("</p>", "</p>\n") 79 html = deduplicate_newlines.sub("\n", html) 80 81 class HTMLStripper(HTMLParser): 82 def __init__(self): 83 super().__init__() 84 self.reset() 85 self.strict = False 86 self.convert_charrefs = True 87 self.fed = [] 88 89 def handle_data(self, data): 90 self.fed.append(data) 91 92 def get_data(self): 93 return "".join(self.fed) 94 95 stripper = HTMLStripper() 96 stripper.feed(html) 97 return stripper.get_data() 98 99 100def sniff_encoding(file): 101 """ 102 Determine encoding from raw file bytes 103 104 Currently only distinguishes UTF-8 and UTF-8 with BOM 105 106 :param file: 107 :return: 108 """ 109 if type(file) is bytearray: 110 maybe_bom = file[:3] 111 elif hasattr(file, "getbuffer"): 112 buffer = file.getbuffer() 113 maybe_bom = buffer[:3].tobytes() 114 elif hasattr(file, "peek"): 115 buffer = file.peek(32) 116 maybe_bom = buffer[:3] 117 else: 118 maybe_bom = False 119 120 return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8" 121 122def sniff_csv_dialect(csv_input): 123 """ 124 Determine CSV dialect for an input stream 125 126 :param csv_input: Input stream 127 :return tuple: Tuple: Dialect object and a boolean representing whether 128 the CSV file seems to have a header 129 """ 130 encoding = sniff_encoding(csv_input) 131 if type(csv_input) is io.TextIOWrapper: 132 wrapped_input = csv_input 133 else: 134 wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding) 135 wrapped_input.seek(0) 136 sample = wrapped_input.read(1024 * 1024) 137 wrapped_input.seek(0) 138 has_header = csv.Sniffer().has_header(sample) 139 dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t")) 140 141 return dialect, has_header 142 143 144def get_git_branch(): 145 """ 146 Get current git branch 147 148 If the 4CAT root folder is a git repository, this function will return the 149 name of the currently checked-out branch. If the folder is not a git 150 repository or git is not installed an empty string is returned. 151 """ 152 try: 153 root_dir = str(core_config.get('PATH_ROOT').resolve()) 154 branch = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE) 155 if branch.returncode != 0: 156 raise ValueError() 157 branch_name = branch.stdout.decode("utf-8").strip() 158 if not branch_name: 159 # Check for detached HEAD state 160 # Most likely occuring because of checking out release tags (which are not branches) or commits 161 head_status = subprocess.run(oslex.split(f"git -C {oslex.quote(root_dir)} status"), stdout=subprocess.PIPE) 162 if head_status.returncode == 0: 163 for line in head_status.stdout.decode("utf-8").split("\n"): 164 if any([detached_message in line for detached_message in ("HEAD detached from", "HEAD detached at")]): 165 branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1] 166 return branch_name.strip() 167 except (subprocess.SubprocessError, ValueError, FileNotFoundError): 168 return "" 169 170 171def get_software_commit(worker=None): 172 """ 173 Get current 4CAT git commit hash 174 175 Use `get_software_version()` instead if you need the release version 176 number rather than the precise commit hash. 177 178 If no version file is available, run `git show` to test if there is a git 179 repository in the 4CAT root folder, and if so, what commit is currently 180 checked out in it. 181 182 For extensions, get the repository information for that extension, or if 183 the extension is not a git repository, return empty data. 184 185 :param BasicWorker processor: Worker to get commit for. If not given, get 186 version information for the main 4CAT installation. 187 188 :return tuple: 4CAT git commit hash, repository name 189 """ 190 # try git command line within the 4CAT root folder 191 # if it is a checked-out git repository, it will tell us the hash of 192 # the currently checked-out commit 193 194 # path has no Path.relative()... 195 try: 196 # if extension, go to the extension file's path 197 # we will run git here - if it is not its own repository, we have no 198 # useful version info (since the extension is by definition not in the 199 # main 4CAT repository) and will return an empty value 200 if worker and worker.is_extension: 201 relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent 202 working_dir = str(core_config.get("PATH_ROOT").joinpath(relative_filepath).resolve()) 203 # check if we are in the extensions' own repo or 4CAT's 204 git_cmd = f"git -C {oslex.quote(working_dir)} rev-parse --show-toplevel" 205 repo_level = subprocess.run(oslex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 206 if Path(repo_level.stdout.decode("utf-8")) == core_config.get("PATH_ROOT"): 207 # not its own repository 208 return ("", "") 209 210 else: 211 working_dir = str(core_config.get("PATH_ROOT").resolve()) 212 213 show = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 214 if show.returncode != 0: 215 raise ValueError() 216 commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1] 217 218 # now get the repository the commit belongs to, if we can 219 origin = subprocess.run(oslex.split(f"git -C {oslex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 220 if origin.returncode != 0 or not origin.stdout: 221 raise ValueError() 222 repository = origin.stdout.decode("utf-8").strip() 223 if repository.endswith(".git"): 224 repository = repository[:-4] 225 226 except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError): 227 return ("", "") 228 229 return (commit, repository) 230 231def get_software_version(): 232 """ 233 Get current 4CAT version 234 235 This is the actual software version, i.e. not the commit hash (see 236 `get_software_hash()` for that). The current version is stored in a file 237 with a canonical location: if the file doesn't exist, an empty string is 238 returned. 239 240 :return str: Software version, for example `1.37`. 241 """ 242 current_version_file = core_config.get("PATH_CONFIG").joinpath(".current-version") 243 if not current_version_file.exists(): 244 return "" 245 246 with current_version_file.open() as infile: 247 return infile.readline().strip() 248 249def get_github_version(repo_url, timeout=5): 250 """ 251 Get latest release tag version from GitHub 252 253 Will raise a ValueError if it cannot retrieve information from GitHub. 254 255 :param str repo_url: GitHub repository URL 256 :param int timeout: Timeout in seconds for HTTP request 257 258 :return tuple: Version, e.g. `1.26`, and release URL. 259 """ 260 if not repo_url.endswith("/"): 261 repo_url += "/" 262 263 repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url)) 264 265 api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id 266 response = requests.get(api_url, timeout=timeout) 267 response = response.json() 268 if response.get("message") == "Not Found": 269 raise ValueError("Invalid GitHub URL or repository name") 270 271 latest_tag = response.get("tag_name", "unknown") 272 if latest_tag.startswith("v"): 273 latest_tag = re.sub(r"^v", "", latest_tag) 274 275 return (latest_tag, response.get("html_url")) 276 277def get_ffmpeg_version(ffmpeg_path): 278 """ 279 Determine ffmpeg version 280 281 This can be necessary when using commands that change name between versions. 282 283 :param ffmpeg_path: ffmpeg executable path 284 :return packaging.version: Comparable ersion 285 """ 286 command = [ffmpeg_path, "-version"] 287 ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, 288 stderr=subprocess.PIPE) 289 290 ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1] 291 ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0] 292 293 return version.parse(ffmpeg_version) 294 295 296def find_extensions(): 297 """ 298 Find 4CAT extensions and load their metadata 299 300 Looks for subfolders of the extension folder, and loads additional metadata 301 where available. 302 303 :return tuple: A tuple with two items; the extensions, as an ID -> metadata 304 dictionary, and a list of (str) errors encountered while loading 305 """ 306 extension_path = core_config.get("PATH_EXTENSIONS") 307 errors = [] 308 if not extension_path.exists() or not extension_path.is_dir(): 309 return {}, errors 310 311 # each folder in the extensions folder is an extension 312 extensions = { 313 extension.name: { 314 "name": extension.name, 315 "version": "", 316 "url": "", 317 "git_url": "", 318 "is_git": False, 319 } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir() 320 } 321 322 # collect metadata for extensions 323 allowed_metadata_keys = ("name", "version", "url") 324 for extension in extensions: 325 extension_folder = extension_path.joinpath(extension) 326 metadata_file = extension_folder.joinpath("metadata.json") 327 if metadata_file.exists(): 328 with metadata_file.open() as infile: 329 try: 330 metadata = json.load(infile) 331 extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys}) 332 except (TypeError, ValueError) as e: 333 errors.append(f"Error reading metadata file for extension '{extension}' ({e})") 334 continue 335 336 extensions[extension]["is_git"] = extension_folder.joinpath(".git/HEAD").exists() 337 if extensions[extension]["is_git"]: 338 # try to get remote URL 339 try: 340 extension_root = str(extension_folder.resolve()) 341 origin = subprocess.run(oslex.split(f"git -C {oslex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE, 342 stdout=subprocess.PIPE) 343 if origin.returncode != 0 or not origin.stdout: 344 raise ValueError() 345 repository = origin.stdout.decode("utf-8").strip() 346 if repository.endswith(".git") and "github.com" in repository: 347 # use repo URL 348 repository = repository[:-4] 349 extensions[extension]["git_url"] = repository 350 except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e: 351 print(e) 352 pass 353 354 return extensions, errors 355 356 357def convert_to_int(value, default=0): 358 """ 359 Convert a value to an integer, with a fallback 360 361 The fallback is used if an Error is thrown during converstion to int. 362 This is a convenience function, but beats putting try-catches everywhere 363 we're using user input as an integer. 364 365 :param value: Value to convert 366 :param int default: Default value, if conversion not possible 367 :return int: Converted value 368 """ 369 try: 370 return int(value) 371 except (ValueError, TypeError): 372 return default 373 374def convert_to_float(value, default=0, force=False) -> float: 375 """ 376 Convert a value to a floating point, with a fallback 377 378 The fallback is used if an Error is thrown during converstion to float. 379 This is a convenience function, but beats putting try-catches everywhere 380 we're using user input as a floating point number. 381 382 :param value: Value to convert 383 :param int default: Default value, if conversion not possible 384 :param force: Whether to force the value into a float if it is not empty or None. 385 :return float: Converted value 386 """ 387 if force: 388 return float(value) if value else default 389 try: 390 return float(value) 391 except (ValueError, TypeError): 392 return default 393 394 395def timify(number, short=False): 396 """ 397 Make a number look like an indication of time 398 399 :param number: Number to convert. If the number is larger than the current 400 UNIX timestamp, decrease by that amount 401 :return str: A nice, string, for example `1 month, 3 weeks, 4 hours and 2 minutes` 402 """ 403 number = int(number) 404 405 components = [] 406 if number > time.time(): 407 number = time.time() - number 408 409 month_length = 30.42 * 86400 410 months = math.floor(number / month_length) 411 if months: 412 components.append(f"{months}{'mt' if short else ' month'}{'s' if months != 1 and not short else ''}") 413 number -= (months * month_length) 414 415 week_length = 7 * 86400 416 weeks = math.floor(number / week_length) 417 if weeks: 418 components.append(f"{weeks}{'w' if short else ' week'}{'s' if weeks != 1 and not short else ''}") 419 number -= (weeks * week_length) 420 421 day_length = 86400 422 days = math.floor(number / day_length) 423 if days: 424 components.append(f"{days}{'d' if short else ' day'}{'s' if days != 1 and not short else ''}") 425 number -= (days * day_length) 426 427 hour_length = 3600 428 hours = math.floor(number / hour_length) 429 if hours: 430 components.append(f"{hours}{'h' if short else ' hour'}{'s' if hours != 1 and not short else ''}") 431 number -= (hours * hour_length) 432 433 minute_length = 60 434 minutes = math.floor(number / minute_length) 435 if minutes: 436 components.append(f"{minutes}{'m' if short else ' minute'}{'s' if minutes != 1 and not short else ''}") 437 438 if not components: 439 components.append("less than a minute") 440 441 last_str = components.pop() 442 time_str = "" 443 if components: 444 time_str = ", ".join(components) 445 time_str += " and " 446 447 return time_str + last_str 448 449def nthify(integer: int) -> str: 450 """ 451 Takes an integer and returns a string with 'st', 'nd', 'rd', or 'th' as suffix, depending on the number. 452 """ 453 int_str = str(integer).strip() 454 if int_str.endswith("1"): 455 suffix = "st" 456 elif int_str.endswith("2"): 457 suffix = "nd" 458 elif int_str.endswith("3"): 459 suffix = "rd" 460 else: 461 suffix = "th" 462 return int_str + suffix 463 464def andify(items): 465 """ 466 Format a list of items for use in text 467 468 Returns a comma-separated list, the last item preceded by "and" 469 470 :param items: Iterable list 471 :return str: Formatted string 472 """ 473 474 items = items.copy() 475 476 if len(items) == 0: 477 return "" 478 elif len(items) == 1: 479 return str(items[1]) 480 481 result = f" and {items.pop()}" 482 return ", ".join([str(item) for item in items]) + result 483 484def ellipsiate(text, length, inside=False, ellipsis_str="…"): 485 if len(text) <= length: 486 return text 487 488 elif not inside: 489 return text[:length] + ellipsis_str 490 491 else: 492 # two cases: URLs and normal text 493 # for URLs, try to only ellipsiate after the domain name 494 # this makes the URLs easier to read when shortened 495 if ural.is_url(text): 496 pre_part = "/".join(text.split("/")[:3]) 497 if len(pre_part) < length - 6: # kind of arbitrary 498 before = len(pre_part) + 1 499 else: 500 before = math.floor(length / 2) 501 else: 502 before = math.floor(length / 2) 503 504 after = len(text) - before 505 return text[:before] + ellipsis_str + text[after:] 506 507def hash_file(image_file, hash_type="file-hash"): 508 """ 509 Generate an image hash 510 511 :param Path image_file: Image file to hash 512 :param str hash_type: Hash type, one of `file-hash`, `colorhash`, 513 `phash`, `average_hash`, `dhash` 514 :return str: Hexadecimal hash value 515 """ 516 if not image_file.exists(): 517 raise FileNotFoundError() 518 519 if hash_type == "file-hash": 520 hasher = hashlib.sha1() 521 522 # Open the file in binary mode 523 with image_file.open("rb") as infile: 524 # Read and update hash in chunks to handle large files 525 while chunk := infile.read(1024): 526 hasher.update(chunk) 527 528 return hasher.hexdigest() 529 530 elif hash_type in ("colorhash", "phash", "average_hash", "dhash"): 531 image = Image.open(image_file) 532 533 return str(getattr(imagehash, hash_type)(image)) 534 535 else: 536 raise NotImplementedError(f"Unknown hash type '{hash_type}'") 537 538def get_yt_compatible_ids(yt_ids): 539 """ 540 :param yt_ids list, a list of strings 541 :returns list, a ist of joined strings in pairs of 50 542 543 Takes a list of IDs and returns list of joined strings 544 in pairs of fifty. This should be done for the YouTube API 545 that requires a comma-separated string and can only return 546 max fifty results. 547 """ 548 549 # If there's only one item, return a single list item 550 if isinstance(yt_ids, str): 551 return [yt_ids] 552 553 ids = [] 554 last_i = 0 555 for i, yt_id in enumerate(yt_ids): 556 557 # Add a joined string per fifty videos 558 if i % 50 == 0 and i != 0: 559 ids_string = ",".join(yt_ids[last_i:i]) 560 ids.append(ids_string) 561 last_i = i 562 563 # If the end of the list is reached, add the last data 564 elif i == (len(yt_ids) - 1): 565 ids_string = ",".join(yt_ids[last_i:i]) 566 ids.append(ids_string) 567 568 return ids 569 570 571def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None, 572 fontsize_small=None, fontsize_large=None): 573 """ 574 Get a standard SVG canvas to draw 4CAT graphs to 575 576 Adds a border, footer, header, and some basic text styling 577 578 :param path: The path where the SVG graph will be saved 579 :param width: Width of the canvas 580 :param height: Height of the canvas 581 :param header: Header, if necessary to draw 582 :param footer: Footer text, if necessary to draw. Defaults to shameless 583 4CAT advertisement. 584 :param fontsize_normal: Font size of normal text 585 :param fontsize_small: Font size of small text (e.g. footer) 586 :param fontsize_large: Font size of large text (e.g. header) 587 :return SVG: SVG canvas (via svgwrite) that can be drawn to 588 """ 589 from svgwrite.container import SVG, Hyperlink 590 from svgwrite.drawing import Drawing 591 from svgwrite.shapes import Rect 592 from svgwrite.text import Text 593 594 if fontsize_normal is None: 595 fontsize_normal = width / 75 596 597 if fontsize_small is None: 598 fontsize_small = width / 100 599 600 if fontsize_large is None: 601 fontsize_large = width / 50 602 603 # instantiate with border and white background 604 canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal) 605 canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF")) 606 607 # header 608 if header: 609 header_shape = SVG(insert=(0, 0), size=("100%", fontsize_large * 2)) 610 header_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) 611 header_shape.add( 612 Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle", fill="#FFF", 613 style="font-size:%ipx" % fontsize_large)) 614 canvas.add(header_shape) 615 616 # footer (i.e. 4cat banner) 617 if footer: 618 footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2) 619 footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize) 620 footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) 621 link = Hyperlink(href="https://4cat.nl") 622 link.add( 623 Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle", fill="#FFF", 624 style="font-size:%ipx" % fontsize_small)) 625 footer_shape.add(link) 626 canvas.add(footer_shape) 627 628 return canvas 629 630 631def call_api(action, payload=None, wait_for_response=True): 632 """ 633 Send message to server 634 635 Calls the internal API and returns interpreted response. "status" is always 636 None if wait_for_response is False. 637 638 :param str action: API action 639 :param payload: API payload 640 :param bool wait_for_response: Wait for response? If not close connection 641 immediately after sending data. 642 643 :return: API response {"status": "success"|"error", "response": response, "error": error} 644 """ 645 connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 646 connection.settimeout(15) 647 config = CoreConfigManager() 648 try: 649 connection.connect((config.get('API_HOST'), config.get('API_PORT'))) 650 except ConnectionRefusedError: 651 return {"status": "error", "error": "Connection refused"} 652 653 msg = json.dumps({"request": action, "payload": payload}) 654 connection.sendall(msg.encode("ascii", "ignore")) 655 656 response_data = { 657 "status": None, 658 "response": None, 659 "error": None 660 } 661 662 if wait_for_response: 663 try: 664 response = "" 665 while True: 666 bytes = connection.recv(2048) 667 if not bytes: 668 break 669 670 response += bytes.decode("ascii", "ignore") 671 except (socket.timeout, TimeoutError): 672 response_data["status"] = "error" 673 response_data["error"] = "Connection timed out" 674 675 try: 676 connection.shutdown(socket.SHUT_RDWR) 677 except OSError: 678 # already shut down automatically 679 pass 680 connection.close() 681 682 if wait_for_response: 683 try: 684 json_response = json.loads(response) 685 response_data["response"] = json_response["response"] 686 response_data["error"] = json_response.get("error", None) 687 response_data["status"] = "error" if json_response.get("error") else "success" 688 except json.JSONDecodeError: 689 response_data["status"] = "error" 690 response_data["error"] = "Invalid JSON response" 691 response_data["response"] = response 692 693 return response_data 694 695def get_interval_descriptor(item, interval, item_column="timestamp"): 696 """ 697 Get interval descriptor based on timestamp 698 699 :param dict item: Item to generate descriptor for, should have a 700 "timestamp" key 701 :param str interval: Interval, one of "all", "overall", "year", 702 "month", "week", "day" 703 :param str item_column: Column name in the item dictionary that contains 704 the timestamp. Defaults to "timestamp". 705 :return str: Interval descriptor, e.g. "overall", "unknown_date", "2020", "2020-08", 706 "2020-43", "2020-08-01" 707 """ 708 if interval in ("all", "overall"): 709 return interval 710 711 if not item.get(item_column, None): 712 return "unknown_date" 713 714 # Catch cases where a custom timestamp has an epoch integer as value. 715 try: 716 timestamp = int(item[item_column]) 717 try: 718 timestamp = datetime.datetime.fromtimestamp(timestamp) 719 except (ValueError, TypeError): 720 raise ValueError("Invalid timestamp '%s'" % str(item["timestamp"])) 721 except (TypeError, ValueError): 722 try: 723 timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S") 724 except (ValueError, TypeError): 725 raise ValueError("Invalid date '%s'" % str(item["timestamp"])) 726 727 if interval == "year": 728 return str(timestamp.year) 729 elif interval == "month": 730 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) 731 elif interval == "week": 732 return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2) 733 elif interval == "hour": 734 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill( 735 2) + " " + str(timestamp.hour).zfill(2) 736 elif interval == "minute": 737 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill( 738 2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2) 739 else: 740 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2) 741 742 743def pad_interval(intervals, first_interval=None, last_interval=None): 744 """ 745 Pad an interval so all intermediate intervals are filled 746 747 Warning, ugly code (PRs very welcome) 748 749 :param dict intervals: A dictionary, with dates (YYYY{-MM}{-DD}) as keys 750 and a numerical value. 751 :param first_interval: 752 :param last_interval: 753 :return: 754 """ 755 missing = 0 756 try: 757 test_key = list(intervals.keys())[0] 758 except IndexError: 759 return 0, {} 760 761 # first determine the boundaries of the interval 762 # these may be passed as parameters, or they can be inferred from the 763 # interval given 764 if first_interval: 765 first_interval = str(first_interval) 766 first_year = int(first_interval[0:4]) 767 if len(first_interval) > 4: 768 first_month = int(first_interval[5:7]) 769 if len(first_interval) > 7: 770 first_day = int(first_interval[8:10]) 771 if len(first_interval) > 10: 772 first_hour = int(first_interval[11:13]) 773 if len(first_interval) > 13: 774 first_minute = int(first_interval[14:16]) 775 776 else: 777 first_year = min([int(i[0:4]) for i in intervals]) 778 if len(test_key) > 4: 779 first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year]) 780 if len(test_key) > 7: 781 first_day = min( 782 [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month]) 783 if len(test_key) > 10: 784 first_hour = min( 785 [int(i[11:13]) for i in intervals if 786 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day]) 787 if len(test_key) > 13: 788 first_minute = min( 789 [int(i[14:16]) for i in intervals if 790 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int( 791 i[11:13]) == first_hour]) 792 793 if last_interval: 794 last_interval = str(last_interval) 795 last_year = int(last_interval[0:4]) 796 if len(last_interval) > 4: 797 last_month = int(last_interval[5:7]) 798 if len(last_interval) > 7: 799 last_day = int(last_interval[8:10]) 800 if len(last_interval) > 10: 801 last_hour = int(last_interval[11:13]) 802 if len(last_interval) > 13: 803 last_minute = int(last_interval[14:16]) 804 else: 805 last_year = max([int(i[0:4]) for i in intervals]) 806 if len(test_key) > 4: 807 last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year]) 808 if len(test_key) > 7: 809 last_day = max( 810 [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month]) 811 if len(test_key) > 10: 812 last_hour = max( 813 [int(i[11:13]) for i in intervals if 814 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day]) 815 if len(test_key) > 13: 816 last_minute = max( 817 [int(i[14:16]) for i in intervals if 818 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int( 819 i[11:13]) == last_hour]) 820 821 has_month = re.match(r"^[0-9]{4}-[0-9]", test_key) 822 has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key) 823 has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key) 824 has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key) 825 826 all_intervals = [] 827 for year in range(first_year, last_year + 1): 828 year_interval = str(year) 829 830 if not has_month: 831 all_intervals.append(year_interval) 832 continue 833 834 start_month = first_month if year == first_year else 1 835 end_month = last_month if year == last_year else 12 836 for month in range(start_month, end_month + 1): 837 month_interval = year_interval + "-" + str(month).zfill(2) 838 839 if not has_day: 840 all_intervals.append(month_interval) 841 continue 842 843 start_day = first_day if all((year == first_year, month == first_month)) else 1 844 end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1] 845 for day in range(start_day, end_day + 1): 846 day_interval = month_interval + "-" + str(day).zfill(2) 847 848 if not has_hour: 849 all_intervals.append(day_interval) 850 continue 851 852 start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0 853 end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23 854 for hour in range(start_hour, end_hour + 1): 855 hour_interval = day_interval + " " + str(hour).zfill(2) 856 857 if not has_minute: 858 all_intervals.append(hour_interval) 859 continue 860 861 start_minute = first_minute if all( 862 (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0 863 end_minute = last_minute if all( 864 (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59 865 866 for minute in range(start_minute, end_minute + 1): 867 minute_interval = hour_interval + ":" + str(minute).zfill(2) 868 all_intervals.append(minute_interval) 869 870 for interval in all_intervals: 871 if interval not in intervals: 872 intervals[interval] = 0 873 missing += 1 874 875 # sort while we're at it 876 intervals = {key: intervals[key] for key in sorted(intervals)} 877 878 return missing, intervals 879 880 881def remove_nuls(value): 882 """ 883 Remove \0 from a value 884 885 The CSV library cries about a null byte when it encounters one :( :( :( 886 poor little csv cannot handle a tiny little null byte 887 888 So remove them from the data because they should not occur in utf-8 data 889 anyway. 890 891 :param value: Value to remove nulls from. For dictionaries, sets, tuples 892 and lists all items are parsed recursively. 893 :return value: Cleaned value 894 """ 895 if type(value) is dict: 896 for field in value: 897 value[field] = remove_nuls(value[field]) 898 elif type(value) is list: 899 value = [remove_nuls(item) for item in value] 900 elif type(value) is tuple: 901 value = tuple([remove_nuls(item) for item in value]) 902 elif type(value) is set: 903 value = set([remove_nuls(item) for item in value]) 904 elif type(value) is str: 905 value = value.replace("\0", "") 906 907 return value 908 909 910class NullAwareTextIOWrapper(io.TextIOWrapper): 911 """ 912 TextIOWrapper that skips null bytes 913 914 This can be used as a file reader that silently discards any null bytes it 915 encounters. 916 """ 917 918 def __next__(self): 919 value = super().__next__() 920 return remove_nuls(value) 921 922 923class HashCache: 924 """ 925 Simple cache handler to cache hashed values 926 927 Avoids having to calculate a hash for values that have been hashed before 928 """ 929 930 def __init__(self, hasher): 931 self.hash_cache = {} 932 self.hasher = hasher 933 934 def update_cache(self, value): 935 """ 936 Checks the hash_cache to see if the value has been cached previously, 937 updates the hash_cache if needed, and returns the hashed value. 938 """ 939 # value = str(value) 940 if value not in self.hash_cache: 941 author_hasher = self.hasher.copy() 942 author_hasher.update(str(value).encode("utf-8")) 943 self.hash_cache[value] = author_hasher.hexdigest() 944 del author_hasher 945 return self.hash_cache[value] 946 947 948def dict_search_and_update(item, keyword_matches, function): 949 """ 950 Filter fields in an object recursively 951 952 Apply a function to every item and sub item of a dictionary if the key 953 contains one of the provided match terms. 954 955 Function loops through a dictionary or list and compares dictionary keys to 956 the strings defined by keyword_matches. It then applies the change_function 957 to corresponding values. 958 959 Note: if a matching term is found, all nested values will have the function 960 applied to them. e.g., all these values would be changed even those with 961 not_key_match: 962 963 {'key_match' : 'changed', 964 'also_key_match' : {'not_key_match' : 'but_value_still_changed'}, 965 'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]} 966 967 This is a comprehensive (and expensive) approach to updating a dictionary. 968 IF a dictionary structure is known, a better solution would be to update 969 using specific keys. 970 971 :param Dict/List item: dictionary/list/json to loop through 972 :param String keyword_matches: list of strings that will be matched to 973 dictionary keys. Can contain wildcards which are matched using fnmatch. 974 :param Function function: function appled to all values of any items 975 nested under a matching key 976 977 :return Dict/List: Copy of original item, but filtered 978 """ 979 980 def loop_helper_function(d_or_l, match_terms, change_function): 981 """ 982 Recursive helper function that updates item in place 983 """ 984 if isinstance(d_or_l, dict): 985 # Iterate through dictionary 986 for key, value in iter(d_or_l.items()): 987 if match_terms == 'True' or any([fnmatch.fnmatch(key, match_term) for match_term in match_terms]): 988 # Match found; apply function to all items and sub-items 989 if isinstance(value, (list, dict)): 990 # Pass item through again with match_terms = True 991 loop_helper_function(value, 'True', change_function) 992 elif value is None: 993 pass 994 else: 995 # Update the value 996 d_or_l[key] = change_function(value) 997 elif isinstance(value, (list, dict)): 998 # Continue search 999 loop_helper_function(value, match_terms, change_function) 1000 elif isinstance(d_or_l, list): 1001 # Iterate through list 1002 for n, value in enumerate(d_or_l): 1003 if isinstance(value, (list, dict)): 1004 # Continue search 1005 loop_helper_function(value, match_terms, change_function) 1006 elif match_terms == 'True': 1007 # List item nested in matching 1008 d_or_l[n] = change_function(value) 1009 else: 1010 raise Exception('Must pass list or dictionary') 1011 1012 # Lowercase keyword_matches 1013 keyword_matches = [keyword.lower() for keyword in keyword_matches] 1014 1015 # Create deepcopy and return new item 1016 temp_item = copy.deepcopy(item) 1017 loop_helper_function(temp_item, keyword_matches, function) 1018 return temp_item 1019 1020 1021def get_last_line(filepath): 1022 """ 1023 Seeks from end of file for '\n' and returns that line 1024 1025 :param str filepath: path to file 1026 :return str: last line of file 1027 """ 1028 with open(filepath, "rb") as file: 1029 try: 1030 # start at the end of file 1031 file.seek(-2, os.SEEK_END) 1032 # check if NOT endline i.e. '\n' 1033 while file.read(1) != b'\n': 1034 # if not '\n', back up two characters and check again 1035 file.seek(-2, os.SEEK_CUR) 1036 except OSError: 1037 file.seek(0) 1038 last_line = file.readline().decode() 1039 return last_line 1040 1041 1042def add_notification(db, user, notification, expires=None, allow_dismiss=True): 1043 db.insert("users_notifications", { 1044 "username": user, 1045 "notification": notification, 1046 "timestamp_expires": expires, 1047 "allow_dismiss": allow_dismiss 1048 }, safe=True) 1049 1050 1051def send_email(recipient, message, mail_config): 1052 """ 1053 Send an e-mail using the configured SMTP settings 1054 1055 Just a thin wrapper around smtplib, so we don't have to repeat ourselves. 1056 Exceptions are to be handled outside the function. 1057 1058 :param list recipient: Recipient e-mail addresses 1059 :param MIMEMultipart message: Message to send 1060 :param mail_config: Configuration reader 1061 """ 1062 # Create a secure SSL context 1063 context = ssl.create_default_context() 1064 1065 # Decide which connection type 1066 with smtplib.SMTP_SSL(mail_config.get('mail.server'), port=mail_config.get('mail.port', 0), context=context) if mail_config.get( 1067 'mail.ssl') == 'ssl' else smtplib.SMTP(mail_config.get('mail.server'), 1068 port=mail_config.get('mail.port', 0)) as server: 1069 if mail_config.get('mail.ssl') == 'tls': 1070 # smtplib.SMTP adds TLS context here 1071 server.starttls(context=context) 1072 1073 # Log in 1074 if mail_config.get('mail.username') and mail_config.get('mail.password'): 1075 server.ehlo() 1076 server.login(mail_config.get('mail.username'), mail_config.get('mail.password')) 1077 1078 # Send message 1079 if type(message) is str: 1080 server.sendmail(mail_config.get('mail.noreply'), recipient, message) 1081 else: 1082 server.sendmail(mail_config.get('mail.noreply'), recipient, message.as_string()) 1083 1084 1085def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'): 1086 """ 1087 Return a flattened dictionary where nested dictionary objects are given new 1088 keys using the parent key combined using the seperator with the child key. 1089 1090 Lists will be converted to json strings via json.dumps() 1091 1092 :param MutableMapping d: Dictionary like object 1093 :param str parent_key: The original parent key prepending future nested keys 1094 :param str sep: A seperator string used to combine parent and child keys 1095 :return dict: A new dictionary with the no nested values 1096 """ 1097 1098 def _flatten_dict_gen(d, parent_key, sep): 1099 for k, v in d.items(): 1100 new_key = parent_key + sep + k if parent_key else k 1101 if isinstance(v, MutableMapping): 1102 yield from flatten_dict(v, new_key, sep=sep).items() 1103 elif isinstance(v, (list, set)): 1104 yield new_key, json.dumps( 1105 [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v]) 1106 else: 1107 yield new_key, v 1108 1109 return dict(_flatten_dict_gen(d, parent_key, sep)) 1110 1111 1112def sets_to_lists(d: MutableMapping): 1113 """ 1114 Return a dictionary where all nested sets have been converted to lists. 1115 1116 :param MutableMapping d: Dictionary like object 1117 :return dict: A new dictionary with the no nested sets 1118 """ 1119 1120 def _check_list(lst): 1121 return [sets_to_lists(item) if isinstance(item, MutableMapping) else _check_list(item) if isinstance(item, ( 1122 set, list)) else item for item in lst] 1123 1124 def _sets_to_lists_gen(d): 1125 for k, v in d.items(): 1126 if isinstance(v, MutableMapping): 1127 yield k, sets_to_lists(v) 1128 elif isinstance(v, (list, set)): 1129 yield k, _check_list(v) 1130 else: 1131 yield k, v 1132 1133 return dict(_sets_to_lists_gen(d)) 1134 1135 1136def url_to_hash(url, remove_scheme=True, remove_www=True): 1137 """ 1138 Convert a URL to a hash. Allows removing scheme and www prefix before hashing. 1139 1140 :param url: URL to hash 1141 :param remove_scheme: If True, removes the scheme from URL before hashing 1142 :param remove_www: If True, removes the www. prefix from URL before hashing 1143 :return: Hash of the URL 1144 """ 1145 parsed_url = urlparse(url.lower()) 1146 if parsed_url: 1147 if remove_scheme: 1148 parsed_url = parsed_url._replace(scheme="") 1149 if remove_www: 1150 netloc = re.sub(r"^www\.", "", parsed_url.netloc) 1151 parsed_url = parsed_url._replace(netloc=netloc) 1152 1153 # Hash the normalized URL directly 1154 normalized_url = urlunparse(parsed_url).strip("/") 1155 else: 1156 # Unable to parse URL; use regex normalization 1157 normalized_url = url.lower().strip("/") 1158 if remove_scheme: 1159 normalized_url = re.sub(r"^https?://", "", normalized_url) 1160 if remove_www: 1161 if not remove_scheme: 1162 scheme_match = re.match(r"^https?://", normalized_url) 1163 if scheme_match: 1164 scheme = scheme_match.group() 1165 temp_url = re.sub(r"^https?://", "", normalized_url) 1166 normalized_url = scheme + re.sub(r"^www\.", "", temp_url) 1167 else: 1168 normalized_url = re.sub(r"^www\.", "", normalized_url) 1169 1170 return hashlib.blake2b(normalized_url.encode("utf-8"), digest_size=24).hexdigest() 1171 1172def url_to_filename(url, staging_area=None, default_name="file", default_ext=".png", max_bytes=255, existing_filenames=None): 1173 """ 1174 Determine filenames for saved files 1175 1176 Prefer the original filename (extracted from the URL), but this may not 1177 always be possible or be an actual filename. Also, avoid using the same 1178 filename multiple times. Ensures filenames don't exceed max_bytes. 1179 1180 Note: Collision possible without staging area (used to check for already 1181 existing filenames). 1182 1183 :param str url: URLs to determine filenames for 1184 :param Path staging_area: Path to the staging area where files are saved 1185 (to avoid collisions); if None, no collision avoidance is done. 1186 :param str default_name: Default name to use if no filename can be 1187 extracted from the URL 1188 :param str default_ext: Default extension to use if no filename can be 1189 extracted from the URL 1190 :param int max_bytes: Maximum number of bytes for the filename 1191 :return str: Suitable file name 1192 """ 1193 clean_filename = url.split("/")[-1].split("?")[0].split("#")[0] 1194 if re.match(r"[^.]+\.[a-zA-Z0-9]{1,10}", clean_filename): 1195 base_filename = clean_filename 1196 else: 1197 base_filename = default_name + default_ext 1198 1199 if not existing_filenames: 1200 existing_filenames = [] 1201 1202 # Split base filename into name and extension 1203 if '.' in base_filename: 1204 name_part, ext_part = base_filename.rsplit('.', 1) 1205 ext_part = '.' + ext_part 1206 else: 1207 name_part = base_filename 1208 ext_part = '' 1209 1210 # Truncate base filename if it exceeds max_bytes 1211 if len(base_filename.encode('utf-8')) > max_bytes: 1212 # Reserve space for extension 1213 available_bytes = max_bytes - len(ext_part.encode('utf-8')) 1214 if available_bytes <= 0: 1215 # If extension is too long, use minimal name 1216 name_part = default_name 1217 ext_part = default_ext 1218 available_bytes = max_bytes - len(ext_part.encode('utf-8')) 1219 1220 # Truncate name part to fit 1221 name_bytes = name_part.encode('utf-8') 1222 if len(name_bytes) > available_bytes: 1223 # Truncate byte by byte to ensure valid UTF-8 1224 while len(name_bytes) > available_bytes: 1225 name_part = name_part[:-1] 1226 name_bytes = name_part.encode('utf-8') 1227 1228 base_filename = name_part + ext_part 1229 1230 filename = base_filename 1231 1232 if staging_area: 1233 # Ensure the filename is unique in the staging area 1234 file_path = staging_area.joinpath(filename) 1235 file_index = 1 1236 1237 while file_path.exists() or filename in existing_filenames: 1238 # Calculate space needed for index suffix 1239 index_suffix = f"-{file_index}" 1240 1241 # Check if filename with index would exceed max_bytes 1242 test_filename = name_part + index_suffix + ext_part 1243 if len(test_filename.encode('utf-8')) > max_bytes: 1244 # Need to truncate name_part to make room for index 1245 available_bytes = max_bytes - len((index_suffix + ext_part).encode('utf-8')) 1246 if available_bytes <= 0: 1247 # Extreme case - use minimal name 1248 truncated_name = "f" 1249 else: 1250 # Truncate name_part to fit 1251 truncated_name = name_part 1252 name_bytes = truncated_name.encode('utf-8') 1253 while len(name_bytes) > available_bytes: 1254 truncated_name = truncated_name[:-1] 1255 name_bytes = truncated_name.encode('utf-8') 1256 1257 filename = truncated_name + index_suffix + ext_part 1258 else: 1259 filename = test_filename 1260 1261 file_index += 1 1262 file_path = staging_area.joinpath(filename) 1263 1264 return filename 1265 1266 1267def split_urls(url_string, allowed_schemes=None): 1268 """ 1269 Split URL text by \n and commas. 1270 1271 4CAT allows users to input lists by either separating items with a newline or a comma. This function will split URLs 1272 and also check for commas within URLs using schemes. 1273 1274 Note: some urls may contain scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com); 1275 this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail). 1276 """ 1277 if allowed_schemes is None: 1278 allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://') 1279 potential_urls = [] 1280 # Split the text by \n 1281 for line in url_string.split('\n'): 1282 # Handle commas that may exist within URLs 1283 parts = line.split(',') 1284 recombined_url = "" 1285 for part in parts: 1286 if part.startswith(allowed_schemes): # Other schemes exist 1287 # New URL start detected 1288 if recombined_url: 1289 # Already have a URL, add to list 1290 potential_urls.append(recombined_url) 1291 # Start new URL 1292 recombined_url = part 1293 elif part: 1294 if recombined_url: 1295 # Add to existing URL 1296 recombined_url += "," + part 1297 else: 1298 # No existing URL, start new 1299 recombined_url = part 1300 else: 1301 # Ignore empty strings 1302 pass 1303 if recombined_url: 1304 # Add any remaining URL 1305 potential_urls.append(recombined_url) 1306 return potential_urls 1307 1308 1309def folder_size(path='.'): 1310 """ 1311 Get the size of a folder using os.scandir for efficiency 1312 """ 1313 total = 0 1314 for entry in os.scandir(path): 1315 if entry.is_file(): 1316 total += entry.stat().st_size 1317 elif entry.is_dir(): 1318 total += folder_size(entry.path) 1319 return total 1320 1321def hash_to_md5(string: str) -> str: 1322 """ 1323 Hash a string with an md5 hash. 1324 """ 1325 return hashlib.md5(string.encode("utf-8")).hexdigest()
16class UserInput: 17 """ 18 Class for handling user input 19 20 It is important to sanitise user input, as carelessly entered parameters 21 may in e.g. requesting far more data than needed, or lead to undefined 22 behaviour. This class offers a set of pre-defined value types that can be 23 consistently rendered as form elements in an interface and parsed. 24 """ 25 OPTION_TOGGLE = "toggle" # boolean toggle (checkbox) 26 OPTION_CHOICE = "choice" # one choice out of a list (select) 27 OPTION_TEXT = "string" # simple string or integer (input text) 28 OPTION_MULTI = "multi" # multiple values out of a list (select multiple) 29 OPTION_MULTI_SELECT = "multi_select" # multiple values out of a dropdown list (select multiple) 30 OPTION_INFO = "info" # just a bit of text, not actual input 31 OPTION_TEXT_LARGE = "textarea" # longer text 32 OPTION_TEXT_JSON = "json" # text, but should be valid JSON 33 OPTION_DATE = "date" # a single date 34 OPTION_DATERANGE = "daterange" # a beginning and end date 35 OPTION_DIVIDER = "divider" # meta-option, divides related sets of options 36 OPTION_FILE = "file" # file upload 37 OPTION_HUE = "hue" # colour hue 38 OPTION_DATASOURCES = "datasources" # data source toggling 39 OPTION_EXTENSIONS = "extensions" # extension toggling 40 OPTION_DATASOURCES_TABLE = "datasources_table" # a table with settings per data source 41 OPTION_ANNOTATION = "annotation" # checkbox for whether to an annotation 42 OPTION_ANNOTATIONS = "annotations" # table for whether to write multiple annotations 43 44 OPTIONS_COSMETIC = (OPTION_INFO, OPTION_DIVIDER) 45 46 @staticmethod 47 def parse_all(options, input, silently_correct=True): 48 """ 49 Parse form input for the provided options 50 51 Ignores all input not belonging to any of the defined options: parses 52 and sanitises the rest, and returns a dictionary with the sanitised 53 options. If an option is *not* present in the input, the default value 54 is used, and if that is absent, `None`. 55 56 In other words, this ensures a dictionary with 1) only white-listed 57 keys, 2) a value of an expected type for each key. 58 59 :param dict options: Options, as a name -> settings dictionary 60 :param dict input: Input, as a form field -> value dictionary 61 :param bool silently_correct: If true, replace invalid values with the 62 given default value; else, raise a QueryParametersException if a value 63 is invalid. 64 65 :return dict: Sanitised form input 66 """ 67 68 from common.lib.helpers import convert_to_int 69 parsed_input = {} 70 71 if type(input) is not dict and type(input) is not ImmutableMultiDict: 72 raise TypeError("input must be a dictionary or ImmutableMultiDict") 73 74 if type(input) is ImmutableMultiDict: 75 # we are not using to_dict, because that messes up multi-selects 76 input = {key: input.getlist(key) for key in input} 77 for key, value in input.items(): 78 if type(value) is list and len(value) == 1: 79 input[key] = value[0] 80 81 # all parameters are submitted as option-[parameter ID], this is an 82 # artifact of how the web interface works and we can simply remove the 83 # prefix 84 input = {re.sub(r"^option-", "", field): input[field] for field in input} 85 86 # re-order input so that the fields relying on the value of other 87 # fields are parsed last 88 options = {k: options[k] for k in sorted(options, key=lambda k: options[k].get("requires") is not None)} 89 90 for option, settings in options.items(): 91 if settings.get("indirect"): 92 # these are settings that are derived from and set by other 93 # settings 94 continue 95 96 if settings.get("type") in UserInput.OPTIONS_COSMETIC: 97 # these are structural form elements and never have a value 98 continue 99 100 elif settings.get("type") == UserInput.OPTION_DATERANGE: 101 # special case, since it combines two inputs 102 option_min = option + "-min" 103 option_max = option + "-max" 104 105 # normally this is taken care of client-side, but in case this 106 # didn't work, try to salvage it server-side 107 if option_min not in input or input.get(option_min) == "-1": 108 option_min += "_proxy" 109 110 if option_max not in input or input.get(option_max) == "-1": 111 option_max += "_proxy" 112 113 # save as a tuple of unix timestamps (or None) 114 try: 115 after, before = (UserInput.parse_value(settings, input.get(option_min), parsed_input, silently_correct), UserInput.parse_value(settings, input.get(option_max), parsed_input, silently_correct)) 116 117 if before and after and after > before: 118 if not silently_correct: 119 raise QueryParametersException("End of date range must be after beginning of date range.") 120 else: 121 before = after 122 123 parsed_input[option] = (after, before) 124 except RequirementsNotMetException: 125 pass 126 127 elif settings.get("type") in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION): 128 # special case too, since if a checkbox is unchecked, it simply 129 # does not show up in the input 130 try: 131 if option in input: 132 # Toggle needs to be parsed 133 parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct) 134 else: 135 # Toggle was left blank 136 parsed_input[option] = False 137 except RequirementsNotMetException: 138 pass 139 140 elif settings.get("type") == UserInput.OPTION_DATASOURCES: 141 # special case, because this combines multiple inputs to 142 # configure data source availability and expiration 143 datasources = {datasource: { 144 "enabled": f"{option}-enable-{datasource}" in input, 145 "allow_optout": f"{option}-optout-{datasource}" in input, 146 "timeout": convert_to_int(input[f"{option}-timeout-{datasource}"], 0) 147 } for datasource in input[option].split(",")} 148 149 parsed_input[option] = [datasource for datasource, v in datasources.items() if v["enabled"]] 150 parsed_input[option.split(".")[0] + ".expiration"] = datasources 151 152 elif settings.get("type") == UserInput.OPTION_EXTENSIONS: 153 # also a special case 154 parsed_input[option] = {extension: { 155 "enabled": f"{option}-enable-{extension}" in input 156 } for extension in input[option].split(",")} 157 158 elif settings.get("type") == UserInput.OPTION_DATASOURCES_TABLE: 159 # special case, parse table values to generate a dict 160 columns = list(settings["columns"].keys()) 161 table_input = {} 162 163 for datasource in list(settings["default"].keys()): 164 table_input[datasource] = {} 165 for column in columns: 166 167 choice = input.get(option + "-" + datasource + "-" + column, False) 168 column_settings = settings["columns"][column] # sub-settings per column 169 table_input[datasource][column] = UserInput.parse_value(column_settings, choice, table_input, silently_correct=True) 170 171 parsed_input[option] = table_input 172 173 elif option not in input: 174 # not provided? use default 175 parsed_input[option] = settings.get("default", None) 176 177 else: 178 # normal parsing and sanitisation 179 try: 180 parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct) 181 except RequirementsNotMetException: 182 pass 183 184 return parsed_input 185 186 @staticmethod 187 def parse_value(settings, choice, other_input=None, silently_correct=True): 188 """ 189 Filter user input 190 191 Makes sure user input for post-processors is valid and within the 192 parameters specified by the post-processor 193 194 :param obj settings: Settings, including defaults and valid options 195 :param choice: The chosen option, to be parsed 196 :param dict other_input: Other input, as parsed so far 197 :param bool silently_correct: If true, replace invalid values with the 198 given default value; else, raise a QueryParametersException if a value 199 is invalid. 200 201 :return: Validated and parsed input 202 """ 203 # short-circuit if there is a requirement for the field to be parsed 204 # and the requirement isn't met 205 if settings.get("requires"): 206 try: 207 field, operator, value = re.findall(r"([a-zA-Z0-9_-]+)([!=$~^]+)(.*)", settings.get("requires"))[0] 208 except IndexError: 209 # invalid condition, interpret as 'does the field with this name have a value' 210 field, operator, value = (choice, "!=", "") 211 212 if field not in other_input: 213 raise RequirementsNotMetException() 214 215 other_value = other_input.get(field) 216 if type(other_value) is bool: 217 # evalues to a boolean, i.e. checkboxes etc 218 if operator == "!=": 219 if (other_value and value in ("", "false")) or (not other_value and value in ("true", "checked")): 220 raise RequirementsNotMetException() 221 else: 222 if (other_value and value not in ("true", "checked")) or (not other_value and value not in ("", "false")): 223 raise RequirementsNotMetException() 224 225 else: 226 if type(other_value) in (tuple, list): 227 # iterables are a bit special 228 if len(other_value) == 1: 229 # treat one-item lists as "normal" values 230 other_value = other_value[0] 231 elif operator == "~=": # interpret as 'is in list?' 232 if value not in other_value: 233 raise RequirementsNotMetException() 234 else: 235 # condition doesn't make sense for a list, so assume it's not True 236 raise RequirementsNotMetException() 237 238 if operator == "^=" and not str(other_value).startswith(value): 239 raise RequirementsNotMetException() 240 elif operator == "$=" and not str(other_value).endswith(value): 241 raise RequirementsNotMetException() 242 elif operator == "~=" and value not in str(other_value): 243 raise RequirementsNotMetException() 244 elif operator == "!=" and value == other_value: 245 raise RequirementsNotMetException() 246 elif operator in ("==", "=") and value != other_value: 247 raise RequirementsNotMetException() 248 249 input_type = settings.get("type", "") 250 if input_type in UserInput.OPTIONS_COSMETIC: 251 # these are structural form elements and can never return a value 252 return None 253 254 elif input_type in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION): 255 # simple boolean toggle 256 if type(choice) is bool: 257 return choice 258 elif choice in ['false', 'False']: 259 # Sanitized options passed back to Flask can be converted to strings as 'false' 260 return False 261 elif choice in ['true', 'True', 'on']: 262 # Toggle will have value 'on', but may also becomes a string 'true' 263 return True 264 else: 265 raise QueryParametersException("Toggle invalid input") 266 267 elif input_type in (UserInput.OPTION_DATE, UserInput.OPTION_DATERANGE): 268 # parse either integers (unix timestamps) or try to guess the date 269 # format (the latter may be used for input if JavaScript is turned 270 # off in the front-end and the input comes from there) 271 value = None 272 try: 273 value = int(choice) 274 except ValueError: 275 parsed_choice = parse_datetime(choice) 276 value = int(parsed_choice.timestamp()) 277 finally: 278 return value 279 280 elif input_type in (UserInput.OPTION_MULTI, UserInput.OPTION_ANNOTATIONS): 281 # any number of values out of a list of possible values 282 # comma-separated during input, returned as a list of valid options 283 if not choice: 284 return settings.get("default", []) 285 286 chosen = choice.split(",") 287 return [item for item in chosen if item in settings.get("options", [])] 288 289 elif input_type == UserInput.OPTION_MULTI_SELECT: 290 # multiple number of values out of a dropdown list of possible values 291 # comma-separated during input, returned as a list of valid options 292 if not choice: 293 return settings.get("default", []) 294 295 if type(choice) is str: 296 # should be a list if the form control was actually a multiselect 297 # but we have some client side UI helpers that may produce a string 298 # instead 299 choice = choice.split(",") 300 301 return [item for item in choice if item in settings.get("options", [])] 302 303 elif input_type == UserInput.OPTION_CHOICE: 304 # select box 305 # one out of multiple options 306 # return option if valid, or default 307 if choice not in settings.get("options"): 308 if not silently_correct: 309 raise QueryParametersException(f"Invalid value selected; must be one of {', '.join(settings.get('options', {}).keys())}. {settings}") 310 else: 311 return settings.get("default", "") 312 else: 313 return choice 314 315 elif input_type == UserInput.OPTION_TEXT_JSON: 316 # verify that this is actually json 317 try: 318 json.dumps(json.loads(choice)) 319 except json.JSONDecodeError: 320 raise QueryParametersException("Invalid JSON value '%s'" % choice) 321 322 return json.loads(choice) 323 324 elif input_type in (UserInput.OPTION_TEXT, UserInput.OPTION_TEXT_LARGE, UserInput.OPTION_HUE): 325 # text string 326 # optionally clamp it as an integer; return default if not a valid 327 # integer (or float; inferred from default or made explicit via the 328 # coerce_type setting) 329 if settings.get("coerce_type"): 330 value_type = settings["coerce_type"] 331 else: 332 value_type = type(settings.get("default")) 333 if value_type not in (int, float): 334 value_type = int 335 336 if "max" in settings: 337 try: 338 choice = min(settings["max"], value_type(choice)) 339 except (ValueError, TypeError): 340 if not silently_correct: 341 raise QueryParametersException("Provide a value of %s or lower." % str(settings["max"])) 342 343 choice = settings.get("default") 344 345 if "min" in settings: 346 try: 347 choice = max(settings["min"], value_type(choice)) 348 except (ValueError, TypeError): 349 if not silently_correct: 350 raise QueryParametersException("Provide a value of %s or more." % str(settings["min"])) 351 352 choice = settings.get("default") 353 354 if choice is None or choice == "": 355 choice = settings.get("default") 356 357 if choice is None: 358 choice = 0 if "min" in settings or "max" in settings else "" 359 360 if settings.get("coerce_type"): 361 try: 362 return value_type(choice) 363 except (ValueError, TypeError): 364 return settings.get("default") 365 else: 366 return choice 367 368 else: 369 # no filtering 370 return choice
Class for handling user input
It is important to sanitise user input, as carelessly entered parameters may in e.g. requesting far more data than needed, or lead to undefined behaviour. This class offers a set of pre-defined value types that can be consistently rendered as form elements in an interface and parsed.
46 @staticmethod 47 def parse_all(options, input, silently_correct=True): 48 """ 49 Parse form input for the provided options 50 51 Ignores all input not belonging to any of the defined options: parses 52 and sanitises the rest, and returns a dictionary with the sanitised 53 options. If an option is *not* present in the input, the default value 54 is used, and if that is absent, `None`. 55 56 In other words, this ensures a dictionary with 1) only white-listed 57 keys, 2) a value of an expected type for each key. 58 59 :param dict options: Options, as a name -> settings dictionary 60 :param dict input: Input, as a form field -> value dictionary 61 :param bool silently_correct: If true, replace invalid values with the 62 given default value; else, raise a QueryParametersException if a value 63 is invalid. 64 65 :return dict: Sanitised form input 66 """ 67 68 from common.lib.helpers import convert_to_int 69 parsed_input = {} 70 71 if type(input) is not dict and type(input) is not ImmutableMultiDict: 72 raise TypeError("input must be a dictionary or ImmutableMultiDict") 73 74 if type(input) is ImmutableMultiDict: 75 # we are not using to_dict, because that messes up multi-selects 76 input = {key: input.getlist(key) for key in input} 77 for key, value in input.items(): 78 if type(value) is list and len(value) == 1: 79 input[key] = value[0] 80 81 # all parameters are submitted as option-[parameter ID], this is an 82 # artifact of how the web interface works and we can simply remove the 83 # prefix 84 input = {re.sub(r"^option-", "", field): input[field] for field in input} 85 86 # re-order input so that the fields relying on the value of other 87 # fields are parsed last 88 options = {k: options[k] for k in sorted(options, key=lambda k: options[k].get("requires") is not None)} 89 90 for option, settings in options.items(): 91 if settings.get("indirect"): 92 # these are settings that are derived from and set by other 93 # settings 94 continue 95 96 if settings.get("type") in UserInput.OPTIONS_COSMETIC: 97 # these are structural form elements and never have a value 98 continue 99 100 elif settings.get("type") == UserInput.OPTION_DATERANGE: 101 # special case, since it combines two inputs 102 option_min = option + "-min" 103 option_max = option + "-max" 104 105 # normally this is taken care of client-side, but in case this 106 # didn't work, try to salvage it server-side 107 if option_min not in input or input.get(option_min) == "-1": 108 option_min += "_proxy" 109 110 if option_max not in input or input.get(option_max) == "-1": 111 option_max += "_proxy" 112 113 # save as a tuple of unix timestamps (or None) 114 try: 115 after, before = (UserInput.parse_value(settings, input.get(option_min), parsed_input, silently_correct), UserInput.parse_value(settings, input.get(option_max), parsed_input, silently_correct)) 116 117 if before and after and after > before: 118 if not silently_correct: 119 raise QueryParametersException("End of date range must be after beginning of date range.") 120 else: 121 before = after 122 123 parsed_input[option] = (after, before) 124 except RequirementsNotMetException: 125 pass 126 127 elif settings.get("type") in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION): 128 # special case too, since if a checkbox is unchecked, it simply 129 # does not show up in the input 130 try: 131 if option in input: 132 # Toggle needs to be parsed 133 parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct) 134 else: 135 # Toggle was left blank 136 parsed_input[option] = False 137 except RequirementsNotMetException: 138 pass 139 140 elif settings.get("type") == UserInput.OPTION_DATASOURCES: 141 # special case, because this combines multiple inputs to 142 # configure data source availability and expiration 143 datasources = {datasource: { 144 "enabled": f"{option}-enable-{datasource}" in input, 145 "allow_optout": f"{option}-optout-{datasource}" in input, 146 "timeout": convert_to_int(input[f"{option}-timeout-{datasource}"], 0) 147 } for datasource in input[option].split(",")} 148 149 parsed_input[option] = [datasource for datasource, v in datasources.items() if v["enabled"]] 150 parsed_input[option.split(".")[0] + ".expiration"] = datasources 151 152 elif settings.get("type") == UserInput.OPTION_EXTENSIONS: 153 # also a special case 154 parsed_input[option] = {extension: { 155 "enabled": f"{option}-enable-{extension}" in input 156 } for extension in input[option].split(",")} 157 158 elif settings.get("type") == UserInput.OPTION_DATASOURCES_TABLE: 159 # special case, parse table values to generate a dict 160 columns = list(settings["columns"].keys()) 161 table_input = {} 162 163 for datasource in list(settings["default"].keys()): 164 table_input[datasource] = {} 165 for column in columns: 166 167 choice = input.get(option + "-" + datasource + "-" + column, False) 168 column_settings = settings["columns"][column] # sub-settings per column 169 table_input[datasource][column] = UserInput.parse_value(column_settings, choice, table_input, silently_correct=True) 170 171 parsed_input[option] = table_input 172 173 elif option not in input: 174 # not provided? use default 175 parsed_input[option] = settings.get("default", None) 176 177 else: 178 # normal parsing and sanitisation 179 try: 180 parsed_input[option] = UserInput.parse_value(settings, input[option], parsed_input, silently_correct) 181 except RequirementsNotMetException: 182 pass 183 184 return parsed_input
Parse form input for the provided options
Ignores all input not belonging to any of the defined options: parses
and sanitises the rest, and returns a dictionary with the sanitised
options. If an option is not present in the input, the default value
is used, and if that is absent, None
.
In other words, this ensures a dictionary with 1) only white-listed keys, 2) a value of an expected type for each key.
Parameters
- dict options: Options, as a name -> settings dictionary
- dict input: Input, as a form field -> value dictionary
- bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns
Sanitised form input
186 @staticmethod 187 def parse_value(settings, choice, other_input=None, silently_correct=True): 188 """ 189 Filter user input 190 191 Makes sure user input for post-processors is valid and within the 192 parameters specified by the post-processor 193 194 :param obj settings: Settings, including defaults and valid options 195 :param choice: The chosen option, to be parsed 196 :param dict other_input: Other input, as parsed so far 197 :param bool silently_correct: If true, replace invalid values with the 198 given default value; else, raise a QueryParametersException if a value 199 is invalid. 200 201 :return: Validated and parsed input 202 """ 203 # short-circuit if there is a requirement for the field to be parsed 204 # and the requirement isn't met 205 if settings.get("requires"): 206 try: 207 field, operator, value = re.findall(r"([a-zA-Z0-9_-]+)([!=$~^]+)(.*)", settings.get("requires"))[0] 208 except IndexError: 209 # invalid condition, interpret as 'does the field with this name have a value' 210 field, operator, value = (choice, "!=", "") 211 212 if field not in other_input: 213 raise RequirementsNotMetException() 214 215 other_value = other_input.get(field) 216 if type(other_value) is bool: 217 # evalues to a boolean, i.e. checkboxes etc 218 if operator == "!=": 219 if (other_value and value in ("", "false")) or (not other_value and value in ("true", "checked")): 220 raise RequirementsNotMetException() 221 else: 222 if (other_value and value not in ("true", "checked")) or (not other_value and value not in ("", "false")): 223 raise RequirementsNotMetException() 224 225 else: 226 if type(other_value) in (tuple, list): 227 # iterables are a bit special 228 if len(other_value) == 1: 229 # treat one-item lists as "normal" values 230 other_value = other_value[0] 231 elif operator == "~=": # interpret as 'is in list?' 232 if value not in other_value: 233 raise RequirementsNotMetException() 234 else: 235 # condition doesn't make sense for a list, so assume it's not True 236 raise RequirementsNotMetException() 237 238 if operator == "^=" and not str(other_value).startswith(value): 239 raise RequirementsNotMetException() 240 elif operator == "$=" and not str(other_value).endswith(value): 241 raise RequirementsNotMetException() 242 elif operator == "~=" and value not in str(other_value): 243 raise RequirementsNotMetException() 244 elif operator == "!=" and value == other_value: 245 raise RequirementsNotMetException() 246 elif operator in ("==", "=") and value != other_value: 247 raise RequirementsNotMetException() 248 249 input_type = settings.get("type", "") 250 if input_type in UserInput.OPTIONS_COSMETIC: 251 # these are structural form elements and can never return a value 252 return None 253 254 elif input_type in (UserInput.OPTION_TOGGLE, UserInput.OPTION_ANNOTATION): 255 # simple boolean toggle 256 if type(choice) is bool: 257 return choice 258 elif choice in ['false', 'False']: 259 # Sanitized options passed back to Flask can be converted to strings as 'false' 260 return False 261 elif choice in ['true', 'True', 'on']: 262 # Toggle will have value 'on', but may also becomes a string 'true' 263 return True 264 else: 265 raise QueryParametersException("Toggle invalid input") 266 267 elif input_type in (UserInput.OPTION_DATE, UserInput.OPTION_DATERANGE): 268 # parse either integers (unix timestamps) or try to guess the date 269 # format (the latter may be used for input if JavaScript is turned 270 # off in the front-end and the input comes from there) 271 value = None 272 try: 273 value = int(choice) 274 except ValueError: 275 parsed_choice = parse_datetime(choice) 276 value = int(parsed_choice.timestamp()) 277 finally: 278 return value 279 280 elif input_type in (UserInput.OPTION_MULTI, UserInput.OPTION_ANNOTATIONS): 281 # any number of values out of a list of possible values 282 # comma-separated during input, returned as a list of valid options 283 if not choice: 284 return settings.get("default", []) 285 286 chosen = choice.split(",") 287 return [item for item in chosen if item in settings.get("options", [])] 288 289 elif input_type == UserInput.OPTION_MULTI_SELECT: 290 # multiple number of values out of a dropdown list of possible values 291 # comma-separated during input, returned as a list of valid options 292 if not choice: 293 return settings.get("default", []) 294 295 if type(choice) is str: 296 # should be a list if the form control was actually a multiselect 297 # but we have some client side UI helpers that may produce a string 298 # instead 299 choice = choice.split(",") 300 301 return [item for item in choice if item in settings.get("options", [])] 302 303 elif input_type == UserInput.OPTION_CHOICE: 304 # select box 305 # one out of multiple options 306 # return option if valid, or default 307 if choice not in settings.get("options"): 308 if not silently_correct: 309 raise QueryParametersException(f"Invalid value selected; must be one of {', '.join(settings.get('options', {}).keys())}. {settings}") 310 else: 311 return settings.get("default", "") 312 else: 313 return choice 314 315 elif input_type == UserInput.OPTION_TEXT_JSON: 316 # verify that this is actually json 317 try: 318 json.dumps(json.loads(choice)) 319 except json.JSONDecodeError: 320 raise QueryParametersException("Invalid JSON value '%s'" % choice) 321 322 return json.loads(choice) 323 324 elif input_type in (UserInput.OPTION_TEXT, UserInput.OPTION_TEXT_LARGE, UserInput.OPTION_HUE): 325 # text string 326 # optionally clamp it as an integer; return default if not a valid 327 # integer (or float; inferred from default or made explicit via the 328 # coerce_type setting) 329 if settings.get("coerce_type"): 330 value_type = settings["coerce_type"] 331 else: 332 value_type = type(settings.get("default")) 333 if value_type not in (int, float): 334 value_type = int 335 336 if "max" in settings: 337 try: 338 choice = min(settings["max"], value_type(choice)) 339 except (ValueError, TypeError): 340 if not silently_correct: 341 raise QueryParametersException("Provide a value of %s or lower." % str(settings["max"])) 342 343 choice = settings.get("default") 344 345 if "min" in settings: 346 try: 347 choice = max(settings["min"], value_type(choice)) 348 except (ValueError, TypeError): 349 if not silently_correct: 350 raise QueryParametersException("Provide a value of %s or more." % str(settings["min"])) 351 352 choice = settings.get("default") 353 354 if choice is None or choice == "": 355 choice = settings.get("default") 356 357 if choice is None: 358 choice = 0 if "min" in settings or "max" in settings else "" 359 360 if settings.get("coerce_type"): 361 try: 362 return value_type(choice) 363 except (ValueError, TypeError): 364 return settings.get("default") 365 else: 366 return choice 367 368 else: 369 # no filtering 370 return choice
Filter user input
Makes sure user input for post-processors is valid and within the parameters specified by the post-processor
Parameters
- obj settings: Settings, including defaults and valid options
- choice: The chosen option, to be parsed
- dict other_input: Other input, as parsed so far
- bool silently_correct: If true, replace invalid values with the given default value; else, raise a QueryParametersException if a value is invalid.
Returns
Validated and parsed input