common.lib.helpers
Miscellaneous helper functions for the 4CAT backend
1""" 2Miscellaneous helper functions for the 4CAT backend 3""" 4import subprocess 5import imagehash 6import hashlib 7import requests 8import datetime 9import smtplib 10import fnmatch 11import socket 12import shlex 13import copy 14import time 15import json 16import math 17import csv 18import ssl 19import re 20import os 21import io 22 23from pathlib import Path 24from collections.abc import MutableMapping 25from html.parser import HTMLParser 26from urllib.parse import urlparse, urlunparse 27from calendar import monthrange 28from packaging import version 29from PIL import Image 30 31from common.lib.user_input import UserInput 32from common.config_manager import config 33 34 35def init_datasource(database, logger, queue, name): 36 """ 37 Initialize data source 38 39 Queues jobs to scrape the boards that were configured to be scraped in the 40 4CAT configuration file. If none were configured, nothing happens. 41 42 :param Database database: Database connection instance 43 :param Logger logger: Log handler 44 :param JobQueue queue: Job Queue instance 45 :param string name: ID of datasource that is being initialised 46 """ 47 pass 48 49def strip_tags(html, convert_newlines=True): 50 """ 51 Strip HTML from a string 52 53 :param html: HTML to strip 54 :param convert_newlines: Convert <br> and </p> tags to \n before stripping 55 :return: Stripped HTML 56 """ 57 if not html: 58 return "" 59 60 deduplicate_newlines = re.compile(r"\n+") 61 62 if convert_newlines: 63 html = html.replace("<br>", "\n").replace("</p>", "</p>\n") 64 html = deduplicate_newlines.sub("\n", html) 65 66 class HTMLStripper(HTMLParser): 67 def __init__(self): 68 super().__init__() 69 self.reset() 70 self.strict = False 71 self.convert_charrefs = True 72 self.fed = [] 73 74 def handle_data(self, data): 75 self.fed.append(data) 76 77 def get_data(self): 78 return "".join(self.fed) 79 80 stripper = HTMLStripper() 81 stripper.feed(html) 82 return stripper.get_data() 83 84 85def sniff_encoding(file): 86 """ 87 
Determine encoding from raw file bytes 88 89 Currently only distinguishes UTF-8 and UTF-8 with BOM 90 91 :param file: 92 :return: 93 """ 94 if type(file) == bytearray: 95 maybe_bom = file[:3] 96 elif hasattr(file, "getbuffer"): 97 buffer = file.getbuffer() 98 maybe_bom = buffer[:3].tobytes() 99 elif hasattr(file, "peek"): 100 buffer = file.peek(32) 101 maybe_bom = buffer[:3] 102 else: 103 maybe_bom = False 104 105 return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8" 106 107def sniff_csv_dialect(csv_input): 108 """ 109 Determine CSV dialect for an input stream 110 111 :param csv_input: Input stream 112 :return tuple: Tuple: Dialect object and a boolean representing whether 113 the CSV file seems to have a header 114 """ 115 encoding = sniff_encoding(csv_input) 116 if type(csv_input) is io.TextIOWrapper: 117 wrapped_input = csv_input 118 else: 119 wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding) 120 wrapped_input.seek(0) 121 sample = wrapped_input.read(1024 * 1024) 122 wrapped_input.seek(0) 123 has_header = csv.Sniffer().has_header(sample) 124 dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t")) 125 126 return dialect, has_header 127 128 129def get_git_branch(): 130 """ 131 Get current git branch 132 133 If the 4CAT root folder is a git repository, this function will return the 134 name of the currently checked-out branch. If the folder is not a git 135 repository or git is not installed an empty string is returned. 
136 """ 137 try: 138 root_dir = str(config.get('PATH_ROOT').resolve()) 139 branch = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} branch --show-current"), stdout=subprocess.PIPE) 140 if branch.returncode != 0: 141 raise ValueError() 142 branch_name = branch.stdout.decode("utf-8").strip() 143 if not branch_name: 144 # Check for detached HEAD state 145 # Most likely occuring because of checking out release tags (which are not branches) or commits 146 head_status = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} status"), stdout=subprocess.PIPE) 147 if head_status.returncode == 0: 148 for line in head_status.stdout.decode("utf-8").split("\n"): 149 if "HEAD detached at" in line: 150 branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1] 151 return branch_name 152 except (subprocess.SubprocessError, ValueError, FileNotFoundError): 153 return "" 154 155 156def get_software_commit(worker=None): 157 """ 158 Get current 4CAT git commit hash 159 160 Use `get_software_version()` instead if you need the release version 161 number rather than the precise commit hash. 162 163 If no version file is available, run `git show` to test if there is a git 164 repository in the 4CAT root folder, and if so, what commit is currently 165 checked out in it. 166 167 For extensions, get the repository information for that extension, or if 168 the extension is not a git repository, return empty data. 169 170 :param BasicWorker processor: Worker to get commit for. If not given, get 171 version information for the main 4CAT installation. 172 173 :return tuple: 4CAT git commit hash, repository name 174 """ 175 # try git command line within the 4CAT root folder 176 # if it is a checked-out git repository, it will tell us the hash of 177 # the currently checked-out commit 178 179 # path has no Path.relative()... 
180 relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent 181 try: 182 # if extension, go to the extension file's path 183 # we will run git here - if it is not its own repository, we have no 184 # useful version info (since the extension is by definition not in the 185 # main 4CAT repository) and will return an empty value 186 if worker and worker.is_extension: 187 working_dir = str(config.get("PATH_ROOT").joinpath(relative_filepath).resolve()) 188 # check if we are in the extensions' own repo or 4CAT's 189 git_cmd = f"git -C {shlex.quote(working_dir)} rev-parse --show-toplevel" 190 repo_level = subprocess.run(shlex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 191 if Path(repo_level.stdout.decode("utf-8")) == config.get("PATH_ROOT"): 192 # not its own repository 193 return ("", "") 194 195 else: 196 working_dir = str(config.get("PATH_ROOT").resolve()) 197 198 show = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 199 if show.returncode != 0: 200 raise ValueError() 201 commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1] 202 203 # now get the repository the commit belongs to, if we can 204 origin = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE) 205 if origin.returncode != 0 or not origin.stdout: 206 raise ValueError() 207 repository = origin.stdout.decode("utf-8").strip() 208 if repository.endswith(".git"): 209 repository = repository[:-4] 210 211 except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e: 212 return ("", "") 213 214 return (commit, repository) 215 216def get_software_version(): 217 """ 218 Get current 4CAT version 219 220 This is the actual software version, i.e. not the commit hash (see 221 `get_software_hash()` for that). 
The current version is stored in a file 222 with a canonical location: if the file doesn't exist, an empty string is 223 returned. 224 225 :return str: Software version, for example `1.37`. 226 """ 227 current_version_file = config.get("PATH_ROOT").joinpath("config/.current-version") 228 if not current_version_file.exists(): 229 return "" 230 231 with current_version_file.open() as infile: 232 return infile.readline().strip() 233 234def get_github_version(timeout=5): 235 """ 236 Get latest release tag version from GitHub 237 238 Will raise a ValueError if it cannot retrieve information from GitHub. 239 240 :param int timeout: Timeout in seconds for HTTP request 241 242 :return tuple: Version, e.g. `1.26`, and release URL. 243 """ 244 repo_url = config.get("4cat.github_url") 245 if not repo_url.endswith("/"): 246 repo_url += "/" 247 248 repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url)) 249 250 api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id 251 response = requests.get(api_url, timeout=timeout) 252 response = response.json() 253 if response.get("message") == "Not Found": 254 raise ValueError("Invalid GitHub URL or repository name") 255 256 latest_tag = response.get("tag_name", "unknown") 257 if latest_tag.startswith("v"): 258 latest_tag = re.sub(r"^v", "", latest_tag) 259 260 return (latest_tag, response.get("html_url")) 261 262def get_ffmpeg_version(ffmpeg_path): 263 """ 264 Determine ffmpeg version 265 266 This can be necessary when using commands that change name between versions. 
267 268 :param ffmpeg_path: ffmpeg executable path 269 :return packaging.version: Comparable ersion 270 """ 271 command = [ffmpeg_path, "-version"] 272 ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, 273 stderr=subprocess.PIPE) 274 275 ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1] 276 ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0] 277 278 return version.parse(ffmpeg_version) 279 280 281def find_extensions(): 282 """ 283 Find 4CAT extensions and load their metadata 284 285 Looks for subfolders of the extension folder, and loads additional metadata 286 where available. 287 288 :return tuple: A tuple with two items; the extensions, as an ID -> metadata 289 dictionary, and a list of (str) errors encountered while loading 290 """ 291 extension_path = config.get("PATH_ROOT").joinpath("extensions") 292 errors = [] 293 if not extension_path.exists() or not extension_path.is_dir(): 294 return [], None 295 296 # each folder in the extensions folder is an extension 297 extensions = { 298 extension.name: { 299 "name": extension.name, 300 "version": "", 301 "url": "", 302 "git_url": "", 303 "is_git": False 304 } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir() 305 } 306 307 # collect metadata for extensions 308 allowed_metadata_keys = ("name", "version", "url") 309 for extension in extensions: 310 extension_folder = extension_path.joinpath(extension) 311 metadata_file = extension_folder.joinpath("metadata.json") 312 if metadata_file.exists(): 313 with metadata_file.open() as infile: 314 try: 315 metadata = json.load(infile) 316 extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys}) 317 except (TypeError, ValueError) as e: 318 errors.append(f"Error reading metadata file for extension '{extension}' ({e})") 319 continue 320 321 extensions[extension]["is_git"] = 
extension_folder.joinpath(".git/HEAD").exists() 322 if extensions[extension]["is_git"]: 323 # try to get remote URL 324 try: 325 extension_root = str(extension_folder.resolve()) 326 origin = subprocess.run(shlex.split(f"git -C {shlex.quote(extension_root)} config --get remote.origin.url"), stderr=subprocess.PIPE, 327 stdout=subprocess.PIPE) 328 if origin.returncode != 0 or not origin.stdout: 329 raise ValueError() 330 repository = origin.stdout.decode("utf-8").strip() 331 if repository.endswith(".git") and "github.com" in repository: 332 # use repo URL 333 repository = repository[:-4] 334 extensions[extension]["git_url"] = repository 335 except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError) as e: 336 print(e) 337 pass 338 339 return extensions, errors 340 341 342def convert_to_int(value, default=0): 343 """ 344 Convert a value to an integer, with a fallback 345 346 The fallback is used if an Error is thrown during converstion to int. 347 This is a convenience function, but beats putting try-catches everywhere 348 we're using user input as an integer. 349 350 :param value: Value to convert 351 :param int default: Default value, if conversion not possible 352 :return int: Converted value 353 """ 354 try: 355 return int(value) 356 except (ValueError, TypeError): 357 return default 358 359 360def timify_long(number): 361 """ 362 Make a number look like an indication of time 363 364 :param number: Number to convert. 
If the number is larger than the current 365 UNIX timestamp, decrease by that amount 366 :return str: A nice, string, for example `1 month, 3 weeks, 4 hours and 2 minutes` 367 """ 368 number = int(number) 369 370 components = [] 371 if number > time.time(): 372 number = time.time() - number 373 374 month_length = 30.42 * 86400 375 months = math.floor(number / month_length) 376 if months: 377 components.append("%i month%s" % (months, "s" if months != 1 else "")) 378 number -= (months * month_length) 379 380 week_length = 7 * 86400 381 weeks = math.floor(number / week_length) 382 if weeks: 383 components.append("%i week%s" % (weeks, "s" if weeks != 1 else "")) 384 number -= (weeks * week_length) 385 386 day_length = 86400 387 days = math.floor(number / day_length) 388 if days: 389 components.append("%i day%s" % (days, "s" if days != 1 else "")) 390 number -= (days * day_length) 391 392 hour_length = 3600 393 hours = math.floor(number / hour_length) 394 if hours: 395 components.append("%i hour%s" % (hours, "s" if hours != 1 else "")) 396 number -= (hours * hour_length) 397 398 minute_length = 60 399 minutes = math.floor(number / minute_length) 400 if minutes: 401 components.append("%i minute%s" % (minutes, "s" if minutes != 1 else "")) 402 403 if not components: 404 components.append("less than a minute") 405 406 last_str = components.pop() 407 time_str = "" 408 if components: 409 time_str = ", ".join(components) 410 time_str += " and " 411 412 return time_str + last_str 413 414def andify(items): 415 """ 416 Format a list of items for use in text 417 418 Returns a comma-separated list, the last item preceded by "and" 419 420 :param items: Iterable list 421 :return str: Formatted string 422 """ 423 if len(items) == 0: 424 return "" 425 elif len(items) == 1: 426 return str(items[1]) 427 428 result = f" and {items.pop()}" 429 return ", ".join([str(item) for item in items]) + result 430 431 432def hash_file(image_file, hash_type="file-hash"): 433 """ 434 Generate an image 
hash 435 436 :param Path image_file: Image file to hash 437 :param str hash_type: Hash type, one of `file-hash`, `colorhash`, 438 `phash`, `average_hash`, `dhash` 439 :return str: Hexadecimal hash value 440 """ 441 if not image_file.exists(): 442 raise FileNotFoundError() 443 444 if hash_type == "file-hash": 445 hasher = hashlib.sha1() 446 447 # Open the file in binary mode 448 with image_file.open("rb") as infile: 449 # Read and update hash in chunks to handle large files 450 while chunk := infile.read(1024): 451 hasher.update(chunk) 452 453 return hasher.hexdigest() 454 455 elif hash_type in ("colorhash", "phash", "average_hash", "dhash"): 456 image = Image.open(image_file) 457 458 return str(getattr(imagehash, hash_type)(image)) 459 460 else: 461 raise NotImplementedError(f"Unknown hash type '{hash_type}'") 462 463def get_yt_compatible_ids(yt_ids): 464 """ 465 :param yt_ids list, a list of strings 466 :returns list, a ist of joined strings in pairs of 50 467 468 Takes a list of IDs and returns list of joined strings 469 in pairs of fifty. This should be done for the YouTube API 470 that requires a comma-separated string and can only return 471 max fifty results. 
472 """ 473 474 # If there's only one item, return a single list item 475 if isinstance(yt_ids, str): 476 return [yt_ids] 477 478 ids = [] 479 last_i = 0 480 for i, yt_id in enumerate(yt_ids): 481 482 # Add a joined string per fifty videos 483 if i % 50 == 0 and i != 0: 484 ids_string = ",".join(yt_ids[last_i:i]) 485 ids.append(ids_string) 486 last_i = i 487 488 # If the end of the list is reached, add the last data 489 elif i == (len(yt_ids) - 1): 490 ids_string = ",".join(yt_ids[last_i:i]) 491 ids.append(ids_string) 492 493 return ids 494 495 496def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None, 497 fontsize_small=None, fontsize_large=None): 498 """ 499 Get a standard SVG canvas to draw 4CAT graphs to 500 501 Adds a border, footer, header, and some basic text styling 502 503 :param path: The path where the SVG graph will be saved 504 :param width: Width of the canvas 505 :param height: Height of the canvas 506 :param header: Header, if necessary to draw 507 :param footer: Footer text, if necessary to draw. Defaults to shameless 508 4CAT advertisement. 509 :param fontsize_normal: Font size of normal text 510 :param fontsize_small: Font size of small text (e.g. footer) 511 :param fontsize_large: Font size of large text (e.g. 
header) 512 :return SVG: SVG canvas (via svgwrite) that can be drawn to 513 """ 514 from svgwrite.container import SVG, Hyperlink 515 from svgwrite.drawing import Drawing 516 from svgwrite.shapes import Rect 517 from svgwrite.text import Text 518 519 if fontsize_normal is None: 520 fontsize_normal = width / 75 521 522 if fontsize_small is None: 523 fontsize_small = width / 100 524 525 if fontsize_large is None: 526 fontsize_large = width / 50 527 528 # instantiate with border and white background 529 canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal) 530 canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF")) 531 532 # header 533 if header: 534 header_shape = SVG(insert=(0, 0), size=("100%", fontsize_large * 2)) 535 header_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) 536 header_shape.add( 537 Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle", fill="#FFF", 538 style="font-size:%ipx" % fontsize_large)) 539 canvas.add(header_shape) 540 541 # footer (i.e. 4cat banner) 542 if footer: 543 footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2) 544 footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize) 545 footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000")) 546 link = Hyperlink(href="https://4cat.nl") 547 link.add( 548 Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle", fill="#FFF", 549 style="font-size:%ipx" % fontsize_small)) 550 footer_shape.add(link) 551 canvas.add(footer_shape) 552 553 return canvas 554 555 556def call_api(action, payload=None, wait_for_response=True): 557 """ 558 Send message to server 559 560 Calls the internal API and returns interpreted response. 561 562 :param str action: API action 563 :param payload: API payload 564 :param bool wait_for_response: Wait for response? 
If not close connection 565 immediately after sending data. 566 567 :return: API response, or timeout message in case of timeout 568 """ 569 connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 570 connection.settimeout(15) 571 connection.connect((config.get('API_HOST'), config.get('API_PORT'))) 572 573 msg = json.dumps({"request": action, "payload": payload}) 574 connection.sendall(msg.encode("ascii", "ignore")) 575 576 if wait_for_response: 577 try: 578 response = "" 579 while True: 580 bytes = connection.recv(2048) 581 if not bytes: 582 break 583 584 response += bytes.decode("ascii", "ignore") 585 except (socket.timeout, TimeoutError): 586 response = "(Connection timed out)" 587 588 try: 589 connection.shutdown(socket.SHUT_RDWR) 590 except OSError: 591 # already shut down automatically 592 pass 593 connection.close() 594 595 try: 596 return json.loads(response) if wait_for_response else None 597 except json.JSONDecodeError: 598 return response 599 600 601def get_interval_descriptor(item, interval): 602 """ 603 Get interval descriptor based on timestamp 604 605 :param dict item: Item to generate descriptor for, should have a 606 "timestamp" key 607 :param str interval: Interval, one of "all", "overall", "year", 608 "month", "week", "day" 609 :return str: Interval descriptor, e.g. "overall", "2020", "2020-08", 610 "2020-43", "2020-08-01" 611 """ 612 if interval in ("all", "overall"): 613 return interval 614 615 if "timestamp" not in item: 616 raise ValueError("No date available for item in dataset") 617 618 # Catch cases where a custom timestamp has an epoch integer as value. 
619 try: 620 timestamp = int(item["timestamp"]) 621 try: 622 timestamp = datetime.datetime.fromtimestamp(timestamp) 623 except (ValueError, TypeError) as e: 624 raise ValueError("Invalid timestamp '%s'" % str(item["timestamp"])) 625 except: 626 try: 627 timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S") 628 except (ValueError, TypeError) as e: 629 raise ValueError("Invalid date '%s'" % str(item["timestamp"])) 630 631 if interval == "year": 632 return str(timestamp.year) 633 elif interval == "month": 634 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) 635 elif interval == "week": 636 return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2) 637 elif interval == "hour": 638 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill( 639 2) + " " + str(timestamp.hour).zfill(2) 640 elif interval == "minute": 641 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill( 642 2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2) 643 else: 644 return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2) 645 646 647def pad_interval(intervals, first_interval=None, last_interval=None): 648 """ 649 Pad an interval so all intermediate intervals are filled 650 651 Warning, ugly code (PRs very welcome) 652 653 :param dict intervals: A dictionary, with dates (YYYY{-MM}{-DD}) as keys 654 and a numerical value. 
655 :param first_interval: 656 :param last_interval: 657 :return: 658 """ 659 missing = 0 660 test_key = list(intervals.keys())[0] 661 662 # first determine the boundaries of the interval 663 # these may be passed as parameters, or they can be inferred from the 664 # interval given 665 if first_interval: 666 first_interval = str(first_interval) 667 first_year = int(first_interval[0:4]) 668 if len(first_interval) > 4: 669 first_month = int(first_interval[5:7]) 670 if len(first_interval) > 7: 671 first_day = int(first_interval[8:10]) 672 if len(first_interval) > 10: 673 first_hour = int(first_interval[11:13]) 674 if len(first_interval) > 13: 675 first_minute = int(first_interval[14:16]) 676 677 else: 678 first_year = min([int(i[0:4]) for i in intervals]) 679 if len(test_key) > 4: 680 first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year]) 681 if len(test_key) > 7: 682 first_day = min( 683 [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month]) 684 if len(test_key) > 10: 685 first_hour = min( 686 [int(i[11:13]) for i in intervals if 687 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day]) 688 if len(test_key) > 13: 689 first_minute = min( 690 [int(i[14:16]) for i in intervals if 691 int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int( 692 i[11:13]) == first_hour]) 693 694 if last_interval: 695 last_interval = str(last_interval) 696 last_year = int(last_interval[0:4]) 697 if len(last_interval) > 4: 698 last_month = int(last_interval[5:7]) 699 if len(last_interval) > 7: 700 last_day = int(last_interval[8:10]) 701 if len(last_interval) > 10: 702 last_hour = int(last_interval[11:13]) 703 if len(last_interval) > 13: 704 last_minute = int(last_interval[14:16]) 705 else: 706 last_year = max([int(i[0:4]) for i in intervals]) 707 if len(test_key) > 4: 708 last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year]) 709 
if len(test_key) > 7: 710 last_day = max( 711 [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month]) 712 if len(test_key) > 10: 713 last_hour = max( 714 [int(i[11:13]) for i in intervals if 715 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day]) 716 if len(test_key) > 13: 717 last_minute = max( 718 [int(i[14:16]) for i in intervals if 719 int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int( 720 i[11:13]) == last_hour]) 721 722 has_month = re.match(r"^[0-9]{4}-[0-9]", test_key) 723 has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key) 724 has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key) 725 has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key) 726 727 all_intervals = [] 728 for year in range(first_year, last_year + 1): 729 year_interval = str(year) 730 731 if not has_month: 732 all_intervals.append(year_interval) 733 continue 734 735 start_month = first_month if year == first_year else 1 736 end_month = last_month if year == last_year else 12 737 for month in range(start_month, end_month + 1): 738 month_interval = year_interval + "-" + str(month).zfill(2) 739 740 if not has_day: 741 all_intervals.append(month_interval) 742 continue 743 744 start_day = first_day if all((year == first_year, month == first_month)) else 1 745 end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1] 746 for day in range(start_day, end_day + 1): 747 day_interval = month_interval + "-" + str(day).zfill(2) 748 749 if not has_hour: 750 all_intervals.append(day_interval) 751 continue 752 753 start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0 754 end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23 755 for hour in range(start_hour, end_hour + 1): 756 hour_interval = day_interval + " " + 
str(hour).zfill(2) 757 758 if not has_minute: 759 all_intervals.append(hour_interval) 760 continue 761 762 start_minute = first_minute if all( 763 (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0 764 end_minute = last_minute if all( 765 (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59 766 767 for minute in range(start_minute, end_minute + 1): 768 minute_interval = hour_interval + ":" + str(minute).zfill(2) 769 all_intervals.append(minute_interval) 770 771 for interval in all_intervals: 772 if interval not in intervals: 773 intervals[interval] = 0 774 missing += 1 775 776 # sort while we're at it 777 intervals = {key: intervals[key] for key in sorted(intervals)} 778 779 return missing, intervals 780 781 782def remove_nuls(value): 783 """ 784 Remove \0 from a value 785 786 The CSV library cries about a null byte when it encounters one :( :( :( 787 poor little csv cannot handle a tiny little null byte 788 789 So remove them from the data because they should not occur in utf-8 data 790 anyway. 791 792 :param value: Value to remove nulls from. For dictionaries, sets, tuples 793 and lists all items are parsed recursively. 794 :return value: Cleaned value 795 """ 796 if type(value) is dict: 797 for field in value: 798 value[field] = remove_nuls(value[field]) 799 elif type(value) is list: 800 value = [remove_nuls(item) for item in value] 801 elif type(value) is tuple: 802 value = tuple([remove_nuls(item) for item in value]) 803 elif type(value) is set: 804 value = set([remove_nuls(item) for item in value]) 805 elif type(value) is str: 806 value = value.replace("\0", "") 807 808 return value 809 810 811class NullAwareTextIOWrapper(io.TextIOWrapper): 812 """ 813 TextIOWrapper that skips null bytes 814 815 This can be used as a file reader that silently discards any null bytes it 816 encounters. 
817 """ 818 819 def __next__(self): 820 value = super().__next__() 821 return remove_nuls(value) 822 823 824class HashCache: 825 """ 826 Simple cache handler to cache hashed values 827 828 Avoids having to calculate a hash for values that have been hashed before 829 """ 830 831 def __init__(self, hasher): 832 self.hash_cache = {} 833 self.hasher = hasher 834 835 def update_cache(self, value): 836 """ 837 Checks the hash_cache to see if the value has been cached previously, 838 updates the hash_cache if needed, and returns the hashed value. 839 """ 840 # value = str(value) 841 if value not in self.hash_cache: 842 author_hasher = self.hasher.copy() 843 author_hasher.update(str(value).encode("utf-8")) 844 self.hash_cache[value] = author_hasher.hexdigest() 845 del author_hasher 846 return self.hash_cache[value] 847 848 849def dict_search_and_update(item, keyword_matches, function): 850 """ 851 Filter fields in an object recursively 852 853 Apply a function to every item and sub item of a dictionary if the key 854 contains one of the provided match terms. 855 856 Function loops through a dictionary or list and compares dictionary keys to 857 the strings defined by keyword_matches. It then applies the change_function 858 to corresponding values. 859 860 Note: if a matching term is found, all nested values will have the function 861 applied to them. e.g., all these values would be changed even those with 862 not_key_match: 863 864 {'key_match' : 'changed', 865 'also_key_match' : {'not_key_match' : 'but_value_still_changed'}, 866 'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]} 867 868 This is a comprehensive (and expensive) approach to updating a dictionary. 869 IF a dictionary structure is known, a better solution would be to update 870 using specific keys. 871 872 :param Dict/List item: dictionary/list/json to loop through 873 :param String keyword_matches: list of strings that will be matched to 874 dictionary keys. 
Can contain wildcards which are matched using fnmatch. 875 :param Function function: function appled to all values of any items 876 nested under a matching key 877 878 :return Dict/List: Copy of original item, but filtered 879 """ 880 881 def loop_helper_function(d_or_l, match_terms, change_function): 882 """ 883 Recursive helper function that updates item in place 884 """ 885 if isinstance(d_or_l, dict): 886 # Iterate through dictionary 887 for key, value in iter(d_or_l.items()): 888 if match_terms == 'True' or any([fnmatch.fnmatch(key, match_term) for match_term in match_terms]): 889 # Match found; apply function to all items and sub-items 890 if isinstance(value, (list, dict)): 891 # Pass item through again with match_terms = True 892 loop_helper_function(value, 'True', change_function) 893 elif value is None: 894 pass 895 else: 896 # Update the value 897 d_or_l[key] = change_function(value) 898 elif isinstance(value, (list, dict)): 899 # Continue search 900 loop_helper_function(value, match_terms, change_function) 901 elif isinstance(d_or_l, list): 902 # Iterate through list 903 for n, value in enumerate(d_or_l): 904 if isinstance(value, (list, dict)): 905 # Continue search 906 loop_helper_function(value, match_terms, change_function) 907 elif match_terms == 'True': 908 # List item nested in matching 909 d_or_l[n] = change_function(value) 910 else: 911 raise Exception('Must pass list or dictionary') 912 913 # Lowercase keyword_matches 914 keyword_matches = [keyword.lower() for keyword in keyword_matches] 915 916 # Create deepcopy and return new item 917 temp_item = copy.deepcopy(item) 918 loop_helper_function(temp_item, keyword_matches, function) 919 return temp_item 920 921 922def get_last_line(filepath): 923 """ 924 Seeks from end of file for '\n' and returns that line 925 926 :param str filepath: path to file 927 :return str: last line of file 928 """ 929 with open(filepath, "rb") as file: 930 try: 931 # start at the end of file 932 file.seek(-2, 
os.SEEK_END) 933 # check if NOT endline i.e. '\n' 934 while file.read(1) != b'\n': 935 # if not '\n', back up two characters and check again 936 file.seek(-2, os.SEEK_CUR) 937 except OSError: 938 file.seek(0) 939 last_line = file.readline().decode() 940 return last_line 941 942 943def add_notification(db, user, notification, expires=None, allow_dismiss=True): 944 db.insert("users_notifications", { 945 "username": user, 946 "notification": notification, 947 "timestamp_expires": expires, 948 "allow_dismiss": allow_dismiss 949 }, safe=True) 950 951 952def send_email(recipient, message): 953 """ 954 Send an e-mail using the configured SMTP settings 955 956 Just a thin wrapper around smtplib, so we don't have to repeat ourselves. 957 Exceptions are to be handled outside the function. 958 959 :param list recipient: Recipient e-mail addresses 960 :param MIMEMultipart message: Message to send 961 """ 962 # Create a secure SSL context 963 context = ssl.create_default_context() 964 965 # Decide which connection type 966 with smtplib.SMTP_SSL(config.get('mail.server'), port=config.get('mail.port', 0), context=context) if config.get( 967 'mail.ssl') == 'ssl' else smtplib.SMTP(config.get('mail.server'), 968 port=config.get('mail.port', 0)) as server: 969 if config.get('mail.ssl') == 'tls': 970 # smtplib.SMTP adds TLS context here 971 server.starttls(context=context) 972 973 # Log in 974 if config.get('mail.username') and config.get('mail.password'): 975 server.ehlo() 976 server.login(config.get('mail.username'), config.get('mail.password')) 977 978 # Send message 979 if type(message) == str: 980 server.sendmail(config.get('mail.noreply'), recipient, message) 981 else: 982 server.sendmail(config.get('mail.noreply'), recipient, message.as_string()) 983 984 985def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'): 986 """ 987 Return a flattened dictionary where nested dictionary objects are given new 988 keys using the partent key combined using the seperator 
with the child key. 989 990 Lists will be converted to json strings via json.dumps() 991 992 :param MutableMapping d: Dictionary like object 993 :param str partent_key: The original parent key prepending future nested keys 994 :param str sep: A seperator string used to combine parent and child keys 995 :return dict: A new dictionary with the no nested values 996 """ 997 998 def _flatten_dict_gen(d, parent_key, sep): 999 for k, v in d.items(): 1000 new_key = parent_key + sep + k if parent_key else k 1001 if isinstance(v, MutableMapping): 1002 yield from flatten_dict(v, new_key, sep=sep).items() 1003 elif isinstance(v, (list, set)): 1004 yield new_key, json.dumps( 1005 [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v]) 1006 else: 1007 yield new_key, v 1008 1009 return dict(_flatten_dict_gen(d, parent_key, sep)) 1010 1011 1012def sets_to_lists(d: MutableMapping): 1013 """ 1014 Return a dictionary where all nested sets have been converted to lists. 1015 1016 :param MutableMapping d: Dictionary like object 1017 :return dict: A new dictionary with the no nested sets 1018 """ 1019 1020 def _check_list(l): 1021 return [sets_to_lists(item) if isinstance(item, MutableMapping) else _check_list(item) if isinstance(item, ( 1022 set, list)) else item for item in l] 1023 1024 def _sets_to_lists_gen(d): 1025 for k, v in d.items(): 1026 if isinstance(v, MutableMapping): 1027 yield k, sets_to_lists(v) 1028 elif isinstance(v, (list, set)): 1029 yield k, _check_list(v) 1030 else: 1031 yield k, v 1032 1033 return dict(_sets_to_lists_gen(d)) 1034 1035 1036def url_to_hash(url, remove_scheme=True, remove_www=True): 1037 """ 1038 Convert a URL to a filename; some URLs are too long to be used as filenames, this keeps the domain and hashes the 1039 rest of the URL. 
1040 """ 1041 parsed_url = urlparse(url.lower()) 1042 if parsed_url: 1043 if remove_scheme: 1044 parsed_url = parsed_url._replace(scheme="") 1045 if remove_www: 1046 netloc = re.sub(r"^www\.", "", parsed_url.netloc) 1047 parsed_url = parsed_url._replace(netloc=netloc) 1048 1049 url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/")) 1050 else: 1051 # Unable to parse URL; use regex 1052 if remove_scheme: 1053 url = re.sub(r"^https?://", "", url) 1054 if remove_www: 1055 if not remove_scheme: 1056 scheme = re.match(r"^https?://", url).group() 1057 temp_url = re.sub(r"^https?://", "", url) 1058 url = scheme + re.sub(r"^www\.", "", temp_url) 1059 else: 1060 url = re.sub(r"^www\.", "", url) 1061 1062 url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/")) 1063 1064 return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest() 1065 1066 1067def split_urls(url_string, allowed_schemes=None): 1068 """ 1069 Split URL text by \n and commas. 1070 1071 4CAT allows users to input lists by either separating items with a newline or a comma. This function will split URLs 1072 and also check for commas within URLs using schemes. 1073 1074 Note: some urls may contain scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com); 1075 this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail). 
1076 """ 1077 if allowed_schemes is None: 1078 allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://') 1079 potential_urls = [] 1080 # Split the text by \n 1081 for line in url_string.split('\n'): 1082 # Handle commas that may exist within URLs 1083 parts = line.split(',') 1084 recombined_url = "" 1085 for part in parts: 1086 if part.startswith(allowed_schemes): # Other schemes exist 1087 # New URL start detected 1088 if recombined_url: 1089 # Already have a URL, add to list 1090 potential_urls.append(recombined_url) 1091 # Start new URL 1092 recombined_url = part 1093 elif part: 1094 if recombined_url: 1095 # Add to existing URL 1096 recombined_url += "," + part 1097 else: 1098 # No existing URL, start new 1099 recombined_url = part 1100 else: 1101 # Ignore empty strings 1102 pass 1103 if recombined_url: 1104 # Add any remaining URL 1105 potential_urls.append(recombined_url) 1106 return potential_urls 1107 1108 1109def folder_size(path='.'): 1110 """ 1111 Get the size of a folder using os.scandir for efficiency 1112 """ 1113 total = 0 1114 for entry in os.scandir(path): 1115 if entry.is_file(): 1116 total += entry.stat().st_size 1117 elif entry.is_dir(): 1118 total += folder_size(entry.path) 1119 return total
def init_datasource(database, logger, queue, name):
    """
    Initialize data source

    Queues jobs to scrape the boards that were configured to be scraped in the
    4CAT configuration file. If none were configured, nothing happens.

    :param Database database: Database connection instance
    :param Logger logger: Log handler
    :param JobQueue queue: Job Queue instance
    :param string name: ID of datasource that is being initialised
    """
    # default implementation is a no-op; datasources provide their own hook
    pass
Initialize data source
Queues jobs to scrape the boards that were configured to be scraped in the 4CAT configuration file. If none were configured, nothing happens.
Parameters
- Database database: Database connection instance
- Logger logger: Log handler
- JobQueue queue: Job Queue instance
- string name: ID of datasource that is being initialised
def sniff_encoding(file):
    """
    Determine encoding from raw file bytes

    Currently only distinguishes UTF-8 and UTF-8 with BOM

    :param file: Raw `bytes`/`bytearray`, or a binary file-like object
    (anything with `getbuffer()` or `peek()`)
    :return str: "utf-8-sig" if a UTF-8 BOM is present, "utf-8" otherwise
    """
    # isinstance instead of `type(...) ==`, and accept plain bytes as well as
    # bytearray (backward-compatible generalisation)
    if isinstance(file, (bytes, bytearray)):
        maybe_bom = file[:3]
    elif hasattr(file, "getbuffer"):
        buffer = file.getbuffer()
        maybe_bom = buffer[:3].tobytes()
    elif hasattr(file, "peek"):
        buffer = file.peek(32)
        maybe_bom = buffer[:3]
    else:
        # unknown input type; fall through to the default encoding
        maybe_bom = False

    return "utf-8-sig" if maybe_bom == b"\xef\xbb\xbf" else "utf-8"
Determine encoding from raw file bytes
Currently only distinguishes UTF-8 and UTF-8 with BOM
Parameters
- file:
Returns
def sniff_csv_dialect(csv_input):
    """
    Determine CSV dialect for an input stream

    :param csv_input: Input stream
    :return tuple: Tuple: Dialect object and a boolean representing whether
    the CSV file seems to have a header
    """
    encoding = sniff_encoding(csv_input)
    # isinstance instead of `type(...) is`, so wrapper subclasses also pass
    if isinstance(csv_input, io.TextIOWrapper):
        wrapped_input = csv_input
    else:
        wrapped_input = io.TextIOWrapper(csv_input, encoding=encoding)
    wrapped_input.seek(0)
    # sample at most 1MB so sniffing stays fast on large files
    sample = wrapped_input.read(1024 * 1024)
    wrapped_input.seek(0)
    has_header = csv.Sniffer().has_header(sample)
    dialect = csv.Sniffer().sniff(sample, delimiters=(",", ";", "\t"))

    return dialect, has_header
Determine CSV dialect for an input stream
Parameters
- csv_input: Input stream
Returns
Dialect object and a boolean representing whether the CSV file seems to have a header
def get_git_branch():
    """
    Get current git branch

    If the 4CAT root folder is a git repository, this function will return the
    name of the currently checked-out branch. If the folder is not a git
    repository or git is not installed an empty string is returned.

    :return str: Branch name, or an empty string if it cannot be determined
    """
    try:
        root_dir = str(config.get('PATH_ROOT').resolve())
        # capture stderr too, so git errors do not leak to the console
        branch = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} branch --show-current"),
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if branch.returncode != 0:
            raise ValueError()
        branch_name = branch.stdout.decode("utf-8").strip()
        if not branch_name:
            # Check for detached HEAD state
            # Most likely occurring because of checking out release tags (which are not branches) or commits
            head_status = subprocess.run(shlex.split(f"git -C {shlex.quote(root_dir)} status"),
                                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if head_status.returncode == 0:
                for line in head_status.stdout.decode("utf-8").split("\n"):
                    if "HEAD detached at" in line:
                        # e.g. "HEAD detached at v1.37" or "... at origin/branch"
                        branch_name = line.split("/")[-1] if "/" in line else line.split(" ")[-1]
        return branch_name
    except (subprocess.SubprocessError, ValueError, FileNotFoundError):
        return ""
Get current git branch
If the 4CAT root folder is a git repository, this function will return the name of the currently checked-out branch. If the folder is not a git repository or git is not installed an empty string is returned.
def get_software_commit(worker=None):
    """
    Get current 4CAT git commit hash

    Use `get_software_version()` instead if you need the release version
    number rather than the precise commit hash.

    If no version file is available, run `git show` to test if there is a git
    repository in the 4CAT root folder, and if so, what commit is currently
    checked out in it.

    For extensions, get the repository information for that extension, or if
    the extension is not a git repository, return empty data.

    :param BasicWorker worker: Worker to get commit for. If not given, get
    version information for the main 4CAT installation.

    :return tuple: 4CAT git commit hash, repository name
    """
    # try git command line within the 4CAT root folder
    # if it is a checked-out git repository, it will tell us the hash of
    # the currently checked-out commit
    try:
        # if extension, go to the extension file's path
        # we will run git here - if it is not its own repository, we have no
        # useful version info (since the extension is by definition not in the
        # main 4CAT repository) and will return an empty value
        if worker and worker.is_extension:
            # bug fix: worker.filepath was previously read before checking
            # whether worker is None, raising an uncaught AttributeError when
            # called without a worker
            # path has no Path.relative()...
            relative_filepath = Path(re.sub(r"^[/\\]+", "", worker.filepath)).parent
            working_dir = str(config.get("PATH_ROOT").joinpath(relative_filepath).resolve())
            # check if we are in the extensions' own repo or 4CAT's
            git_cmd = f"git -C {shlex.quote(working_dir)} rev-parse --show-toplevel"
            repo_level = subprocess.run(shlex.split(git_cmd), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
            # strip the trailing newline before comparing paths; without the
            # strip this comparison could never match
            if Path(repo_level.stdout.decode("utf-8").strip()) == config.get("PATH_ROOT"):
                # not its own repository
                return ("", "")

        else:
            working_dir = str(config.get("PATH_ROOT").resolve())

        show = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} show"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        if show.returncode != 0:
            raise ValueError()
        # first line is e.g. "commit <hash>"
        commit = show.stdout.decode("utf-8").split("\n")[0].split(" ")[1]

        # now get the repository the commit belongs to, if we can
        origin = subprocess.run(shlex.split(f"git -C {shlex.quote(working_dir)} config --get remote.origin.url"), stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        if origin.returncode != 0 or not origin.stdout:
            raise ValueError()
        repository = origin.stdout.decode("utf-8").strip()
        if repository.endswith(".git"):
            repository = repository[:-4]

    except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError):
        return ("", "")

    return (commit, repository)
Get current 4CAT git commit hash
Use `get_software_version()` instead if you need the release version number rather than the precise commit hash.
If no version file is available, run `git show` to test whether there is a git repository in the 4CAT root folder and, if so, which commit is currently checked out in it.
For extensions, get the repository information for that extension, or if the extension is not a git repository, return empty data.
Parameters
- BasicWorker processor: Worker to get commit for. If not given, get version information for the main 4CAT installation.
Returns
4CAT git commit hash, repository name
def get_software_version():
    """
    Get current 4CAT version

    This is the actual software version, i.e. not the commit hash (see
    `get_software_hash()` for that). The current version is stored in a file
    with a canonical location: if the file doesn't exist, an empty string is
    returned.

    :return str: Software version, for example `1.37`.
    """
    version_file = config.get("PATH_ROOT").joinpath("config/.current-version")
    if version_file.exists():
        # the version is the first line of the file
        with version_file.open() as handle:
            return handle.readline().strip()

    return ""
Get current 4CAT version
This is the actual software version, i.e. not the commit hash (see `get_software_hash()` for that). The current version is stored in a file with a canonical location: if the file doesn't exist, an empty string is returned.
Returns
Software version, for example `1.37`.
def get_github_version(timeout=5):
    """
    Get latest release tag version from GitHub

    Will raise a ValueError if it cannot retrieve information from GitHub.

    :param int timeout: Timeout in seconds for HTTP request

    :return tuple: Version, e.g. `1.26`, and release URL.
    """
    repo_url = config.get("4cat.github_url")
    if not repo_url.endswith("/"):
        repo_url += "/"

    # reduce the configured URL to the "owner/repo" identifier the API expects
    repo_id = re.sub(r"(\.git)?/?$", "", re.sub(r"^https?://(www\.)?github\.com/", "", repo_url))

    api_url = "https://api.github.com/repos/%s/releases/latest" % repo_id
    api_response = requests.get(api_url, timeout=timeout).json()
    if api_response.get("message") == "Not Found":
        raise ValueError("Invalid GitHub URL or repository name")

    latest_tag = api_response.get("tag_name", "unknown")
    if latest_tag.startswith("v"):
        # release tags are prefixed "v", e.g. "v1.26"; drop the prefix
        latest_tag = re.sub(r"^v", "", latest_tag)

    return (latest_tag, api_response.get("html_url"))
Get latest release tag version from GitHub
Will raise a ValueError if it cannot retrieve information from GitHub.
Parameters
- int timeout: Timeout in seconds for HTTP request
Returns
Version, e.g. `1.26`, and release URL.
def get_ffmpeg_version(ffmpeg_path):
    """
    Determine ffmpeg version

    This can be necessary when using commands that change name between versions.

    :param ffmpeg_path: ffmpeg executable path
    :return packaging.version: Comparable version
    """
    command = [ffmpeg_path, "-version"]
    ffmpeg_version = subprocess.run(command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)

    # first stdout line looks like "ffmpeg version 4.4.2-0ubuntu0.22 ...";
    # take the part after " version " and keep only the leading digits/dots so
    # it can be parsed as a version number
    ffmpeg_version = ffmpeg_version.stdout.decode("utf-8").split("\n")[0].strip().split(" version ")[1]
    ffmpeg_version = re.split(r"[^0-9.]", ffmpeg_version)[0]

    return version.parse(ffmpeg_version)
Determine ffmpeg version
This can be necessary when using commands that change name between versions.
Parameters
- ffmpeg_path: ffmpeg executable path
Returns
Comparable version
def find_extensions():
    """
    Find 4CAT extensions and load their metadata

    Looks for subfolders of the extension folder, and loads additional metadata
    where available.

    :return tuple: A tuple with two items; the extensions, as an ID -> metadata
    dictionary, and a list of (str) errors encountered while loading
    """
    extension_path = config.get("PATH_ROOT").joinpath("extensions")
    errors = []
    if not extension_path.exists() or not extension_path.is_dir():
        # return consistent types (dict, list) as documented; this used to
        # return `[], None`
        return {}, errors

    # each folder in the extensions folder is an extension
    extensions = {
        extension.name: {
            "name": extension.name,
            "version": "",
            "url": "",
            "git_url": "",
            "is_git": False
        } for extension in sorted(os.scandir(extension_path), key=lambda x: x.name) if extension.is_dir()
    }

    # collect metadata for extensions
    allowed_metadata_keys = ("name", "version", "url")
    for extension in extensions:
        extension_folder = extension_path.joinpath(extension)
        metadata_file = extension_folder.joinpath("metadata.json")
        if metadata_file.exists():
            with metadata_file.open() as infile:
                try:
                    metadata = json.load(infile)
                    extensions[extension].update({k: metadata[k] for k in metadata if k in allowed_metadata_keys})
                except (TypeError, ValueError) as e:
                    errors.append(f"Error reading metadata file for extension '{extension}' ({e})")
                    continue

        extensions[extension]["is_git"] = extension_folder.joinpath(".git/HEAD").exists()
        if extensions[extension]["is_git"]:
            # try to get remote URL
            try:
                extension_root = str(extension_folder.resolve())
                origin = subprocess.run(shlex.split(f"git -C {shlex.quote(extension_root)} config --get remote.origin.url"),
                                        stderr=subprocess.PIPE, stdout=subprocess.PIPE)
                if origin.returncode != 0 or not origin.stdout:
                    raise ValueError()
                repository = origin.stdout.decode("utf-8").strip()
                if repository.endswith(".git") and "github.com" in repository:
                    # use repo URL
                    repository = repository[:-4]
                extensions[extension]["git_url"] = repository
            except (subprocess.SubprocessError, IndexError, TypeError, ValueError, FileNotFoundError):
                # best effort only; removed a leftover debug print(e) here and
                # leave git_url empty when git is unavailable
                pass

    return extensions, errors
Find 4CAT extensions and load their metadata
Looks for subfolders of the extension folder, and loads additional metadata where available.
Returns
A tuple with two items; the extensions, as an ID -> metadata dictionary, and a list of (str) errors encountered while loading
def convert_to_int(value, default=0):
    """
    Convert a value to an integer, with a fallback

    The fallback is used if an error is thrown during conversion to int.
    This is a convenience function, but beats putting try-catches everywhere
    we're using user input as an integer.

    :param value: Value to convert
    :param int default: Default value, if conversion not possible
    :return int: Converted value
    """
    try:
        converted = int(value)
    except (ValueError, TypeError):
        # not numeric (or None); fall back to the default
        return default
    return converted
Convert a value to an integer, with a fallback
The fallback is used if an error is thrown during conversion to int. This is a convenience function, but beats putting try-catches everywhere we're using user input as an integer.
Parameters
- value: Value to convert
- int default: Default value, if conversion not possible
Returns
Converted value
def timify_long(number):
    """
    Make a number look like an indication of time

    :param number: Number to convert. If the number is larger than the current
    UNIX timestamp, decrease by that amount
    :return str: A nice, string, for example `1 month, 3 weeks, 4 hours and 2 minutes`
    """
    remaining = int(number)

    if remaining > time.time():
        # NOTE(review): mirrors the original behaviour — for values above the
        # current timestamp this subtraction yields a negative number
        remaining = time.time() - remaining

    # unit lengths in seconds, largest first; a month is taken as 30.42 days
    units = (
        ("month", 30.42 * 86400),
        ("week", 7 * 86400),
        ("day", 86400),
        ("hour", 3600),
        ("minute", 60),
    )

    parts = []
    for label, unit_length in units:
        amount = math.floor(remaining / unit_length)
        if amount:
            parts.append("%i %s%s" % (amount, label, "s" if amount != 1 else ""))
            remaining -= amount * unit_length

    if not parts:
        parts.append("less than a minute")

    # join all but the last component with commas, the last with "and"
    tail = parts.pop()
    if parts:
        return ", ".join(parts) + " and " + tail

    return tail
Make a number look like an indication of time
Parameters
- number: Number to convert. If the number is larger than the current UNIX timestamp, decrease by that amount
Returns
A nice string, for example `1 month, 3 weeks, 4 hours and 2 minutes`
def andify(items):
    """
    Format a list of items for use in text

    Returns a comma-separated list, the last item preceded by "and"

    :param items: Iterable list
    :return str: Formatted string
    """
    # work on a stringified copy, so the caller's list is not mutated (the
    # previous implementation popped from the argument itself)
    items = [str(item) for item in items]

    if not items:
        return ""
    if len(items) == 1:
        # bug fix: this used to return items[1], raising an IndexError for
        # single-item lists
        return items[0]

    last_item = items.pop()
    return ", ".join(items) + f" and {last_item}"
Format a list of items for use in text
Returns a comma-separated list, the last item preceded by "and"
Parameters
- items: Iterable list
Returns
Formatted string
def hash_file(image_file, hash_type="file-hash"):
    """
    Generate an image hash

    :param Path image_file: Image file to hash
    :param str hash_type: Hash type, one of `file-hash`, `colorhash`,
    `phash`, `average_hash`, `dhash`
    :return str: Hexadecimal hash value
    :raises FileNotFoundError: If the file does not exist
    :raises NotImplementedError: For unknown hash types
    """
    if not image_file.exists():
        # include the path in the error so callers can tell what was missing
        raise FileNotFoundError(str(image_file))

    if hash_type == "file-hash":
        hasher = hashlib.sha1()

        # Open the file in binary mode
        with image_file.open("rb") as infile:
            # Read and update hash in chunks to handle large files; 64KB
            # chunks instead of 1KB means far fewer Python-level iterations
            while chunk := infile.read(65536):
                hasher.update(chunk)

        return hasher.hexdigest()

    elif hash_type in ("colorhash", "phash", "average_hash", "dhash"):
        # perceptual hashes via the imagehash library
        image = Image.open(image_file)

        return str(getattr(imagehash, hash_type)(image))

    else:
        raise NotImplementedError(f"Unknown hash type '{hash_type}'")
Generate an image hash
Parameters
- Path image_file: Image file to hash
- str hash_type: Hash type, one of `file-hash`, `colorhash`, `phash`, `average_hash`, `dhash`
Returns
Hexadecimal hash value
def get_yt_compatible_ids(yt_ids):
    """
    Join a list of YouTube IDs into comma-separated strings of at most fifty

    The YouTube API requires a comma-separated string and can only return a
    maximum of fifty results per request, so the list is chunked accordingly.

    :param yt_ids: List of ID strings, or a single ID string
    :return list: List of comma-joined strings, at most fifty IDs each
    """
    # If there's only one item, return a single list item
    if isinstance(yt_ids, str):
        return [yt_ids]

    # bug fix: the previous manual index bookkeeping dropped the final ID of
    # the list (joined `yt_ids[last_i:i]` at the last index) and produced an
    # empty string for single-element lists; plain slicing avoids both
    return [",".join(yt_ids[i:i + 50]) for i in range(0, len(yt_ids), 50)]
:param yt_ids list, a list of strings :returns list, a list of joined strings in pairs of 50
Takes a list of IDs and returns list of joined strings in pairs of fifty. This should be done for the YouTube API that requires a comma-separated string and can only return max fifty results.
def get_4cat_canvas(path, width, height, header=None, footer="made with 4CAT", fontsize_normal=None,
                    fontsize_small=None, fontsize_large=None):
    """
    Get a standard SVG canvas to draw 4CAT graphs to

    Adds a border, footer, header, and some basic text styling

    :param path: The path where the SVG graph will be saved
    :param width: Width of the canvas
    :param height: Height of the canvas
    :param header: Header, if necessary to draw
    :param footer: Footer text, if necessary to draw. Defaults to shameless
    4CAT advertisement.
    :param fontsize_normal: Font size of normal text
    :param fontsize_small: Font size of small text (e.g. footer)
    :param fontsize_large: Font size of large text (e.g. header)
    :return SVG: SVG canvas (via svgwrite) that can be drawn to
    """
    from svgwrite.container import SVG, Hyperlink
    from svgwrite.drawing import Drawing
    from svgwrite.shapes import Rect
    from svgwrite.text import Text

    # default font sizes are derived from the canvas width
    fontsize_normal = width / 75 if fontsize_normal is None else fontsize_normal
    fontsize_small = width / 100 if fontsize_small is None else fontsize_small
    fontsize_large = width / 50 if fontsize_large is None else fontsize_large

    # instantiate with border and white background
    canvas = Drawing(str(path), size=(width, height), style="font-family:monospace;font-size:%ipx" % fontsize_normal)
    canvas.add(Rect(insert=(0, 0), size=(width, height), stroke="#000", stroke_width=2, fill="#FFF"))

    if header:
        # black banner with centred white text across the top
        banner = SVG(insert=(0, 0), size=("100%", fontsize_large * 2))
        banner.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
        banner.add(Text(insert=("50%", "50%"), text=header, dominant_baseline="middle", text_anchor="middle",
                        fill="#FFF", style="font-size:%ipx" % fontsize_large))
        canvas.add(banner)

    if footer:
        # small banner in the bottom-right corner, hyperlinked to 4cat.nl
        footersize = (fontsize_small * len(footer) * 0.7, fontsize_small * 2)
        footer_shape = SVG(insert=(width - footersize[0], height - footersize[1]), size=footersize)
        footer_shape.add(Rect(insert=(0, 0), size=("100%", "100%"), fill="#000"))
        link = Hyperlink(href="https://4cat.nl")
        link.add(Text(insert=("50%", "50%"), text=footer, dominant_baseline="middle", text_anchor="middle",
                      fill="#FFF", style="font-size:%ipx" % fontsize_small))
        footer_shape.add(link)
        canvas.add(footer_shape)

    return canvas
Get a standard SVG canvas to draw 4CAT graphs to
Adds a border, footer, header, and some basic text styling
Parameters
- path: The path where the SVG graph will be saved
- width: Width of the canvas
- height: Height of the canvas
- header: Header, if necessary to draw
- footer: Footer text, if necessary to draw. Defaults to shameless 4CAT advertisement.
- fontsize_normal: Font size of normal text
- fontsize_small: Font size of small text (e.g. footer)
- fontsize_large: Font size of large text (e.g. header)
Returns
SVG canvas (via svgwrite) that can be drawn to
def call_api(action, payload=None, wait_for_response=True):
    """
    Send message to server

    Calls the internal API and returns interpreted response.

    :param str action: API action
    :param payload: API payload
    :param bool wait_for_response: Wait for response? If not close connection
    immediately after sending data.

    :return: API response, or timeout message in case of timeout
    """
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    connection.settimeout(15)
    connection.connect((config.get('API_HOST'), config.get('API_PORT')))

    msg = json.dumps({"request": action, "payload": payload})
    connection.sendall(msg.encode("ascii", "ignore"))

    if wait_for_response:
        try:
            response = ""
            while True:
                # read until the server closes the connection; the local was
                # previously named `bytes`, shadowing the builtin
                chunk = connection.recv(2048)
                if not chunk:
                    break

                response += chunk.decode("ascii", "ignore")
        except (socket.timeout, TimeoutError):
            response = "(Connection timed out)"

    try:
        connection.shutdown(socket.SHUT_RDWR)
    except OSError:
        # already shut down automatically
        pass
    connection.close()

    try:
        return json.loads(response) if wait_for_response else None
    except json.JSONDecodeError:
        # not valid JSON; return the raw response text
        return response
Send message to server
Calls the internal API and returns interpreted response.
Parameters
- str action: API action
- payload: API payload
- bool wait_for_response: Wait for response? If not close connection immediately after sending data.
Returns
API response, or timeout message in case of timeout
def get_interval_descriptor(item, interval):
    """
    Get interval descriptor based on timestamp

    :param dict item: Item to generate descriptor for, should have a
    "timestamp" key
    :param str interval: Interval, one of "all", "overall", "year",
    "month", "week", "day", "hour", "minute"
    :return str: Interval descriptor, e.g. "overall", "2020", "2020-08",
    "2020-43", "2020-08-01"
    :raises ValueError: If the item has no parseable timestamp
    """
    if interval in ("all", "overall"):
        return interval

    if "timestamp" not in item:
        raise ValueError("No date available for item in dataset")

    # Timestamps may be epoch integers or "YYYY-MM-DD HH:MM:SS" strings; try
    # the numeric interpretation first. This replaces a bare `except:` that
    # also swallowed an unreachable "Invalid timestamp" error.
    try:
        timestamp = datetime.datetime.fromtimestamp(int(item["timestamp"]))
    except (ValueError, TypeError, OSError, OverflowError):
        try:
            timestamp = datetime.datetime.strptime(item["timestamp"], "%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            raise ValueError("Invalid date '%s'" % str(item["timestamp"]))

    if interval == "year":
        return str(timestamp.year)
    elif interval == "month":
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2)
    elif interval == "week":
        # ISO year and week number
        return str(timestamp.isocalendar()[0]) + "-" + str(timestamp.isocalendar()[1]).zfill(2)
    elif interval == "hour":
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
            2) + " " + str(timestamp.hour).zfill(2)
    elif interval == "minute":
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(
            2) + " " + str(timestamp.hour).zfill(2) + ":" + str(timestamp.minute).zfill(2)
    else:
        # default to day granularity
        return str(timestamp.year) + "-" + str(timestamp.month).zfill(2) + "-" + str(timestamp.day).zfill(2)
Get interval descriptor based on timestamp
Parameters
- dict item: Item to generate descriptor for, should have a "timestamp" key
- str interval: Interval, one of "all", "overall", "year", "month", "week", "day"
Returns
Interval descriptor, e.g. "overall", "2020", "2020-08", "2020-43", "2020-08-01"
def pad_interval(intervals, first_interval=None, last_interval=None):
    """
    Pad an interval so all intermediate intervals are filled

    Warning, ugly code (PRs very welcome)

    :param dict intervals: A dictionary, with dates (YYYY{-MM}{-DD}) as keys
    and a numerical value.
    :param first_interval: Lower boundary interval key; inferred from the
    given intervals if omitted
    :param last_interval: Upper boundary interval key; inferred from the
    given intervals if omitted
    :return: Tuple of (number of intervals added, padded and sorted dict)
    """
    missing = 0
    # the first key determines the granularity (year/month/day/hour/minute)
    # of all keys, based on its length
    test_key = list(intervals.keys())[0]

    # first determine the boundaries of the interval
    # these may be passed as parameters, or they can be inferred from the
    # interval given
    if first_interval:
        # parse boundary components from the given key, as deep as its
        # length allows (4 = year, 7 = month, 10 = day, 13 = hour, 16 = minute)
        first_interval = str(first_interval)
        first_year = int(first_interval[0:4])
        if len(first_interval) > 4:
            first_month = int(first_interval[5:7])
            if len(first_interval) > 7:
                first_day = int(first_interval[8:10])
                if len(first_interval) > 10:
                    first_hour = int(first_interval[11:13])
                    if len(first_interval) > 13:
                        first_minute = int(first_interval[14:16])

    else:
        # infer the earliest boundary from the interval keys themselves,
        # narrowing one component at a time
        first_year = min([int(i[0:4]) for i in intervals])
        if len(test_key) > 4:
            first_month = min([int(i[5:7]) for i in intervals if int(i[0:4]) == first_year])
            if len(test_key) > 7:
                first_day = min(
                    [int(i[8:10]) for i in intervals if int(i[0:4]) == first_year and int(i[5:7]) == first_month])
                if len(test_key) > 10:
                    first_hour = min(
                        [int(i[11:13]) for i in intervals if
                         int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day])
                    if len(test_key) > 13:
                        first_minute = min(
                            [int(i[14:16]) for i in intervals if
                             int(i[0:4]) == first_year and int(i[5:7]) == first_month and int(i[8:10]) == first_day and int(
                                 i[11:13]) == first_hour])

    if last_interval:
        # same parsing as for first_interval, for the upper boundary
        last_interval = str(last_interval)
        last_year = int(last_interval[0:4])
        if len(last_interval) > 4:
            last_month = int(last_interval[5:7])
            if len(last_interval) > 7:
                last_day = int(last_interval[8:10])
                if len(last_interval) > 10:
                    last_hour = int(last_interval[11:13])
                    if len(last_interval) > 13:
                        last_minute = int(last_interval[14:16])
    else:
        # infer the latest boundary from the interval keys themselves
        last_year = max([int(i[0:4]) for i in intervals])
        if len(test_key) > 4:
            last_month = max([int(i[5:7]) for i in intervals if int(i[0:4]) == last_year])
            if len(test_key) > 7:
                last_day = max(
                    [int(i[8:10]) for i in intervals if int(i[0:4]) == last_year and int(i[5:7]) == last_month])
                if len(test_key) > 10:
                    last_hour = max(
                        [int(i[11:13]) for i in intervals if
                         int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day])
                    if len(test_key) > 13:
                        last_minute = max(
                            [int(i[14:16]) for i in intervals if
                             int(i[0:4]) == last_year and int(i[5:7]) == last_month and int(i[8:10]) == last_day and int(
                                 i[11:13]) == last_hour])

    # determine the granularity of the keys from their format
    has_month = re.match(r"^[0-9]{4}-[0-9]", test_key)
    has_day = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}", test_key)
    has_hour = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}", test_key)
    has_minute = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}", test_key)

    # enumerate every interval key between the two boundaries, descending
    # into months/days/hours/minutes only as far as the key format requires;
    # ranges are clamped to the boundary values at the edges
    all_intervals = []
    for year in range(first_year, last_year + 1):
        year_interval = str(year)

        if not has_month:
            all_intervals.append(year_interval)
            continue

        start_month = first_month if year == first_year else 1
        end_month = last_month if year == last_year else 12
        for month in range(start_month, end_month + 1):
            month_interval = year_interval + "-" + str(month).zfill(2)

            if not has_day:
                all_intervals.append(month_interval)
                continue

            start_day = first_day if all((year == first_year, month == first_month)) else 1
            # monthrange gives the number of days in this month
            end_day = last_day if all((year == last_year, month == last_month)) else monthrange(year, month)[1]
            for day in range(start_day, end_day + 1):
                day_interval = month_interval + "-" + str(day).zfill(2)

                if not has_hour:
                    all_intervals.append(day_interval)
                    continue

                start_hour = first_hour if all((year == first_year, month == first_month, day == first_day)) else 0
                end_hour = last_hour if all((year == last_year, month == last_month, day == last_day)) else 23
                for hour in range(start_hour, end_hour + 1):
                    hour_interval = day_interval + " " + str(hour).zfill(2)

                    if not has_minute:
                        all_intervals.append(hour_interval)
                        continue

                    start_minute = first_minute if all(
                        (year == first_year, month == first_month, day == first_day, hour == first_hour)) else 0
                    end_minute = last_minute if all(
                        (year == last_year, month == last_month, day == last_day, hour == last_hour)) else 59

                    for minute in range(start_minute, end_minute + 1):
                        minute_interval = hour_interval + ":" + str(minute).zfill(2)
                        all_intervals.append(minute_interval)

    # fill in any enumerated interval that is missing from the input
    for interval in all_intervals:
        if interval not in intervals:
            intervals[interval] = 0
            missing += 1

    # sort while we're at it
    intervals = {key: intervals[key] for key in sorted(intervals)}

    return missing, intervals
Pad an interval so all intermediate intervals are filled
Warning, ugly code (PRs very welcome)
Parameters
- dict intervals: A dictionary, with dates (YYYY{-MM}{-DD}) as keys and a numerical value.
- first_interval: Optional earliest interval (YYYY{-MM}{-DD}{ HH}{:MM}) to pad from; presumably derived from the data when omitted — verify against the function head
- last_interval: Optional latest interval (YYYY{-MM}{-DD}{ HH}{:MM}) to pad to
Returns
- missing, intervals: the number of intervals that were added, and the padded interval dictionary sorted by key
783def remove_nuls(value): 784 """ 785 Remove \0 from a value 786 787 The CSV library cries about a null byte when it encounters one :( :( :( 788 poor little csv cannot handle a tiny little null byte 789 790 So remove them from the data because they should not occur in utf-8 data 791 anyway. 792 793 :param value: Value to remove nulls from. For dictionaries, sets, tuples 794 and lists all items are parsed recursively. 795 :return value: Cleaned value 796 """ 797 if type(value) is dict: 798 for field in value: 799 value[field] = remove_nuls(value[field]) 800 elif type(value) is list: 801 value = [remove_nuls(item) for item in value] 802 elif type(value) is tuple: 803 value = tuple([remove_nuls(item) for item in value]) 804 elif type(value) is set: 805 value = set([remove_nuls(item) for item in value]) 806 elif type(value) is str: 807 value = value.replace("\0", "") 808 809 return value
Remove null bytes (\0) from a value
The CSV library cries about a null byte when it encounters one :( :( :( poor little csv cannot handle a tiny little null byte
So remove them from the data because they should not occur in utf-8 data anyway.
Parameters
- value: Value to remove nulls from. For dictionaries, sets, tuples and lists all items are parsed recursively.
Returns
Cleaned value
812class NullAwareTextIOWrapper(io.TextIOWrapper): 813 """ 814 TextIOWrapper that skips null bytes 815 816 This can be used as a file reader that silently discards any null bytes it 817 encounters. 818 """ 819 820 def __next__(self): 821 value = super().__next__() 822 return remove_nuls(value)
TextIOWrapper that skips null bytes
This can be used as a file reader that silently discards any null bytes it encounters.
825class HashCache: 826 """ 827 Simple cache handler to cache hashed values 828 829 Avoids having to calculate a hash for values that have been hashed before 830 """ 831 832 def __init__(self, hasher): 833 self.hash_cache = {} 834 self.hasher = hasher 835 836 def update_cache(self, value): 837 """ 838 Checks the hash_cache to see if the value has been cached previously, 839 updates the hash_cache if needed, and returns the hashed value. 840 """ 841 # value = str(value) 842 if value not in self.hash_cache: 843 author_hasher = self.hasher.copy() 844 author_hasher.update(str(value).encode("utf-8")) 845 self.hash_cache[value] = author_hasher.hexdigest() 846 del author_hasher 847 return self.hash_cache[value]
Simple cache handler to cache hashed values
Avoids having to calculate a hash for values that have been hashed before
836 def update_cache(self, value): 837 """ 838 Checks the hash_cache to see if the value has been cached previously, 839 updates the hash_cache if needed, and returns the hashed value. 840 """ 841 # value = str(value) 842 if value not in self.hash_cache: 843 author_hasher = self.hasher.copy() 844 author_hasher.update(str(value).encode("utf-8")) 845 self.hash_cache[value] = author_hasher.hexdigest() 846 del author_hasher 847 return self.hash_cache[value]
Checks the hash_cache to see if the value has been cached previously, updates the hash_cache if needed, and returns the hashed value.
def dict_search_and_update(item, keyword_matches, function):
    """
    Filter fields in an object recursively

    Apply a function to every item and sub item of a dictionary if the key
    contains one of the provided match terms.

    Function loops through a dictionary or list and compares dictionary keys to
    the strings defined by keyword_matches. It then applies the change_function
    to corresponding values.

    Note: if a matching term is found, all nested values will have the function
    applied to them. e.g., all these values would be changed even those with
    not_key_match:

    {'key_match' : 'changed',
    'also_key_match' : {'not_key_match' : 'but_value_still_changed'},
    'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}

    This is a comprehensive (and expensive) approach to updating a dictionary.
    If a dictionary structure is known, a better solution would be to update
    using specific keys.

    :param Dict/List item: dictionary/list/json to loop through
    :param String keyword_matches: list of strings that will be matched to
    dictionary keys. Can contain wildcards which are matched using fnmatch.
    :param Function function: function applied to all values of any items
    nested under a matching key

    :return Dict/List: Copy of original item, but filtered
    """

    def loop_helper_function(d_or_l, match_terms, change_function):
        """
        Recursive helper function that updates item in place
        """
        if isinstance(d_or_l, dict):
            # Iterate through dictionary
            for key, value in iter(d_or_l.items()):
                # Compare the lowercased key: match terms are lowercased
                # below, so matching against the raw key would silently miss
                # capitalised keys on platforms where fnmatch is
                # case-sensitive
                if match_terms == 'True' or any([fnmatch.fnmatch(key.lower(), match_term) for match_term in match_terms]):
                    # Match found; apply function to all items and sub-items
                    if isinstance(value, (list, dict)):
                        # Pass item through again with match_terms = True
                        loop_helper_function(value, 'True', change_function)
                    elif value is None:
                        pass
                    else:
                        # Update the value
                        d_or_l[key] = change_function(value)
                elif isinstance(value, (list, dict)):
                    # Continue search
                    loop_helper_function(value, match_terms, change_function)
        elif isinstance(d_or_l, list):
            # Iterate through list
            for n, value in enumerate(d_or_l):
                if isinstance(value, (list, dict)):
                    # Continue search
                    loop_helper_function(value, match_terms, change_function)
                elif match_terms == 'True':
                    # List item nested in matching
                    d_or_l[n] = change_function(value)
        else:
            raise Exception('Must pass list or dictionary')

    # Lowercase keyword_matches so matching is case-insensitive
    keyword_matches = [keyword.lower() for keyword in keyword_matches]

    # Create deepcopy and return new item
    temp_item = copy.deepcopy(item)
    loop_helper_function(temp_item, keyword_matches, function)
    return temp_item
Filter fields in an object recursively
Apply a function to every item and sub item of a dictionary if the key contains one of the provided match terms.
Function loops through a dictionary or list and compares dictionary keys to the strings defined by keyword_matches. It then applies the change_function to corresponding values.
Note: if a matching term is found, all nested values will have the function applied to them. e.g., all these values would be changed even those with not_key_match:
{'key_match' : 'changed', 'also_key_match' : {'not_key_match' : 'but_value_still_changed'}, 'another_key_match': ['this_is_changed', 'and_this', {'not_key_match' : 'even_this_is_changed'}]}
This is a comprehensive (and expensive) approach to updating a dictionary. IF a dictionary structure is known, a better solution would be to update using specific keys.
Parameters
- Dict/List item: dictionary/list/json to loop through
- String keyword_matches: list of strings that will be matched to dictionary keys. Can contain wildcards which are matched using fnmatch.
- Function function: function applied to all values of any items nested under a matching key
Returns
Copy of original item, but filtered
923def get_last_line(filepath): 924 """ 925 Seeks from end of file for '\n' and returns that line 926 927 :param str filepath: path to file 928 :return str: last line of file 929 """ 930 with open(filepath, "rb") as file: 931 try: 932 # start at the end of file 933 file.seek(-2, os.SEEK_END) 934 # check if NOT endline i.e. '\n' 935 while file.read(1) != b'\n': 936 # if not '\n', back up two characters and check again 937 file.seek(-2, os.SEEK_CUR) 938 except OSError: 939 file.seek(0) 940 last_line = file.readline().decode() 941 return last_line
Seeks from end of file for '\n' (newline) and returns that line
:param str filepath: path to file
:return str: last line of file
953def send_email(recipient, message): 954 """ 955 Send an e-mail using the configured SMTP settings 956 957 Just a thin wrapper around smtplib, so we don't have to repeat ourselves. 958 Exceptions are to be handled outside the function. 959 960 :param list recipient: Recipient e-mail addresses 961 :param MIMEMultipart message: Message to send 962 """ 963 # Create a secure SSL context 964 context = ssl.create_default_context() 965 966 # Decide which connection type 967 with smtplib.SMTP_SSL(config.get('mail.server'), port=config.get('mail.port', 0), context=context) if config.get( 968 'mail.ssl') == 'ssl' else smtplib.SMTP(config.get('mail.server'), 969 port=config.get('mail.port', 0)) as server: 970 if config.get('mail.ssl') == 'tls': 971 # smtplib.SMTP adds TLS context here 972 server.starttls(context=context) 973 974 # Log in 975 if config.get('mail.username') and config.get('mail.password'): 976 server.ehlo() 977 server.login(config.get('mail.username'), config.get('mail.password')) 978 979 # Send message 980 if type(message) == str: 981 server.sendmail(config.get('mail.noreply'), recipient, message) 982 else: 983 server.sendmail(config.get('mail.noreply'), recipient, message.as_string())
Send an e-mail using the configured SMTP settings
Just a thin wrapper around smtplib, so we don't have to repeat ourselves. Exceptions are to be handled outside the function.
Parameters
- list recipient: Recipient e-mail addresses
- MIMEMultipart message: Message to send
986def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.'): 987 """ 988 Return a flattened dictionary where nested dictionary objects are given new 989 keys using the partent key combined using the seperator with the child key. 990 991 Lists will be converted to json strings via json.dumps() 992 993 :param MutableMapping d: Dictionary like object 994 :param str partent_key: The original parent key prepending future nested keys 995 :param str sep: A seperator string used to combine parent and child keys 996 :return dict: A new dictionary with the no nested values 997 """ 998 999 def _flatten_dict_gen(d, parent_key, sep): 1000 for k, v in d.items(): 1001 new_key = parent_key + sep + k if parent_key else k 1002 if isinstance(v, MutableMapping): 1003 yield from flatten_dict(v, new_key, sep=sep).items() 1004 elif isinstance(v, (list, set)): 1005 yield new_key, json.dumps( 1006 [flatten_dict(item, new_key, sep=sep) if isinstance(item, MutableMapping) else item for item in v]) 1007 else: 1008 yield new_key, v 1009 1010 return dict(_flatten_dict_gen(d, parent_key, sep))
Return a flattened dictionary where nested dictionary objects are given new keys using the parent key combined using the separator with the child key.
Lists will be converted to json strings via json.dumps()
Parameters
- MutableMapping d: Dictionary like object
- str parent_key: The original parent key prepending future nested keys
- str sep: A separator string used to combine parent and child keys
Returns
A new dictionary with no nested values
1013def sets_to_lists(d: MutableMapping): 1014 """ 1015 Return a dictionary where all nested sets have been converted to lists. 1016 1017 :param MutableMapping d: Dictionary like object 1018 :return dict: A new dictionary with the no nested sets 1019 """ 1020 1021 def _check_list(l): 1022 return [sets_to_lists(item) if isinstance(item, MutableMapping) else _check_list(item) if isinstance(item, ( 1023 set, list)) else item for item in l] 1024 1025 def _sets_to_lists_gen(d): 1026 for k, v in d.items(): 1027 if isinstance(v, MutableMapping): 1028 yield k, sets_to_lists(v) 1029 elif isinstance(v, (list, set)): 1030 yield k, _check_list(v) 1031 else: 1032 yield k, v 1033 1034 return dict(_sets_to_lists_gen(d))
Return a dictionary where all nested sets have been converted to lists.
Parameters
- MutableMapping d: Dictionary like object
Returns
A new dictionary with no nested sets
1037def url_to_hash(url, remove_scheme=True, remove_www=True): 1038 """ 1039 Convert a URL to a filename; some URLs are too long to be used as filenames, this keeps the domain and hashes the 1040 rest of the URL. 1041 """ 1042 parsed_url = urlparse(url.lower()) 1043 if parsed_url: 1044 if remove_scheme: 1045 parsed_url = parsed_url._replace(scheme="") 1046 if remove_www: 1047 netloc = re.sub(r"^www\.", "", parsed_url.netloc) 1048 parsed_url = parsed_url._replace(netloc=netloc) 1049 1050 url = re.sub(r"[^0-9a-z]+", "_", urlunparse(parsed_url).strip("/")) 1051 else: 1052 # Unable to parse URL; use regex 1053 if remove_scheme: 1054 url = re.sub(r"^https?://", "", url) 1055 if remove_www: 1056 if not remove_scheme: 1057 scheme = re.match(r"^https?://", url).group() 1058 temp_url = re.sub(r"^https?://", "", url) 1059 url = scheme + re.sub(r"^www\.", "", temp_url) 1060 else: 1061 url = re.sub(r"^www\.", "", url) 1062 1063 url = re.sub(r"[^0-9a-z]+", "_", url.lower().strip("/")) 1064 1065 return hashlib.blake2b(url.encode("utf-8"), digest_size=24).hexdigest()
Convert a URL to a filename; some URLs are too long to be used as filenames, this keeps the domain and hashes the rest of the URL.
1068def split_urls(url_string, allowed_schemes=None): 1069 """ 1070 Split URL text by \n and commas. 1071 1072 4CAT allows users to input lists by either separating items with a newline or a comma. This function will split URLs 1073 and also check for commas within URLs using schemes. 1074 1075 Note: some urls may contain scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com); 1076 this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail). 1077 """ 1078 if allowed_schemes is None: 1079 allowed_schemes = ('http://', 'https://', 'ftp://', 'ftps://') 1080 potential_urls = [] 1081 # Split the text by \n 1082 for line in url_string.split('\n'): 1083 # Handle commas that may exist within URLs 1084 parts = line.split(',') 1085 recombined_url = "" 1086 for part in parts: 1087 if part.startswith(allowed_schemes): # Other schemes exist 1088 # New URL start detected 1089 if recombined_url: 1090 # Already have a URL, add to list 1091 potential_urls.append(recombined_url) 1092 # Start new URL 1093 recombined_url = part 1094 elif part: 1095 if recombined_url: 1096 # Add to existing URL 1097 recombined_url += "," + part 1098 else: 1099 # No existing URL, start new 1100 recombined_url = part 1101 else: 1102 # Ignore empty strings 1103 pass 1104 if recombined_url: 1105 # Add any remaining URL 1106 potential_urls.append(recombined_url) 1107 return potential_urls
Split URL text by newlines and commas.
4CAT allows users to input lists by either separating items with a newline or a comma. This function will split URLs and also check for commas within URLs using schemes.
Note: some urls may contain scheme (e.g., https://web.archive.org/web/20250000000000*/http://economist.com); this function will work so long as the inner scheme does not follow a comma (e.g., "http://,https://" would fail).
1110def folder_size(path='.'): 1111 """ 1112 Get the size of a folder using os.scandir for efficiency 1113 """ 1114 total = 0 1115 for entry in os.scandir(path): 1116 if entry.is_file(): 1117 total += entry.stat().st_size 1118 elif entry.is_dir(): 1119 total += folder_size(entry.path) 1120 return total
Get the size of a folder using os.scandir for efficiency