
datasources.telegram.search_telegram

Search Telegram via API
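For orientation before the full source listing below: validate_query(), near the end of this module, normalises the list of entities to collect, so plain usernames and t.me links can be mixed freely. The following is a minimal sketch of that normalisation using the same regular expressions as the method; the input values are made up:

    import re

    # hypothetical user input: a t.me web-preview link and a plain username
    raw = "https://t.me/s/example_channel/, another_group"

    # collapse whitespace and split on commas, as validate_query() does
    items = re.sub(r"\s+", "", raw.replace("\n", ","))
    sanitized = []
    for item in items.split(","):
        if not item.strip():
            continue
        item = re.sub(r"^https?://t\.me/", "", item)  # strip t.me URL prefix
        item = re.sub(r"^/?s/", "", item)             # strip web-preview 's/' prefix
        item = re.sub(r"[/]*$", "", item)             # strip trailing slashes
        sanitized.append(item)

    print(sanitized)  # ['example_channel', 'another_group']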

   1"""
   2Search Telegram via API
   3"""
   4import traceback
   5import datetime
   6import hashlib
   7import asyncio
   8import json
   9import ural
  10import time
  11import re
  12
  13from pathlib import Path
  14
  15from backend.lib.search import Search
  16from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException, \
  17    QueryNeedsFurtherInputException
  18from common.lib.helpers import convert_to_int, UserInput
  19from common.lib.item_mapping import MappedItem, MissingMappedField
  20from common.config_manager import config
  21
  22from datetime import datetime
  23from telethon import TelegramClient
  24from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \
  25    FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError
  26from telethon.tl.functions.channels import GetFullChannelRequest
  27from telethon.tl.functions.users import GetFullUserRequest
  28from telethon.tl.types import User, MessageEntityMention
  29
  30
  31
  32class SearchTelegram(Search):
  33    """
  34    Search Telegram via API
  35    """
  36    type = "telegram-search"  # job ID
  37    category = "Search"  # category
  38    title = "Telegram API search"  # title displayed in UI
  39    description = "Scrapes messages from open Telegram groups via its API."  # description displayed in UI
  40    extension = "ndjson"  # extension of result file, used internally and in UI
  41    is_local = False  # Whether this datasource is locally scraped
  42    is_static = False  # Whether this datasource is still updated
  43
  44    # cache
  45    details_cache = None
  46    failures_cache = None
  47    eventloop = None
  48    import_issues = 0
  49    end_if_rate_limited = 600  # break if Telegram requires wait time above number of seconds
  50
  51    max_workers = 1
  52    max_retries = 3
  53    flawless = 0
  54
  55    config = {
  56        "telegram-search.can_query_all_messages": {
  57            "type": UserInput.OPTION_TOGGLE,
  58            "help": "Remove message amount limit",
  59            "default": False,
  60            "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!"
  61        },
  62        "telegram-search.max_entities": {
  63            "type": UserInput.OPTION_TEXT,
  64            "help": "Max entities to query",
  65            "coerce_type": int,
  66            "min": 0,
  67            "default": 25,
  68            "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to "
  69                       "disable limit."
  70        },
  71        "telegram-search.max_crawl_depth": {
  72            "type": UserInput.OPTION_TEXT,
  73            "help": "Max crawl depth",
  74            "coerce_type": int,
  75            "min": 0,
  76            "default": 0,
  77            "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded "
  78                       "messages. Recommended to leave at 0 for most users since this can exponentially increase "
  79                       "dataset sizes."
  80        }
  81    }
  82
  83    @classmethod
  84    def get_options(cls, parent_dataset=None, user=None):
  85        """
  86        Get processor options
  87
  88        Just updates the description of the entities field based on the
  89        configured max entities.
  90
  91        :param DataSet parent_dataset:  An object representing the dataset that
  92          the processor would be run on
  93        :param User user:  Flask user the options will be displayed for, in
  94          case they are requested for display in the 4CAT web interface. This can
  95          be used to show some options only to privileged users.
  96        """
  97        max_entities = config.get("telegram-search.max_entities", 25, user=user)
  98        options = {
  99            "intro": {
 100                "type": UserInput.OPTION_INFO,
 101                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
 102                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
 103                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
 104                        "authentication for Telegram."
 105            },
 106            "api_id": {
 107                "type": UserInput.OPTION_TEXT,
 108                "help": "API ID",
 109                "cache": True,
 110            },
 111            "api_hash": {
 112                "type": UserInput.OPTION_TEXT,
 113                "help": "API Hash",
 114                "cache": True,
 115            },
 116            "api_phone": {
 117                "type": UserInput.OPTION_TEXT,
 118                "help": "Phone number",
 119                "cache": True,
 120                "default": "+xxxxxxxxxx"
 121            },
 122            "divider": {
 123                "type": UserInput.OPTION_DIVIDER
 124            },
 125            "query-intro": {
 126                "type": UserInput.OPTION_INFO,
 127                "help": "Separate with commas or line breaks."
 128            },
 129            "query": {
 130                "type": UserInput.OPTION_TEXT_LARGE,
 131                "help": "Entities to scrape",
 132                "tooltip": "Separate with commas or line breaks."
 133            },
 134            "max_posts": {
 135                "type": UserInput.OPTION_TEXT,
 136                "help": "Messages per group",
 137                "min": 1,
 138                "max": 50000,
 139                "default": 10
 140            },
 141            "daterange": {
 142                "type": UserInput.OPTION_DATERANGE,
 143                "help": "Date range"
 144            },
 145            "divider-2": {
 146                "type": UserInput.OPTION_DIVIDER
 147            },
 148            "info-sensitive": {
 149                "type": UserInput.OPTION_INFO,
 150                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
 151                        "there while data is fetched. After the dataset has been created your credentials will be "
 152                        "deleted from the server, unless you enable the option below. If you want to download images "
 153                        "attached to the messages in your collected data, you need to enable this option. Your "
 154                        "credentials will never be visible to other users and can be erased later via the result page."
 155            },
 156            "save-session": {
 157                "type": UserInput.OPTION_TOGGLE,
 158                "help": "Save session:",
 159                "default": False
 160            },
 161            "resolve-entities-intro": {
 162                "type": UserInput.OPTION_INFO,
 163                "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full "
 164                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
 165                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and "
 166                        "for large datasets, increases the chance you will be rate-limited and your dataset isn't able "
 167                        "to finish capturing. It will also dramatically increase the disk space needed to store the "
 168                        "data, so only enable this if you really need it!"
 169            },
 170            "resolve-entities": {
 171                "type": UserInput.OPTION_TOGGLE,
 172                "help": "Resolve references",
 173                "default": False,
 174            }
 175        }
 176
 177        if max_entities:
 178            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
 179                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
 180
 181        all_messages = config.get("telegram-search.can_query_all_messages", False, user=user)
 182        if all_messages:
 183            if "max" in options["max_posts"]:
 184                del options["max_posts"]["max"]
 185        else:
 186            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
 187                                             f"{options['max_posts']['max']:,} messages per entity.")
 188
 189        if config.get("telegram-search.max_crawl_depth", 0, user=user) > 0:
 190            options["crawl_intro"] = {
 191                "type": UserInput.OPTION_INFO,
 192                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
 193                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
 194                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
 195                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
 196                        "progress cannot be accurately tracked when you use this feature."
 197            }
 198            options["crawl-depth"] = {
 199                "type": UserInput.OPTION_TEXT,
 200                "coerce_type": int,
 201                "min": 0,
 202                "max": config.get("telegram-search.max_crawl_depth"),
 203                "default": 0,
 204                "help": "Crawl depth",
 205                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
 206                           "query, i.e. at most this many hops can be needed to reach the entity from one of the "
 207                           "starting entities."
 208            }
 209            options["crawl-threshold"] = {
 210                "type": UserInput.OPTION_TEXT,
 211                "coerce_type": int,
 212                "min": 0,
 213                "default": 5,
 214                "help": "Crawl threshold",
 215                "tooltip": "Entities need to be references at least this many times to be added to the query. Only "
 216                           "references discovered below the max crawl depth are taken into account."
 217            }
 218            options["crawl-via-links"] = {
 219                "type": UserInput.OPTION_TOGGLE,
 220                "default": False,
 221                "help": "Extract new groups from links",
 222                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
 223                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
 224                           "links that would otherwise remain undetected."
 225            }
 226
 227        return options
 228
 229
 230    def get_items(self, query):
 231        """
 232        Execute a query; get messages for given parameters
 233
 234        Basically a wrapper around execute_queries() to call it with asyncio.
 235
 236        :param dict query:  Query parameters, as part of the DataSet object
 237        :return list:  Posts, sorted by thread and post ID, in ascending order
 238        """
 239        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
 240            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
 241                                       "creating it again from scratch.", is_final=True)
 242            return None
 243
 244        self.details_cache = {}
 245        self.failures_cache = set()
 246        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
 247        results = asyncio.run(self.execute_queries())
 248
 249        if not query.get("save-session"):
 250            self.dataset.delete_parameter("api_hash", instant=True)
 251            self.dataset.delete_parameter("api_phone", instant=True)
 252            self.dataset.delete_parameter("api_id", instant=True)
 253
 254        if self.flawless:
 255            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
 256                                       "been private). View the log file for details.", is_final=True)
 257
 258        return results
 259
 260    async def execute_queries(self):
 261        """
 262        Get messages for queries
 263
 264        This is basically what would be done in get_items(), except due to
 265        Telethon's architecture this needs to be called in an async method,
 266        which is this one.
 267
 268        :return list:  Collected messages
 269        """
 270        # session file has been created earlier, and we can re-use it here in
 271        # order to avoid having to re-enter the security code
 272        query = self.parameters
 273
 274        session_id = SearchTelegram.create_session_id(query["api_phone"], query["api_id"], query["api_hash"])
 275        self.dataset.log(f'Telegram session id: {session_id}')
 276        session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session")
 277
 278        client = None
 279
 280        try:
 281            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
 282                                    loop=self.eventloop)
 283            await client.start(phone=SearchTelegram.cancel_start)
 284        except RuntimeError:
 285            # session is no longer useable, delete file so user will be asked
 286            # for security code again. The RuntimeError is raised by
 287            # `cancel_start()`
 288            self.dataset.update_status(
 289                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
 290                is_final=True)
 291
 292            if client and hasattr(client, "disconnect"):
 293                await client.disconnect()
 294
 295            if session_path.exists():
 296                session_path.unlink()
 297
 298            return []
 299        except Exception as e:
 300            # not sure what exception specifically is triggered here, but it
 301            # always means the connection failed
 302            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
 303            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
 304            if client and hasattr(client, "disconnect"):
 305                await client.disconnect()
 306            return []
 307
 308        # ready our parameters
 309        parameters = self.dataset.get_parameters()
 310        queries = [query.strip() for query in parameters.get("query", "").split(",")]
 311        max_items = convert_to_int(parameters.get("items", 10), 10)
 312
 313        # Telethon requires the offset date to be a datetime date
 314        max_date = parameters.get("max_date")
 315        if max_date:
 316            try:
 317                max_date = datetime.fromtimestamp(int(max_date))
 318            except ValueError:
 319                max_date = None
 320
 321        # min_date can remain an integer
 322        min_date = parameters.get("min_date")
 323        if min_date:
 324            try:
 325                min_date = int(min_date)
 326            except ValueError:
 327                min_date = None
 328
 329        posts = []
 330        try:
 331            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
 332                posts.append(post)
 333            return posts
 334        except ProcessorInterruptedException as e:
 335            raise e
 336        except Exception as e:
 337            # catch-all so we can disconnect properly
 338            # ...should we?
 339            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
 340            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
 341            # May as well return what was captured, yes?
 342            return posts
 343        finally:
 344            await client.disconnect()
 345
 346    async def gather_posts(self, client, queries, max_items, min_date, max_date):
 347        """
 348        Gather messages for each entity for which messages are requested
 349
 350        :param TelegramClient client:  Telegram Client
 351        :param list queries:  List of entities to query (as string)
 352        :param int max_items:  Messages to scrape per entity
  353        :param int min_date:  Unix timestamp; only yield messages posted after this
  354        :param datetime max_date:  Only yield messages posted before this date
  355        :return:  Yields messages, each message a serialized dictionary.
 356        """
 357        resolve_refs = self.parameters.get("resolve-entities")
 358
  359        # flag to stop running additional queries; set when rate-limited too heavily
 360        no_additional_queries = False
 361
 362        # This is used for the 'crawl' feature so we know at which depth a
 363        # given entity was discovered
 364        depth_map = {
 365            entity: 0 for entity in queries
 366        }
 367
 368        crawl_max_depth = self.parameters.get("crawl-depth", 0)
 369        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
 370        crawl_via_links = self.parameters.get("crawl-via-links", False)
 371
 372        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
 373        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
 374
 375        # this keeps track of how often an entity not in the original query
 376        # has been mentioned. When crawling is enabled and this exceeds the
 377        # given threshold, the entity is added to the query
 378        crawl_references = {}
 379        full_query = set(queries)
 380        num_queries = len(queries)
 381
 382        # we may not always know the 'entity username' for an entity ID, so
 383        # keep a reference map as we go
 384        entity_id_map = {}
  385        query_id_map = {}
 386
 387        # Collect queries
 388        # Use while instead of for so we can change queries during iteration
 389        # this is needed for the 'crawl' feature which can discover new
 390        # entities during crawl
 391        processed = 0
 392        total_messages = 0
 393        while queries:
 394            query = queries.pop(0)
 395
 396            delay = 10
 397            retries = 0
 398            processed += 1
 399            self.dataset.update_progress(processed / num_queries)
 400
 401            if no_additional_queries:
 402                # Note that we are not completing this query
 403                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
 404                continue
 405
 406            while True:
 407                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
 408                entity_posts = 0
 409                discovered = 0
 410                try:
 411                    async for message in client.iter_messages(entity=query, offset_date=max_date):
 412                        entity_posts += 1
 413                        total_messages += 1
 414                        if self.interrupted:
 415                            raise ProcessorInterruptedException(
 416                                "Interrupted while fetching message data from the Telegram API")
 417
 418                        if entity_posts % 100 == 0:
 419                            self.dataset.update_status(
 420                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
 421
 422                        if message.action is not None:
 423                            # e.g. someone joins the channel - not an actual message
 424                            continue
 425
 426                        # todo: possibly enrich object with e.g. the name of
 427                        # the channel a message was forwarded from (but that
 428                        # needs extra API requests...)
 429                        serialized_message = SearchTelegram.serialize_obj(message)
 430                        if "_chat" in serialized_message:
 431                            # Add query ID to check if queries have been crawled previously
 432                            full_query.add(serialized_message["_chat"]["id"])
 433                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
 434                                # once we know what a channel ID resolves to, use the username instead so it is easier to
 435                                # understand for the user
 436                                entity_id_map[query] = serialized_message["_chat"]["username"]
 437                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
 438
 439                        if resolve_refs:
 440                            serialized_message = await self.resolve_groups(client, serialized_message)
 441
 442                        # Stop if we're below the min date
 443                        if min_date and serialized_message.get("date") < min_date:
 444                            break
 445
 446                        # if crawling is enabled, see if we found something to add to the query
 447                        linked_entities = set()
 448                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
 449                            message_fwd = serialized_message.get("fwd_from")
 450                            fwd_from = None
 451                            fwd_source_type = None
 452                            if message_fwd and message_fwd.get("from_id"):
 453                                if message_fwd["from_id"].get("_type") == "PeerChannel":
 454                                    # Legacy(?) data structure (pre 2024/7/22)
 455                                    # even if we haven't resolved the ID, we can feed the numeric ID
 456                                    # to Telethon! this is nice because it means we don't have to
 457                                    # resolve entities to crawl iteratively
 458                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
 459                                    fwd_source_type = "channel"
 460                                elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}):
 461                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
 462                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
 463                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
 464                                    fwd_source_type = "channel"
 465                                elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
 466                                    # forwards can also come from users
 467                                    # these can never be followed, so don't add these to the crawl, but do document them
 468                                    fwd_source_type = "user"
 469                                else:
 470                                    print(json.dumps(message_fwd))
 471                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
 472                                    fwd_source_type = "unknown"
 473
 474                                if fwd_from:
 475                                    linked_entities.add(fwd_from)
 476
 477
 478                            if crawl_via_links:
 479                                # t.me links
 480                                all_links = ural.urls_from_text(serialized_message["message"])
 481                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 482                                for link in all_links:
 483                                    if link.startswith("+"):
 484                                        # invite links
 485                                        continue
 486
 487                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 488                                    linked_entities.add(entity_name)
 489
 490                                # @references
 491                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
 492                                for reference in references:
 493                                    if reference.startswith("@"):
 494                                        reference = reference[1:]
 495
 496                                    reference = reference.split("/")[0]
 497
 498                                    linked_entities.add(reference)
 499
 500                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
 501                            for link in linked_entities:
 502                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
 503                                    # new entity discovered!
 504                                    # might be discovered (before collection) multiple times, so retain lowest depth
 505                                    # print(f"Potentially crawling {link}")
 506                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
 507                                    if link not in crawl_references:
 508                                        crawl_references[link] = 0
 509                                    crawl_references[link] += 1
 510
 511                                    # Add to queries if it has been referenced enough times
 512                                    if crawl_references[link] >= crawl_msg_threshold:
 513                                        queries.append(link)
 514                                        full_query.add(link)
 515                                        num_queries += 1
 516                                        discovered += 1
 517                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
 518
 519
 520
 521                        serialized_message["4CAT_metadata"] = {
 522                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
 523                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
 524                            "query_depth": depth_map.get(query, 0)
 525                        }
 526                        yield serialized_message
 527
 528                        if entity_posts >= max_items:
 529                            break
 530
 531                except ChannelPrivateError:
 532                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
 533                    self.flawless += 1
 534
 535                except (UsernameInvalidError,):
 536                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
 537                    self.flawless += 1
 538
 539                except FloodWaitError as e:
 540                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
 541                    if e.seconds < self.end_if_rate_limited:
 542                        time.sleep(e.seconds)
 543                        continue
 544                    else:
 545                        self.flawless += 1
 546                        no_additional_queries = True
 547                        self.dataset.update_status(
 548                            f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending")
 549                        break
 550
 551                except BadRequestError as e:
 552                    self.dataset.update_status(
 553                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 554                    self.flawless += 1
 555
 556                except ValueError as e:
 557                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 558                    self.flawless += 1
 559
 560                except ChannelPrivateError as e:
 561                    self.dataset.update_status(
 562                        f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.")
 563                    break
 564
  565                except TimeoutError:
  566                    retries += 1
  567                    if retries > self.max_retries:
  568                        self.dataset.update_status(
  569                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
  570                        self.flawless += 1
  571                        break
  572
  573                    self.dataset.update_status(f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
  574                    time.sleep(delay)
  575                    delay *= 2
  576                    continue
 577
 578                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
 579                break
 580
 581    async def resolve_groups(self, client, message):
 582        """
 583        Recursively resolve references to groups and users
 584
 585        :param client:  Telethon client instance
 586        :param dict message:  Message, as already mapped by serialize_obj
 587        :return:  Resolved dictionary
 588        """
 589        resolved_message = message.copy()
 590        for key, value in message.items():
 591            try:
 592                if type(value) is not dict:
 593                    # if it's not a dict, we never have to resolve it, as it
 594                    # does not represent an entity
 595                    continue
 596
 597                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
 598                    # forwarded from a channel!
 599                    if value["channel_id"] in self.failures_cache:
 600                        continue
 601
 602                    if value["channel_id"] not in self.details_cache:
 603                        channel = await client(GetFullChannelRequest(value["channel_id"]))
 604                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
 605
 606                    resolved_message[key] = self.details_cache[value["channel_id"]]
 607                    resolved_message[key]["channel_id"] = value["channel_id"]
 608
 609                elif "_type" in value and value["_type"] == "PeerUser":
 610                    # a user!
 611                    if value["user_id"] in self.failures_cache:
 612                        continue
 613
 614                    if value["user_id"] not in self.details_cache:
 615                        user = await client(GetFullUserRequest(value["user_id"]))
 616                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
 617
 618                    resolved_message[key] = self.details_cache[value["user_id"]]
 619                    resolved_message[key]["user_id"] = value["user_id"]
 620                else:
 621                    resolved_message[key] = await self.resolve_groups(client, value)
 622
 623            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
 624                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
 625                if type(e) in (ChannelPrivateError, UsernameInvalidError):
 626                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
 627                else:
 628                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
 629
 630        return resolved_message
 631
 632    @staticmethod
 633    def cancel_start():
 634        """
 635        Replace interactive phone number input in Telethon
 636
 637        By default, if Telethon cannot use the given session file to
 638        authenticate, it will interactively prompt the user for a phone
 639        number on the command line. That is not useful here, so instead
 640        raise a RuntimeError. This will be caught below and the user will
 641        be told they need to re-authenticate via 4CAT.
 642        """
 643        raise RuntimeError("Connection cancelled")
 644
 645    @staticmethod
 646    def map_item(message):
 647        """
 648        Convert Message object to 4CAT-ready data object
 649
 650        :param Message message:  Message to parse
 651        :return dict:  4CAT-compatible item object
 652        """
 653        if message["_chat"]["username"]:
 654            # chats can apparently not have usernames???
 655            # truly telegram objects are way too lenient for their own good
 656            thread = message["_chat"]["username"]
 657        elif message["_chat"]["title"]:
 658            thread = re.sub(r"\s", "", message["_chat"]["title"])
 659        else:
 660            # just give up
 661            thread = "unknown"
 662
 663        # determine username
 664        # API responses only include the user *ID*, not the username, and to
 665        # complicate things further not everyone is a user and not everyone
 666        # has a username. If no username is available, try the first and
 667        # last name someone has supplied
 668        fullname = ""
 669        username = ""
 670        user_id = message["_sender"]["id"] if message.get("_sender") else ""
 671        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
 672
 673        if message.get("_sender") and message["_sender"].get("username"):
 674            username = message["_sender"]["username"]
 675
 676        if message.get("_sender") and message["_sender"].get("first_name"):
 677            fullname += message["_sender"]["first_name"]
 678
 679        if message.get("_sender") and message["_sender"].get("last_name"):
 680            fullname += " " + message["_sender"]["last_name"]
 681
 682        fullname = fullname.strip()
 683
 684        # determine media type
 685        # these store some extra information of the attachment in
 686        # attachment_data. Since the final result will be serialised as a csv
 687        # file, we can only store text content. As such some media data is
 688        # serialised as JSON.
 689        attachment_type = SearchTelegram.get_media_type(message["media"])
 690        attachment_filename = ""
 691
 692        if attachment_type == "contact":
 693            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
 694            if message["media"].get('contact', False):
 695                # Old datastructure
 696                attachment = message["media"]["contact"]
 697            elif all([property in message["media"].keys() for property in contact_data]):
 698                # New datastructure 2022/7/25
 699                attachment = message["media"]
 700            else:
 701                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
 702            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})
 703
 704        elif attachment_type == "document":
 705            # videos, etc
 706            # This could add a separate routine for videos to make them a
 707            # separate type, which could then be scraped later, etc
 708            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
 709            if attachment_type == "video":
 710                attachment = message["media"]["document"]
 711                attachment_data = json.dumps({
 712                    "id": attachment["id"],
 713                    "dc_id": attachment["dc_id"],
 714                    "file_reference": attachment["file_reference"],
 715                })
 716            else:
 717                attachment_data = ""
 718
 719        # elif attachment_type in ("geo", "geo_live"):
 720        # untested whether geo_live is significantly different from geo
 721        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
 722
 723        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
 724            # we don't actually store any metadata about the photo, since very
 725            # little of the metadata attached is of interest. Instead, the
 726            # actual photos may be downloaded via a processor that is run on the
 727            # search results
 728            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
 729            attachment_data = json.dumps({
 730                "id": attachment["id"],
 731                "dc_id": attachment["dc_id"],
 732                "file_reference": attachment["file_reference"],
 733            })
 734            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
 735
 736        elif attachment_type == "poll":
 737            # unfortunately poll results are only available when someone has
 738            # actually voted on the poll - that will usually not be the case,
 739            # so we store -1 as the vote count
 740            attachment = message["media"]
 741            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
 742            attachment_data = json.dumps({
 743                "question": attachment["poll"]["question"],
 744                "voters": attachment["results"]["total_voters"],
 745                "answers": [{
 746                    "answer": options[answer["option"]],
 747                    "votes": answer["voters"]
 748                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
 749                    "answer": options[option],
 750                    "votes": -1
 751                } for option in options]
 752            })
 753
 754        else:
 755            attachment_data = ""
 756
 757        # was the message forwarded from somewhere and if so when?
 758        forwarded_timestamp = ""
 759        forwarded_name = ""
 760        forwarded_id = ""
 761        forwarded_username = ""
 762        if message.get("fwd_from") and "from_id" in message["fwd_from"] and not (
 763                type(message["fwd_from"]["from_id"]) is int):
 764            # forward information is spread out over a lot of places
 765            # we can identify, in order of usefulness: username, full name,
 766            # and ID. But not all of these are always available, and not
 767            # always in the same place either
 768            forwarded_timestamp = int(message["fwd_from"]["date"])
 769            from_data = message["fwd_from"]["from_id"]
 770
 771            if from_data:
 772                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
 773
 774            if message["fwd_from"].get("from_name"):
 775                forwarded_name = message["fwd_from"].get("from_name")
 776
 777            if from_data and from_data.get("from_name"):
 778                forwarded_name = message["fwd_from"]["from_name"]
 779
 780            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
 781                from_data["user"] = from_data["users"][0]
 782
 783            if from_data and ("user" in from_data or "chats" in from_data):
 784                # 'resolve entities' was enabled for this dataset
 785                if "user" in from_data:
 786                    if from_data["user"].get("username"):
 787                        forwarded_username = from_data["user"]["username"]
 788
 789                    if from_data["user"].get("first_name"):
 790                        forwarded_name = from_data["user"]["first_name"]
 791                    if message["fwd_from"].get("last_name"):
 792                        forwarded_name += "  " + from_data["user"]["last_name"]
 793
 794                    forwarded_name = forwarded_name.strip()
 795
 796                elif "chats" in from_data:
 797                    channel_id = from_data.get("channel_id")
 798                    for chat in from_data["chats"]:
 799                        if chat["id"] == channel_id or channel_id is None:
 800                            forwarded_username = chat["username"]
 801
 802            elif message.get("_forward") and message["_forward"].get("_chat"):
 803                if message["_forward"]["_chat"].get("username"):
 804                    forwarded_username = message["_forward"]["_chat"]["username"]
 805
 806                if message["_forward"]["_chat"].get("title"):
 807                    forwarded_name = message["_forward"]["_chat"]["title"]
 808
 809        link_title = ""
 810        link_attached = ""
 811        link_description = ""
 812        reactions = ""
 813
 814        if message.get("media") and message["media"].get("webpage"):
 815            link_title = message["media"]["webpage"].get("title")
 816            link_attached = message["media"]["webpage"].get("url")
 817            link_description = message["media"]["webpage"].get("description")
 818
 819        if message.get("reactions") and message["reactions"].get("results"):
 820            for reaction in message["reactions"]["results"]:
 821                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
 822                    # Updated to support new reaction datastructure
 823                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
 824                elif type(reaction["reaction"]) is str and "count" in reaction:
 825                    reactions += reaction["reaction"] * reaction["count"]
 826                else:
 827                    # Failsafe; can be updated to support formatting of new datastructures in the future
 828                    reactions += f"{reaction}, "
 829
 830        # t.me links
 831        linked_entities = set()
 832        all_links = ural.urls_from_text(message["message"])
 833        all_links = [link.split("t.me/")[1] for link in all_links if
 834                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 835
 836        for link in all_links:
 837            if link.startswith("+"):
 838                # invite links
 839                continue
 840
 841            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 842            linked_entities.add(entity_name)
 843
 844        # @references
 845        # in execute_queries we use MessageEntityMention to get these
 846        # however, after serializing these objects we only have the offsets of
 847        # the mentioned username, and telegram does weird unicode things to its
 848        # offsets meaning we can't just substring the message. So use a regex
 849        # as a 'good enough' solution
 850        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
 851
 852        # make this case-insensitive since people may use different casing in
 853        # messages than the 'official' username for example
 854        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
 855        all_ci_connections = set()
 856        seen = set()
 857        for connection in all_connections:
 858            if connection.lower() not in seen:
 859                all_ci_connections.add(connection)
 860                seen.add(connection.lower())
 861
 862        return MappedItem({
 863            "id": f"{message['_chat']['username']}-{message['id']}",
 864            "thread_id": thread,
 865            "chat": message["_chat"]["username"],
 866            "author": user_id,
 867            "author_username": username,
 868            "author_name": fullname,
 869            "author_is_bot": "yes" if user_is_bot else "no",
 870            "body": message["message"],
 871            "reply_to": message.get("reply_to_msg_id", ""),
 872            "views": message["views"] if message["views"] else "",
 873            # "forwards": message.get("forwards", MissingMappedField(0)),
 874            "reactions": reactions,
 875            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
 876            "unix_timestamp": int(message["date"]),
 877            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
 878                "edit_date"] else "",
 879            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
 880            "author_forwarded_from_name": forwarded_name,
 881            "author_forwarded_from_username": forwarded_username,
 882            "author_forwarded_from_id": forwarded_id,
 883            "entities_linked": ",".join(linked_entities),
 884            "entities_mentioned": ",".join(all_mentions),
 885            "all_connections": ",".join(all_ci_connections),
 886            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
 887                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
 888            "unix_timestamp_forwarded_from": forwarded_timestamp,
 889            "link_title": link_title,
 890            "link_description": link_description,
 891            "link_attached": link_attached,
 892            "attachment_type": attachment_type,
 893            "attachment_data": attachment_data,
 894            "attachment_filename": attachment_filename
 895        })
 896
 897    @staticmethod
 898    def get_media_type(media):
 899        """
 900        Get media type for a Telegram attachment
 901
 902        :param media:  Media object
 903        :return str:  Textual identifier of the media type
 904        """
 905        try:
 906            return {
 907                "NoneType": "",
 908                "MessageMediaContact": "contact",
 909                "MessageMediaDocument": "document",
 910                "MessageMediaEmpty": "",
 911                "MessageMediaGame": "game",
 912                "MessageMediaGeo": "geo",
 913                "MessageMediaGeoLive": "geo_live",
 914                "MessageMediaInvoice": "invoice",
 915                "MessageMediaPhoto": "photo",
 916                "MessageMediaPoll": "poll",
 917                "MessageMediaUnsupported": "unsupported",
 918                "MessageMediaVenue": "venue",
 919                "MessageMediaWebPage": "url"
 920            }[media.get("_type", None)]
 921        except (AttributeError, KeyError):
 922            return ""
 923
 924    @staticmethod
 925    def serialize_obj(input_obj):
 926        """
 927        Serialize an object as a dictionary
 928
 929        Telethon message objects are not serializable by themselves, but most
 930        relevant attributes are simply struct classes. This function replaces
 931        those that are not with placeholders and then returns a dictionary that
 932        can be serialized as JSON.
 933
  934        :param input_obj:  Object to serialize
 935        :return:  Serialized object
 936        """
 937        scalars = (int, str, float, list, tuple, set, bool)
 938
 939        if type(input_obj) in scalars or input_obj is None:
 940            return input_obj
 941
 942        if type(input_obj) is not dict:
 943            obj = input_obj.__dict__
 944        else:
 945            obj = input_obj.copy()
 946
 947        mapped_obj = {}
 948        for item, value in obj.items():
 949            if type(value) is datetime:
 950                mapped_obj[item] = value.timestamp()
 951            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 952                mapped_obj[item] = SearchTelegram.serialize_obj(value)
 953            elif type(value) is list:
 954                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
 955            elif type(value) is bytes:
 956                mapped_obj[item] = value.hex()
 957            elif type(value) not in scalars and value is not None:
 958                # type we can't make sense of here
 959                continue
 960            else:
 961                mapped_obj[item] = value
 962
 963        # Add the _type if the original object was a telethon type
 964        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 965            mapped_obj["_type"] = type(input_obj).__name__
 966        return mapped_obj
 967
 968    @staticmethod
 969    def validate_query(query, request, user):
 970        """
 971        Validate Telegram query
 972
 973        :param dict query:  Query parameters, from client-side.
 974        :param request:  Flask request
 975        :param User user:  User object of user who has submitted the query
 976        :return dict:  Safe query parameters
 977        """
 978        # no query 4 u
 979        if not query.get("query", "").strip():
 980            raise QueryParametersException("You must provide a search query.")
 981
 982        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
 983            raise QueryParametersException("You need to provide valid Telegram API credentials first.")
 984
 985        all_posts = config.get("telegram-search.can_query_all_messages", False, user=user)
 986        max_entities = config.get("telegram-search.max_entities", 25, user=user)
 987
 988        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options()["max_posts"]["max"])
 989
 990        # reformat queries to be a comma-separated list with no wrapping
 991        # whitespace
 992        whitespace = re.compile(r"\s+")
 993        items = whitespace.sub("", query.get("query").replace("\n", ","))
 994        if max_entities > 0 and len(items.split(",")) > max_entities:
 995            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
 996
 997        sanitized_items = []
 998        # handle telegram URLs
 999        for item in items.split(","):
1000            if not item.strip():
1001                continue
1002            item = re.sub(r"^https?://t\.me/", "", item)
1003            item = re.sub(r"^/?s/", "", item)
1004            item = re.sub(r"[/]*$", "", item)
1005            sanitized_items.append(item)
1006
1007        # the dates need to make sense as a range to search within
1008        min_date, max_date = query.get("daterange")
1009
1010        # now check if there is an active API session
1011        if not user or not user.is_authenticated or user.is_anonymous:
1012            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1013                                           "accounts.")
1014
1015        # check for the information we need
1016        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1017                                                      query.get("api_hash"))
1018        user.set_value("telegram.session", session_id)
1019        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")
1020
1021        client = None
1022
1023        # API ID is always a number, if it's not, we can immediately fail
1024        try:
1025            api_id = int(query.get("api_id"))
1026        except ValueError:
1027            raise QueryParametersException("Invalid API ID.")
1028
1029        # maybe we've entered a code already and submitted it with the request
1030        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1031            code_callback = lambda: request.form.get("option-security-code")
1032            max_attempts = 1
1033        else:
1034            code_callback = lambda: -1
1035            # max_attempts = 0 because authing will always fail: we can't wait for
1036            # the code to be entered interactively, we'll need to do a new request
1037            # but we can't just immediately return, we still need to call start()
1038            # to get telegram to send us a code
1039            max_attempts = 0
1040
1041        # now try authenticating
1042        needs_code = False
1043        try:
1044            loop = asyncio.new_event_loop()
1045            asyncio.set_event_loop(loop)
1046            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1047
1048            try:
1049                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1050
1051            except ValueError as e:
1052                # this happens if 2FA is required
1053                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
1054                                               f"does not support this authentication mode for Telegram. ({e})")
1055            except RuntimeError as e:
1056                # A code was sent to the given phone number
1057                needs_code = True
1058        except FloodWaitError as e:
1059            # uh oh, we got rate-limited
1060            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
1061                                           str(e).split("(")[0] + ".")
1062        except ApiIdInvalidError as e:
1063            # wrong credentials
1064            raise QueryParametersException("Your API credentials are invalid.")
1065        except PhoneNumberInvalidError as e:
1066            # wrong phone number
1067            raise QueryParametersException(
1068                "The phone number provided is not a valid phone number for these credentials.")
1069        except RPCError as e:
1070            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
1071            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
1072                                           f"Telegram app(s) to the latest version to proceed ({e}).")
1073        except Exception as e:
1074            # ?
1075            raise QueryParametersException(
1076                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
1077        finally:
1078            if client:
1079                client.disconnect()
1080
1081        if needs_code:
1082            raise QueryNeedsFurtherInputException(config={
1083                "code-info": {
1084                    "type": UserInput.OPTION_INFO,
1085                    "help": "Please enter the security code that was sent to your Telegram app to continue."
1086                },
1087                "security-code": {
1088                    "type": UserInput.OPTION_TEXT,
1089                    "help": "Security code",
1090                    "sensitive": True
1091                }})
1092
1093        # simple!
1094        return {
1095            "items": num_items,
1096            "query": ",".join(sanitized_items),
1097            "api_id": query.get("api_id"),
1098            "api_hash": query.get("api_hash"),
1099            "api_phone": query.get("api_phone"),
1100            "save-session": query.get("save-session"),
1101            "resolve-entities": query.get("resolve-entities"),
1102            "min_date": min_date,
1103            "max_date": max_date,
1104            "crawl-depth": query.get("crawl-depth"),
1105            "crawl-threshold": query.get("crawl-threshold"),
1106            "crawl-via-links": query.get("crawl-via-links")
1107        }
1108
1109    @staticmethod
1110    def create_session_id(api_phone, api_id, api_hash):
1111        """
1112        Generate a filename for the session file
1113
1114        This is a combination of phone number and API credentials, but hashed
1115        so that one cannot actually derive someone's phone number from it.
1116
1117        :param str api_phone:  Phone number for API ID
1118        :param int api_id:  Telegram API ID
1119        :param str api_hash:  Telegram API Hash
1120        :return str: A hash value derived from the input
1121        """
1122        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1123        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
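The `create_session_id()` helper at the end of the listing is what turns the submitted phone number and API credentials into an opaque session filename. A minimal sketch of that relationship (placeholder credentials, and it assumes a 4CAT checkout is importable on the Python path):

    from pathlib import Path
    from datasources.telegram.search_telegram import SearchTelegram

    # Placeholder values; any phone number / API ID / API hash combination works the same way.
    session_id = SearchTelegram.create_session_id("+31600000000", 12345, "0123456789abcdef")
    print(len(session_id))  # 128 hexadecimal characters (blake2b digest)

    # 4CAT keeps the Telethon session file in its configured sessions folder;
    # the directory below is a placeholder for PATH_ROOT/PATH_SESSIONS.
    session_path = Path("/path/to/sessions").joinpath(session_id + ".session")

Because only the blake2b digest ends up on disk, the phone number cannot be recovered from the session filename, which is the point of hashing it in the first place.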
class SearchTelegram(backend.lib.search.Search):
  33class SearchTelegram(Search):
  34    """
  35    Search Telegram via API
  36    """
  37    type = "telegram-search"  # job ID
  38    category = "Search"  # category
  39    title = "Telegram API search"  # title displayed in UI
  40    description = "Scrapes messages from open Telegram groups via its API."  # description displayed in UI
  41    extension = "ndjson"  # extension of result file, used internally and in UI
  42    is_local = False  # Whether this datasource is locally scraped
  43    is_static = False  # Whether this datasource is still updated
  44
  45    # cache
  46    details_cache = None
  47    failures_cache = None
  48    eventloop = None
  49    import_issues = 0
  50    end_if_rate_limited = 600  # break if Telegram requires wait time above number of seconds
  51
  52    max_workers = 1
  53    max_retries = 3
  54    flawless = 0
  55
  56    config = {
  57        "telegram-search.can_query_all_messages": {
  58            "type": UserInput.OPTION_TOGGLE,
  59            "help": "Remove message amount limit",
  60            "default": False,
  61            "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!"
  62        },
  63        "telegram-search.max_entities": {
  64            "type": UserInput.OPTION_TEXT,
  65            "help": "Max entities to query",
  66            "coerce_type": int,
  67            "min": 0,
  68            "default": 25,
  69            "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to "
  70                       "disable limit."
  71        },
  72        "telegram-search.max_crawl_depth": {
  73            "type": UserInput.OPTION_TEXT,
  74            "help": "Max crawl depth",
  75            "coerce_type": int,
  76            "min": 0,
  77            "default": 0,
  78            "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded "
  79                       "messages. Recommended to leave at 0 for most users since this can exponentially increase "
  80                       "dataset sizes."
  81        }
  82    }
  83
  84    @classmethod
  85    def get_options(cls, parent_dataset=None, user=None):
  86        """
  87        Get processor options
  88
  89        Just updates the description of the entities field based on the
  90        configured max entities.
  91
  92        :param DataSet parent_dataset:  An object representing the dataset that
  93          the processor would be run on
  94        :param User user:  Flask user the options will be displayed for, in
  95          case they are requested for display in the 4CAT web interface. This can
  96          be used to show some options only to privileges users.
  97        """
  98        max_entities = config.get("telegram-search.max_entities", 25, user=user)
  99        options = {
 100            "intro": {
 101                "type": UserInput.OPTION_INFO,
 102                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
 103                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
 104                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
 105                        "authentication for Telegram."
 106            },
 107            "api_id": {
 108                "type": UserInput.OPTION_TEXT,
 109                "help": "API ID",
 110                "cache": True,
 111            },
 112            "api_hash": {
 113                "type": UserInput.OPTION_TEXT,
 114                "help": "API Hash",
 115                "cache": True,
 116            },
 117            "api_phone": {
 118                "type": UserInput.OPTION_TEXT,
 119                "help": "Phone number",
 120                "cache": True,
 121                "default": "+xxxxxxxxxx"
 122            },
 123            "divider": {
 124                "type": UserInput.OPTION_DIVIDER
 125            },
 126            "query-intro": {
 127                "type": UserInput.OPTION_INFO,
 128                "help": "Separate with commas or line breaks."
 129            },
 130            "query": {
 131                "type": UserInput.OPTION_TEXT_LARGE,
 132                "help": "Entities to scrape",
 133                "tooltip": "Separate with commas or line breaks."
 134            },
 135            "max_posts": {
 136                "type": UserInput.OPTION_TEXT,
 137                "help": "Messages per group",
 138                "min": 1,
 139                "max": 50000,
 140                "default": 10
 141            },
 142            "daterange": {
 143                "type": UserInput.OPTION_DATERANGE,
 144                "help": "Date range"
 145            },
 146            "divider-2": {
 147                "type": UserInput.OPTION_DIVIDER
 148            },
 149            "info-sensitive": {
 150                "type": UserInput.OPTION_INFO,
 151                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
 152                        "there while data is fetched. After the dataset has been created your credentials will be "
 153                        "deleted from the server, unless you enable the option below. If you want to download images "
 154                        "attached to the messages in your collected data, you need to enable this option. Your "
 155                        "credentials will never be visible to other users and can be erased later via the result page."
 156            },
 157            "save-session": {
 158                "type": UserInput.OPTION_TOGGLE,
 159                "help": "Save session:",
 160                "default": False
 161            },
 162            "resolve-entities-intro": {
 163                "type": UserInput.OPTION_INFO,
 164                "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full "
 165                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
 166                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and "
 167                        "for large datasets, increases the chance you will be rate-limited and your dataset isn't able "
 168                        "to finish capturing. It will also dramatically increase the disk space needed to store the "
 169                        "data, so only enable this if you really need it!"
 170            },
 171            "resolve-entities": {
 172                "type": UserInput.OPTION_TOGGLE,
 173                "help": "Resolve references",
 174                "default": False,
 175            }
 176        }
 177
 178        if max_entities:
 179            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
 180                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
 181
 182        all_messages = config.get("telegram-search.can_query_all_messages", False, user=user)
 183        if all_messages:
 184            if "max" in options["max_posts"]:
 185                del options["max_posts"]["max"]
 186        else:
 187            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
 188                                             f"{options['max_posts']['max']:,} messages per entity.")
 189
 190        if config.get("telegram-search.max_crawl_depth", 0, user=user) > 0:
 191            options["crawl_intro"] = {
 192                "type": UserInput.OPTION_INFO,
 193                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
 194                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
 195                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
 196                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
 197                        "progress cannot be accurately tracked when you use this feature."
 198            }
 199            options["crawl-depth"] = {
 200                "type": UserInput.OPTION_TEXT,
 201                "coerce_type": int,
 202                "min": 0,
 203                "max": config.get("telegram-search.max_crawl_depth"),
 204                "default": 0,
 205                "help": "Crawl depth",
 206                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
 207                           "query, i.e. at most this many hops can be needed to reach the entity from one of the "
 208                           "starting entities."
 209            }
 210            options["crawl-threshold"] = {
 211                "type": UserInput.OPTION_TEXT,
 212                "coerce_type": int,
 213                "min": 0,
 214                "default": 5,
 215                "help": "Crawl threshold",
 216                "tooltip": "Entities need to be references at least this many times to be added to the query. Only "
 217                           "references discovered below the max crawl depth are taken into account."
 218            }
 219            options["crawl-via-links"] = {
 220                "type": UserInput.OPTION_TOGGLE,
 221                "default": False,
 222                "help": "Extract new groups from links",
 223                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
 224                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
 225                           "links that would otherwise remain undetected."
 226            }
 227
 228        return options
 229
 230
 231    def get_items(self, query):
 232        """
 233        Execute a query; get messages for given parameters
 234
 235        Basically a wrapper around execute_queries() to call it with asyncio.
 236
 237        :param dict query:  Query parameters, as part of the DataSet object
 238        :return list:  Posts, sorted by thread and post ID, in ascending order
 239        """
 240        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
 241            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
 242                                       "creating it again from scratch.", is_final=True)
 243            return None
 244
 245        self.details_cache = {}
 246        self.failures_cache = set()
 247        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
 248        results = asyncio.run(self.execute_queries())
 249
 250        if not query.get("save-session"):
 251            self.dataset.delete_parameter("api_hash", instant=True)
 252            self.dataset.delete_parameter("api_phone", instant=True)
 253            self.dataset.delete_parameter("api_id", instant=True)
 254
 255        if self.flawless:
 256            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
 257                                       "been private). View the log file for details.", is_final=True)
 258
 259        return results
 260
 261    async def execute_queries(self):
 262        """
 263        Get messages for queries
 264
 265        This is basically what would be done in get_items(), except due to
 266        Telethon's architecture this needs to be called in an async method,
 267        which is this one.
 268
 269        :return list:  Collected messages
 270        """
 271        # session file has been created earlier, and we can re-use it here in
 272        # order to avoid having to re-enter the security code
 273        query = self.parameters
 274
 275        session_id = SearchTelegram.create_session_id(query["api_phone"], query["api_id"], query["api_hash"])
 276        self.dataset.log(f'Telegram session id: {session_id}')
 277        session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session")
 278
 279        client = None
 280
 281        try:
 282            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
 283                                    loop=self.eventloop)
 284            await client.start(phone=SearchTelegram.cancel_start)
 285        except RuntimeError:
 286            # session is no longer useable, delete file so user will be asked
 287            # for security code again. The RuntimeError is raised by
 288            # `cancel_start()`
 289            self.dataset.update_status(
 290                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
 291                is_final=True)
 292
 293            if client and hasattr(client, "disconnect"):
 294                await client.disconnect()
 295
 296            if session_path.exists():
 297                session_path.unlink()
 298
 299            return []
 300        except Exception as e:
 301            # not sure what exception specifically is triggered here, but it
 302            # always means the connection failed
 303            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
 304            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
 305            if client and hasattr(client, "disconnect"):
 306                await client.disconnect()
 307            return []
 308
 309        # ready our parameters
 310        parameters = self.dataset.get_parameters()
 311        queries = [query.strip() for query in parameters.get("query", "").split(",")]
 312        max_items = convert_to_int(parameters.get("items", 10), 10)
 313
 314        # Telethon requires the offset date to be a datetime date
 315        max_date = parameters.get("max_date")
 316        if max_date:
 317            try:
 318                max_date = datetime.fromtimestamp(int(max_date))
 319            except ValueError:
 320                max_date = None
 321
 322        # min_date can remain an integer
 323        min_date = parameters.get("min_date")
 324        if min_date:
 325            try:
 326                min_date = int(min_date)
 327            except ValueError:
 328                min_date = None
 329
 330        posts = []
 331        try:
 332            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
 333                posts.append(post)
 334            return posts
 335        except ProcessorInterruptedException as e:
 336            raise e
 337        except Exception as e:
 338            # catch-all so we can disconnect properly
 339            # ...should we?
 340            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
 341            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
 342            # May as well return what was captured, yes?
 343            return posts
 344        finally:
 345            await client.disconnect()
 346
 347    async def gather_posts(self, client, queries, max_items, min_date, max_date):
 348        """
 349        Gather messages for each entity for which messages are requested
 350
 351        :param TelegramClient client:  Telegram Client
 352        :param list queries:  List of entities to query (as string)
 353        :param int max_items:  Messages to scrape per entity
 354        :param int min_date:  Datetime date to get posts after
 355        :param int max_date:  Datetime date to get posts before
 356        :return list:  List of messages, each message a dictionary.
 357        """
 358        resolve_refs = self.parameters.get("resolve-entities")
 359
 360        # Adding flag to stop; using for rate limits
 361        no_additional_queries = False
 362
 363        # This is used for the 'crawl' feature so we know at which depth a
 364        # given entity was discovered
 365        depth_map = {
 366            entity: 0 for entity in queries
 367        }
 368
 369        crawl_max_depth = self.parameters.get("crawl-depth", 0)
 370        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
 371        crawl_via_links = self.parameters.get("crawl-via-links", False)
 372
 373        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
 374        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
 375
 376        # this keeps track of how often an entity not in the original query
 377        # has been mentioned. When crawling is enabled and this exceeds the
 378        # given threshold, the entity is added to the query
 379        crawl_references = {}
 380        full_query = set(queries)
 381        num_queries = len(queries)
 382
 383        # we may not always know the 'entity username' for an entity ID, so
 384        # keep a reference map as we go
 385        entity_id_map = {}
 386        query_id_map= {}
 387
 388        # Collect queries
 389        # Use while instead of for so we can change queries during iteration
 390        # this is needed for the 'crawl' feature which can discover new
 391        # entities during crawl
 392        processed = 0
 393        total_messages = 0
 394        while queries:
 395            query = queries.pop(0)
 396
 397            delay = 10
 398            retries = 0
 399            processed += 1
 400            self.dataset.update_progress(processed / num_queries)
 401
 402            if no_additional_queries:
 403                # Note that we are not completing this query
 404                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
 405                continue
 406
 407            while True:
 408                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
 409                entity_posts = 0
 410                discovered = 0
 411                try:
 412                    async for message in client.iter_messages(entity=query, offset_date=max_date):
 413                        entity_posts += 1
 414                        total_messages += 1
 415                        if self.interrupted:
 416                            raise ProcessorInterruptedException(
 417                                "Interrupted while fetching message data from the Telegram API")
 418
 419                        if entity_posts % 100 == 0:
 420                            self.dataset.update_status(
 421                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
 422
 423                        if message.action is not None:
 424                            # e.g. someone joins the channel - not an actual message
 425                            continue
 426
 427                        # todo: possibly enrich object with e.g. the name of
 428                        # the channel a message was forwarded from (but that
 429                        # needs extra API requests...)
 430                        serialized_message = SearchTelegram.serialize_obj(message)
 431                        if "_chat" in serialized_message:
 432                            # Add query ID to check if queries have been crawled previously
 433                            full_query.add(serialized_message["_chat"]["id"])
 434                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
 435                                # once we know what a channel ID resolves to, use the username instead so it is easier to
 436                                # understand for the user
 437                                entity_id_map[query] = serialized_message["_chat"]["username"]
 438                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
 439
 440                        if resolve_refs:
 441                            serialized_message = await self.resolve_groups(client, serialized_message)
 442
 443                        # Stop if we're below the min date
 444                        if min_date and serialized_message.get("date") < min_date:
 445                            break
 446
 447                        # if crawling is enabled, see if we found something to add to the query
 448                        linked_entities = set()
 449                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
 450                            message_fwd = serialized_message.get("fwd_from")
 451                            fwd_from = None
 452                            fwd_source_type = None
 453                            if message_fwd and message_fwd.get("from_id"):
 454                                if message_fwd["from_id"].get("_type") == "PeerChannel":
 455                                    # Legacy(?) data structure (pre 2024/7/22)
 456                                    # even if we haven't resolved the ID, we can feed the numeric ID
 457                                    # to Telethon! this is nice because it means we don't have to
 458                                    # resolve entities to crawl iteratively
 459                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
 460                                    fwd_source_type = "channel"
 461                                elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}):
 462                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
 463                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
 464                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
 465                                    fwd_source_type = "channel"
 466                                elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
 467                                    # forwards can also come from users
 468                                    # these can never be followed, so don't add these to the crawl, but do document them
 469                                    fwd_source_type = "user"
 470                                else:
 471                                    print(json.dumps(message_fwd))
 472                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
 473                                    fwd_source_type = "unknown"
 474
 475                                if fwd_from:
 476                                    linked_entities.add(fwd_from)
 477
 478
 479                            if crawl_via_links:
 480                                # t.me links
 481                                all_links = ural.urls_from_text(serialized_message["message"])
 482                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 483                                for link in all_links:
 484                                    if link.startswith("+"):
 485                                        # invite links
 486                                        continue
 487
 488                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 489                                    linked_entities.add(entity_name)
 490
 491                                # @references
 492                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
 493                                for reference in references:
 494                                    if reference.startswith("@"):
 495                                        reference = reference[1:]
 496
 497                                    reference = reference.split("/")[0]
 498
 499                                    linked_entities.add(reference)
 500
 501                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
 502                            for link in linked_entities:
 503                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
 504                                    # new entity discovered!
 505                                    # might be discovered (before collection) multiple times, so retain lowest depth
 506                                    # print(f"Potentially crawling {link}")
 507                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
 508                                    if link not in crawl_references:
 509                                        crawl_references[link] = 0
 510                                    crawl_references[link] += 1
 511
 512                                    # Add to queries if it has been referenced enough times
 513                                    if crawl_references[link] >= crawl_msg_threshold:
 514                                        queries.append(link)
 515                                        full_query.add(link)
 516                                        num_queries += 1
 517                                        discovered += 1
 518                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
 519
 520
 521
 522                        serialized_message["4CAT_metadata"] = {
 523                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
 524                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
 525                            "query_depth": depth_map.get(query, 0)
 526                        }
 527                        yield serialized_message
 528
 529                        if entity_posts >= max_items:
 530                            break
 531
 532                except ChannelPrivateError:
 533                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
 534                    self.flawless += 1
 535
 536                except (UsernameInvalidError,):
 537                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
 538                    self.flawless += 1
 539
 540                except FloodWaitError as e:
 541                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
 542                    if e.seconds < self.end_if_rate_limited:
 543                        time.sleep(e.seconds)
 544                        continue
 545                    else:
 546                        self.flawless += 1
 547                        no_additional_queries = True
 548                        self.dataset.update_status(
 549                            f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending")
 550                        break
 551
 552                except BadRequestError as e:
 553                    self.dataset.update_status(
 554                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 555                    self.flawless += 1
 556
 557                except ValueError as e:
 558                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 559                    self.flawless += 1
 560
 561                except ChannelPrivateError as e:
 562                    self.dataset.update_status(
 563                        f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.")
 564                    break
 565
 566                except TimeoutError:
 567                    if retries < 3:
 568                        self.dataset.update_status(
 569                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
 570                        self.flawless += 1
 571                        break
 572
 573                    self.dataset.update_status(
 574                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
 575                    time.sleep(delay)
 576                    delay *= 2
 577                    continue
 578
 579                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
 580                break
 581
 582    async def resolve_groups(self, client, message):
 583        """
 584        Recursively resolve references to groups and users
 585
 586        :param client:  Telethon client instance
 587        :param dict message:  Message, as already mapped by serialize_obj
 588        :return:  Resolved dictionary
 589        """
 590        resolved_message = message.copy()
 591        for key, value in message.items():
 592            try:
 593                if type(value) is not dict:
 594                    # if it's not a dict, we never have to resolve it, as it
 595                    # does not represent an entity
 596                    continue
 597
 598                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
 599                    # forwarded from a channel!
 600                    if value["channel_id"] in self.failures_cache:
 601                        continue
 602
 603                    if value["channel_id"] not in self.details_cache:
 604                        channel = await client(GetFullChannelRequest(value["channel_id"]))
 605                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
 606
 607                    resolved_message[key] = self.details_cache[value["channel_id"]]
 608                    resolved_message[key]["channel_id"] = value["channel_id"]
 609
 610                elif "_type" in value and value["_type"] == "PeerUser":
 611                    # a user!
 612                    if value["user_id"] in self.failures_cache:
 613                        continue
 614
 615                    if value["user_id"] not in self.details_cache:
 616                        user = await client(GetFullUserRequest(value["user_id"]))
 617                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
 618
 619                    resolved_message[key] = self.details_cache[value["user_id"]]
 620                    resolved_message[key]["user_id"] = value["user_id"]
 621                else:
 622                    resolved_message[key] = await self.resolve_groups(client, value)
 623
 624            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
 625                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
 626                if type(e) in (ChannelPrivateError, UsernameInvalidError):
 627                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
 628                else:
 629                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
 630
 631        return resolved_message
 632
 633    @staticmethod
 634    def cancel_start():
 635        """
 636        Replace interactive phone number input in Telethon
 637
 638        By default, if Telethon cannot use the given session file to
 639        authenticate, it will interactively prompt the user for a phone
 640        number on the command line. That is not useful here, so instead
 641        raise a RuntimeError. This will be caught below and the user will
 642        be told they need to re-authenticate via 4CAT.
 643        """
 644        raise RuntimeError("Connection cancelled")
 645
 646    @staticmethod
 647    def map_item(message):
 648        """
 649        Convert Message object to 4CAT-ready data object
 650
 651        :param Message message:  Message to parse
 652        :return dict:  4CAT-compatible item object
 653        """
 654        if message["_chat"]["username"]:
 655            # chats can apparently not have usernames???
 656            # truly telegram objects are way too lenient for their own good
 657            thread = message["_chat"]["username"]
 658        elif message["_chat"]["title"]:
 659            thread = re.sub(r"\s", "", message["_chat"]["title"])
 660        else:
 661            # just give up
 662            thread = "unknown"
 663
 664        # determine username
 665        # API responses only include the user *ID*, not the username, and to
 666        # complicate things further not everyone is a user and not everyone
 667        # has a username. If no username is available, try the first and
 668        # last name someone has supplied
 669        fullname = ""
 670        username = ""
 671        user_id = message["_sender"]["id"] if message.get("_sender") else ""
 672        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
 673
 674        if message.get("_sender") and message["_sender"].get("username"):
 675            username = message["_sender"]["username"]
 676
 677        if message.get("_sender") and message["_sender"].get("first_name"):
 678            fullname += message["_sender"]["first_name"]
 679
 680        if message.get("_sender") and message["_sender"].get("last_name"):
 681            fullname += " " + message["_sender"]["last_name"]
 682
 683        fullname = fullname.strip()
 684
 685        # determine media type
 686        # these store some extra information of the attachment in
 687        # attachment_data. Since the final result will be serialised as a csv
 688        # file, we can only store text content. As such some media data is
 689        # serialised as JSON.
 690        attachment_type = SearchTelegram.get_media_type(message["media"])
 691        attachment_filename = ""
 692
 693        if attachment_type == "contact":
 694            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
 695            if message["media"].get('contact', False):
 696                # Old datastructure
 697                attachment = message["media"]["contact"]
 698            elif all([property in message["media"].keys() for property in contact_data]):
 699                # New datastructure 2022/7/25
 700                attachment = message["media"]
 701            else:
 702                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
 703            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})
 704
 705        elif attachment_type == "document":
 706            # videos, etc
 707            # This could add a separate routine for videos to make them a
 708            # separate type, which could then be scraped later, etc
 709            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
 710            if attachment_type == "video":
 711                attachment = message["media"]["document"]
 712                attachment_data = json.dumps({
 713                    "id": attachment["id"],
 714                    "dc_id": attachment["dc_id"],
 715                    "file_reference": attachment["file_reference"],
 716                })
 717            else:
 718                attachment_data = ""
 719
 720        # elif attachment_type in ("geo", "geo_live"):
 721        # untested whether geo_live is significantly different from geo
 722        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
 723
 724        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
 725            # we don't actually store any metadata about the photo, since very
 726            # little of the metadata attached is of interest. Instead, the
 727            # actual photos may be downloaded via a processor that is run on the
 728            # search results
 729            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
 730            attachment_data = json.dumps({
 731                "id": attachment["id"],
 732                "dc_id": attachment["dc_id"],
 733                "file_reference": attachment["file_reference"],
 734            })
 735            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
 736
 737        elif attachment_type == "poll":
 738            # unfortunately poll results are only available when someone has
 739            # actually voted on the poll - that will usually not be the case,
 740            # so we store -1 as the vote count
 741            attachment = message["media"]
 742            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
 743            attachment_data = json.dumps({
 744                "question": attachment["poll"]["question"],
 745                "voters": attachment["results"]["total_voters"],
 746                "answers": [{
 747                    "answer": options[answer["option"]],
 748                    "votes": answer["voters"]
 749                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
 750                    "answer": options[option],
 751                    "votes": -1
 752                } for option in options]
 753            })
 754
 755        else:
 756            attachment_data = ""
 757
 758        # was the message forwarded from somewhere and if so when?
 759        forwarded_timestamp = ""
 760        forwarded_name = ""
 761        forwarded_id = ""
 762        forwarded_username = ""
 763        if message.get("fwd_from") and "from_id" in message["fwd_from"] and not (
 764                type(message["fwd_from"]["from_id"]) is int):
 765            # forward information is spread out over a lot of places
 766            # we can identify, in order of usefulness: username, full name,
 767            # and ID. But not all of these are always available, and not
 768            # always in the same place either
 769            forwarded_timestamp = int(message["fwd_from"]["date"])
 770            from_data = message["fwd_from"]["from_id"]
 771
 772            if from_data:
 773                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
 774
 775            if message["fwd_from"].get("from_name"):
 776                forwarded_name = message["fwd_from"].get("from_name")
 777
 778            if from_data and from_data.get("from_name"):
 779                forwarded_name = message["fwd_from"]["from_name"]
 780
 781            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
 782                from_data["user"] = from_data["users"][0]
 783
 784            if from_data and ("user" in from_data or "chats" in from_data):
 785                # 'resolve entities' was enabled for this dataset
 786                if "user" in from_data:
 787                    if from_data["user"].get("username"):
 788                        forwarded_username = from_data["user"]["username"]
 789
 790                    if from_data["user"].get("first_name"):
 791                        forwarded_name = from_data["user"]["first_name"]
 792                    if message["fwd_from"].get("last_name"):
 793                        forwarded_name += "  " + from_data["user"]["last_name"]
 794
 795                    forwarded_name = forwarded_name.strip()
 796
 797                elif "chats" in from_data:
 798                    channel_id = from_data.get("channel_id")
 799                    for chat in from_data["chats"]:
 800                        if chat["id"] == channel_id or channel_id is None:
 801                            forwarded_username = chat["username"]
 802
 803            elif message.get("_forward") and message["_forward"].get("_chat"):
 804                if message["_forward"]["_chat"].get("username"):
 805                    forwarded_username = message["_forward"]["_chat"]["username"]
 806
 807                if message["_forward"]["_chat"].get("title"):
 808                    forwarded_name = message["_forward"]["_chat"]["title"]
 809
 810        link_title = ""
 811        link_attached = ""
 812        link_description = ""
 813        reactions = ""
 814
 815        if message.get("media") and message["media"].get("webpage"):
 816            link_title = message["media"]["webpage"].get("title")
 817            link_attached = message["media"]["webpage"].get("url")
 818            link_description = message["media"]["webpage"].get("description")
 819
 820        if message.get("reactions") and message["reactions"].get("results"):
 821            for reaction in message["reactions"]["results"]:
 822                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
 823                    # Updated to support new reaction datastructure
 824                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
 825                elif type(reaction["reaction"]) is str and "count" in reaction:
 826                    reactions += reaction["reaction"] * reaction["count"]
 827                else:
 828                    # Failsafe; can be updated to support formatting of new datastructures in the future
 829                    reactions += f"{reaction}, "
 830
 831        # t.me links
 832        linked_entities = set()
 833        all_links = ural.urls_from_text(message["message"])
 834        all_links = [link.split("t.me/")[1] for link in all_links if
 835                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 836
 837        for link in all_links:
 838            if link.startswith("+"):
 839                # invite links
 840                continue
 841
 842            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 843            linked_entities.add(entity_name)
 844
 845        # @references
 846        # in execute_queries we use MessageEntityMention to get these
 847        # however, after serializing these objects we only have the offsets of
 848        # the mentioned username, and telegram does weird unicode things to its
 849        # offsets meaning we can't just substring the message. So use a regex
 850        # as a 'good enough' solution
 851        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
 852
 853        # make this case-insensitive since people may use different casing in
 854        # messages than the 'official' username for example
 855        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
 856        all_ci_connections = set()
 857        seen = set()
 858        for connection in all_connections:
 859            if connection.lower() not in seen:
 860                all_ci_connections.add(connection)
 861                seen.add(connection.lower())
 862
 863        return MappedItem({
 864            "id": f"{message['_chat']['username']}-{message['id']}",
 865            "thread_id": thread,
 866            "chat": message["_chat"]["username"],
 867            "author": user_id,
 868            "author_username": username,
 869            "author_name": fullname,
 870            "author_is_bot": "yes" if user_is_bot else "no",
 871            "body": message["message"],
 872            "reply_to": message.get("reply_to_msg_id", ""),
 873            "views": message["views"] if message["views"] else "",
 874            # "forwards": message.get("forwards", MissingMappedField(0)),
 875            "reactions": reactions,
 876            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
 877            "unix_timestamp": int(message["date"]),
 878            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
 879                "edit_date"] else "",
 880            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
 881            "author_forwarded_from_name": forwarded_name,
 882            "author_forwarded_from_username": forwarded_username,
 883            "author_forwarded_from_id": forwarded_id,
 884            "entities_linked": ",".join(linked_entities),
 885            "entities_mentioned": ",".join(all_mentions),
 886            "all_connections": ",".join(all_ci_connections),
 887            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
 888                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
 889            "unix_timestamp_forwarded_from": forwarded_timestamp,
 890            "link_title": link_title,
 891            "link_description": link_description,
 892            "link_attached": link_attached,
 893            "attachment_type": attachment_type,
 894            "attachment_data": attachment_data,
 895            "attachment_filename": attachment_filename
 896        })
 897
 898    @staticmethod
 899    def get_media_type(media):
 900        """
 901        Get media type for a Telegram attachment
 902
 903        :param media:  Media object
 904        :return str:  Textual identifier of the media type
 905        """
 906        try:
 907            return {
 908                "NoneType": "",
 909                "MessageMediaContact": "contact",
 910                "MessageMediaDocument": "document",
 911                "MessageMediaEmpty": "",
 912                "MessageMediaGame": "game",
 913                "MessageMediaGeo": "geo",
 914                "MessageMediaGeoLive": "geo_live",
 915                "MessageMediaInvoice": "invoice",
 916                "MessageMediaPhoto": "photo",
 917                "MessageMediaPoll": "poll",
 918                "MessageMediaUnsupported": "unsupported",
 919                "MessageMediaVenue": "venue",
 920                "MessageMediaWebPage": "url"
 921            }[media.get("_type", None)]
 922        except (AttributeError, KeyError):
 923            return ""
 924
 925    @staticmethod
 926    def serialize_obj(input_obj):
 927        """
 928        Serialize an object as a dictionary
 929
 930        Telethon message objects are not serializable by themselves, but most
 931        relevant attributes are simply struct classes. This function replaces
 932        those that are not with placeholders and then returns a dictionary that
 933        can be serialized as JSON.
 934
 935        :param obj:  Object to serialize
 936        :return:  Serialized object
 937        """
 938        scalars = (int, str, float, list, tuple, set, bool)
 939
 940        if type(input_obj) in scalars or input_obj is None:
 941            return input_obj
 942
 943        if type(input_obj) is not dict:
 944            obj = input_obj.__dict__
 945        else:
 946            obj = input_obj.copy()
 947
 948        mapped_obj = {}
 949        for item, value in obj.items():
 950            if type(value) is datetime:
 951                mapped_obj[item] = value.timestamp()
 952            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 953                mapped_obj[item] = SearchTelegram.serialize_obj(value)
 954            elif type(value) is list:
 955                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
 956            elif type(value) is bytes:
 957                mapped_obj[item] = value.hex()
 958            elif type(value) not in scalars and value is not None:
 959                # type we can't make sense of here
 960                continue
 961            else:
 962                mapped_obj[item] = value
 963
 964        # Add the _type if the original object was a telethon type
 965        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 966            mapped_obj["_type"] = type(input_obj).__name__
 967        return mapped_obj
 968
 969    @staticmethod
 970    def validate_query(query, request, user):
 971        """
 972        Validate Telegram query
 973
 974        :param dict query:  Query parameters, from client-side.
 975        :param request:  Flask request
 976        :param User user:  User object of user who has submitted the query
 977        :return dict:  Safe query parameters
 978        """
 979        # no query 4 u
 980        if not query.get("query", "").strip():
 981            raise QueryParametersException("You must provide a search query.")
 982
 983        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
 984            raise QueryParametersException("You need to provide valid Telegram API credentials first.")
 985
 986        all_posts = config.get("telegram-search.can_query_all_messages", False, user=user)
 987        max_entities = config.get("telegram-search.max_entities", 25, user=user)
 988
 989        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options()["max_posts"]["max"])
 990
 991        # reformat queries to be a comma-separated list with no wrapping
 992        # whitespace
 993        whitespace = re.compile(r"\s+")
 994        items = whitespace.sub("", query.get("query").replace("\n", ","))
 995        if max_entities > 0 and len(items.split(",")) > max_entities:
 996            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
 997
 998        sanitized_items = []
 999        # handle telegram URLs
1000        for item in items.split(","):
1001            if not item.strip():
1002                continue
1003            item = re.sub(r"^https?://t\.me/", "", item)
1004            item = re.sub(r"^/?s/", "", item)
1005            item = re.sub(r"[/]*$", "", item)
1006            sanitized_items.append(item)
1007
1008        # the dates need to make sense as a range to search within
1009        min_date, max_date = query.get("daterange")
1010
1011        # now check if there is an active API session
1012        if not user or not user.is_authenticated or user.is_anonymous:
1013            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1014                                           "accounts.")
1015
1016        # check for the information we need
1017        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1018                                                      query.get("api_hash"))
1019        user.set_value("telegram.session", session_id)
1020        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")
1021
1022        client = None
1023
1024        # API ID is always a number, if it's not, we can immediately fail
1025        try:
1026            api_id = int(query.get("api_id"))
1027        except ValueError:
1028            raise QueryParametersException("Invalid API ID.")
1029
1030        # maybe we've entered a code already and submitted it with the request
1031        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1032            code_callback = lambda: request.form.get("option-security-code")
1033            max_attempts = 1
1034        else:
1035            code_callback = lambda: -1
1036            # max_attempts = 0 because authing will always fail: we can't wait for
1037            # the code to be entered interactively, we'll need to do a new request
1038            # but we can't just immediately return, we still need to call start()
1039            # to get telegram to send us a code
1040            max_attempts = 0
1041
1042        # now try authenticating
1043        needs_code = False
1044        try:
1045            loop = asyncio.new_event_loop()
1046            asyncio.set_event_loop(loop)
1047            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1048
1049            try:
1050                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1051
1052            except ValueError as e:
1053                # this happens if 2FA is required
1054                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
1055                                               f"does not support this authentication mode for Telegram. ({e})")
1056            except RuntimeError as e:
1057                # A code was sent to the given phone number
1058                needs_code = True
1059        except FloodWaitError as e:
1060            # uh oh, we got rate-limited
1061            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
1062                                           str(e).split("(")[0] + ".")
1063        except ApiIdInvalidError as e:
1064            # wrong credentials
1065            raise QueryParametersException("Your API credentials are invalid.")
1066        except PhoneNumberInvalidError as e:
1067            # wrong phone number
1068            raise QueryParametersException(
1069                "The phone number provided is not a valid phone number for these credentials.")
1070        except RPCError as e:
1071            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
1072            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
1073                                           f"Telegram app(s) to the latest version to proceed ({e}).")
1074        except Exception as e:
1075            # ?
1076            raise QueryParametersException(
1077                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
1078        finally:
1079            if client:
1080                client.disconnect()
1081
1082        if needs_code:
1083            raise QueryNeedsFurtherInputException(config={
1084                "code-info": {
1085                    "type": UserInput.OPTION_INFO,
1086                    "help": "Please enter the security code that was sent to your Telegram app to continue."
1087                },
1088                "security-code": {
1089                    "type": UserInput.OPTION_TEXT,
1090                    "help": "Security code",
1091                    "sensitive": True
1092                }})
1093
1094        # simple!
1095        return {
1096            "items": num_items,
1097            "query": ",".join(sanitized_items),
1098            "api_id": query.get("api_id"),
1099            "api_hash": query.get("api_hash"),
1100            "api_phone": query.get("api_phone"),
1101            "save-session": query.get("save-session"),
1102            "resolve-entities": query.get("resolve-entities"),
1103            "min_date": min_date,
1104            "max_date": max_date,
1105            "crawl-depth": query.get("crawl-depth"),
1106            "crawl-threshold": query.get("crawl-threshold"),
1107            "crawl-via-links": query.get("crawl-via-links")
1108        }
1109
1110    @staticmethod
1111    def create_session_id(api_phone, api_id, api_hash):
1112        """
1113        Generate a filename for the session file
1114
1115        This is a combination of phone number and API credentials, but hashed
1116        so that one cannot actually derive someone's phone number from it.
1117
1118        :param str api_phone:  Phone number for API ID
1119        :param int api_id:  Telegram API ID
1120        :param str api_hash:  Telegram API Hash
1121        :return str: A hash value derived from the input
1122        """
1123        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1124        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
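
The session file name computed by create_session_id() above is simply a BLAKE2b digest over the concatenated phone number (minus the leading "+"), API ID and API hash. A minimal standalone sketch of the same computation, using made-up credentials:

    import hashlib

    # hypothetical credentials, for illustration only
    api_phone = "+15551234567"
    api_id = 123456
    api_hash = "0123456789abcdef0123456789abcdef"

    # mirror create_session_id(): strip the "+", concatenate, hash with BLAKE2b
    hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
    session_id = hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
    print(session_id + ".session")  # the file name stored under PATH_SESSIONS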

Search Telegram via API

type = 'telegram-search'
category = 'Search'
title = 'Telegram API search'
description = 'Scrapes messages from open Telegram groups via its API.'
extension = 'ndjson'
is_local = False
is_static = False
details_cache = None
failures_cache = None
eventloop = None
import_issues = 0
end_if_rate_limited = 600
max_workers = 1
max_retries = 3
flawless = 0
config = {
    'telegram-search.can_query_all_messages': {
        'type': 'toggle',
        'help': 'Remove message amount limit',
        'default': False,
        'tooltip': 'Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!'
    },
    'telegram-search.max_entities': {
        'type': 'string',
        'help': 'Max entities to query',
        'coerce_type': int,
        'min': 0,
        'default': 25,
        'tooltip': 'Amount of entities that can be queried at a time. Entities are groups or channels. 0 to disable limit.'
    },
    'telegram-search.max_crawl_depth': {
        'type': 'string',
        'help': 'Max crawl depth',
        'coerce_type': int,
        'min': 0,
        'default': 0,
        'tooltip': 'If higher than 0, 4CAT can automatically add new entities to the query based on forwarded messages. Recommended to leave at 0 for most users since this can exponentially increase dataset sizes.'
    }
}
@classmethod
def get_options(cls, parent_dataset=None, user=None):
 84    @classmethod
 85    def get_options(cls, parent_dataset=None, user=None):
 86        """
 87        Get processor options
 88
 89        Just updates the description of the entities field based on the
 90        configured max entities.
 91
 92        :param DataSet parent_dataset:  An object representing the dataset that
 93          the processor would be run on
 94        :param User user:  Flask user the options will be displayed for, in
 95          case they are requested for display in the 4CAT web interface. This can
 96          be used to show some options only to privileged users.
 97        """
 98        max_entities = config.get("telegram-search.max_entities", 25, user=user)
 99        options = {
100            "intro": {
101                "type": UserInput.OPTION_INFO,
102                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
103                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
104                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
105                        "authentication for Telegram."
106            },
107            "api_id": {
108                "type": UserInput.OPTION_TEXT,
109                "help": "API ID",
110                "cache": True,
111            },
112            "api_hash": {
113                "type": UserInput.OPTION_TEXT,
114                "help": "API Hash",
115                "cache": True,
116            },
117            "api_phone": {
118                "type": UserInput.OPTION_TEXT,
119                "help": "Phone number",
120                "cache": True,
121                "default": "+xxxxxxxxxx"
122            },
123            "divider": {
124                "type": UserInput.OPTION_DIVIDER
125            },
126            "query-intro": {
127                "type": UserInput.OPTION_INFO,
128                "help": "Separate with commas or line breaks."
129            },
130            "query": {
131                "type": UserInput.OPTION_TEXT_LARGE,
132                "help": "Entities to scrape",
133                "tooltip": "Separate with commas or line breaks."
134            },
135            "max_posts": {
136                "type": UserInput.OPTION_TEXT,
137                "help": "Messages per group",
138                "min": 1,
139                "max": 50000,
140                "default": 10
141            },
142            "daterange": {
143                "type": UserInput.OPTION_DATERANGE,
144                "help": "Date range"
145            },
146            "divider-2": {
147                "type": UserInput.OPTION_DIVIDER
148            },
149            "info-sensitive": {
150                "type": UserInput.OPTION_INFO,
151                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
152                        "there while data is fetched. After the dataset has been created your credentials will be "
153                        "deleted from the server, unless you enable the option below. If you want to download images "
154                        "attached to the messages in your collected data, you need to enable this option. Your "
155                        "credentials will never be visible to other users and can be erased later via the result page."
156            },
157            "save-session": {
158                "type": UserInput.OPTION_TOGGLE,
159                "help": "Save session:",
160                "default": False
161            },
162            "resolve-entities-intro": {
163                "type": UserInput.OPTION_INFO,
164                "help": "4CAT can resolve the references to channels and users and replace the numeric ID with the full "
165                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
166                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and "
167                        "for large datasets, increases the chance you will be rate-limited and your dataset isn't able "
168                        "to finish capturing. It will also dramatically increase the disk space needed to store the "
169                        "data, so only enable this if you really need it!"
170            },
171            "resolve-entities": {
172                "type": UserInput.OPTION_TOGGLE,
173                "help": "Resolve references",
174                "default": False,
175            }
176        }
177
178        if max_entities:
179            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
180                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
181
182        all_messages = config.get("telegram-search.can_query_all_messages", False, user=user)
183        if all_messages:
184            if "max" in options["max_posts"]:
185                del options["max_posts"]["max"]
186        else:
187            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
188                                             f"{options['max_posts']['max']:,} messages per entity.")
189
190        if config.get("telegram-search.max_crawl_depth", 0, user=user) > 0:
191            options["crawl_intro"] = {
192                "type": UserInput.OPTION_INFO,
193                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
194                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
195                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
196                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
197                        "progress cannot be accurately tracked when you use this feature."
198            }
199            options["crawl-depth"] = {
200                "type": UserInput.OPTION_TEXT,
201                "coerce_type": int,
202                "min": 0,
203                "max": config.get("telegram-search.max_crawl_depth"),
204                "default": 0,
205                "help": "Crawl depth",
206                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
207                           "query, i.e. at most this many hops can be needed to reach the entity from one of the "
208                           "starting entities."
209            }
210            options["crawl-threshold"] = {
211                "type": UserInput.OPTION_TEXT,
212                "coerce_type": int,
213                "min": 0,
214                "default": 5,
215                "help": "Crawl threshold",
216                "tooltip": "Entities need to be referenced at least this many times to be added to the query. Only "
217                           "references discovered below the max crawl depth are taken into account."
218            }
219            options["crawl-via-links"] = {
220                "type": UserInput.OPTION_TOGGLE,
221                "default": False,
222                "help": "Extract new groups from links",
223                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
224                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
225                           "links that would otherwise remain undetected."
226            }
227
228        return options

Get processor options

Just updates the description of the entities field based on the configured max entities.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
  • User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileged users.
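
As a rough illustration of the option tweaking described above, the snippet below reproduces how the entity and message limits reshape the generated options. The configuration values are hard-coded placeholders here; in the real method they come from 4CAT's configuration manager:

    # stand-in configuration values, for illustration only
    max_entities = 25
    can_query_all_messages = False

    options = {
        "query-intro": {"type": "info", "help": "Separate with commas or line breaks."},
        "max_posts": {"type": "string", "help": "Messages per group", "min": 1, "max": 50000, "default": 10},
    }

    if max_entities:
        # same adjustment the method applies to its query-intro option
        options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
                                          f"(channels or groups) at a time. Separate with line breaks or commas.")

    if can_query_all_messages:
        options["max_posts"].pop("max", None)  # no hard ceiling when unlimited queries are allowed
    else:
        options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
                                        f"{options['max_posts']['max']:,} messages per entity.")

    print(options["query-intro"]["help"])
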
def get_items(self, query):
231    def get_items(self, query):
232        """
233        Execute a query; get messages for given parameters
234
235        Basically a wrapper around execute_queries() to call it with asyncio.
236
237        :param dict query:  Query parameters, as part of the DataSet object
238        :return list:  Posts, sorted by thread and post ID, in ascending order
239        """
240        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
241            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
242                                       "creating it again from scratch.", is_final=True)
243            return None
244
245        self.details_cache = {}
246        self.failures_cache = set()
247        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
248        results = asyncio.run(self.execute_queries())
249
250        if not query.get("save-session"):
251            self.dataset.delete_parameter("api_hash", instant=True)
252            self.dataset.delete_parameter("api_phone", instant=True)
253            self.dataset.delete_parameter("api_id", instant=True)
254
255        if self.flawless:
256            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
257                                       "been private). View the log file for details.", is_final=True)
258
259        return results

Execute a query; get messages for given parameters

Basically a wrapper around execute_queries() to call it with asyncio.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Posts, sorted by thread and post ID, in ascending order
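
The wrapper pattern the docstring describes, collecting an async routine's result from synchronous code, boils down to a single asyncio.run() call. A stripped-down sketch with a stand-in coroutine (execute_queries_stub is hypothetical) instead of the real execute_queries():

    import asyncio

    async def execute_queries_stub():
        # stand-in for execute_queries(); the real method talks to the Telegram API
        return [{"id": 1, "message": "example"}]

    def get_items_stub():
        # synchronous entry point: run the async collection and hand back its result
        return asyncio.run(execute_queries_stub())

    print(get_items_stub())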

async def execute_queries(self):
261    async def execute_queries(self):
262        """
263        Get messages for queries
264
265        This is basically what would be done in get_items(), except due to
266        Telethon's architecture this needs to be called in an async method,
267        which is this one.
268
269        :return list:  Collected messages
270        """
271        # session file has been created earlier, and we can re-use it here in
272        # order to avoid having to re-enter the security code
273        query = self.parameters
274
275        session_id = SearchTelegram.create_session_id(query["api_phone"], query["api_id"], query["api_hash"])
276        self.dataset.log(f'Telegram session id: {session_id}')
277        session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session")
278
279        client = None
280
281        try:
282            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
283                                    loop=self.eventloop)
284            await client.start(phone=SearchTelegram.cancel_start)
285        except RuntimeError:
286            # session is no longer useable, delete file so user will be asked
287            # for security code again. The RuntimeError is raised by
288            # `cancel_start()`
289            self.dataset.update_status(
290                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
291                is_final=True)
292
293            if client and hasattr(client, "disconnect"):
294                await client.disconnect()
295
296            if session_path.exists():
297                session_path.unlink()
298
299            return []
300        except Exception as e:
301            # not sure what exception specifically is triggered here, but it
302            # always means the connection failed
303            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
304            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
305            if client and hasattr(client, "disconnect"):
306                await client.disconnect()
307            return []
308
309        # ready our parameters
310        parameters = self.dataset.get_parameters()
311        queries = [query.strip() for query in parameters.get("query", "").split(",")]
312        max_items = convert_to_int(parameters.get("items", 10), 10)
313
314        # Telethon requires the offset date to be a datetime date
315        max_date = parameters.get("max_date")
316        if max_date:
317            try:
318                max_date = datetime.fromtimestamp(int(max_date))
319            except ValueError:
320                max_date = None
321
322        # min_date can remain an integer
323        min_date = parameters.get("min_date")
324        if min_date:
325            try:
326                min_date = int(min_date)
327            except ValueError:
328                min_date = None
329
330        posts = []
331        try:
332            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
333                posts.append(post)
334            return posts
335        except ProcessorInterruptedException as e:
336            raise e
337        except Exception as e:
338            # catch-all so we can disconnect properly
339            # ...should we?
340            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
341            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
342            # May as well return what was captured, yes?
343            return posts
344        finally:
345            await client.disconnect()

Get messages for queries

This is basically what would be done in get_items(), except due to Telethon's architecture this needs to be called in an async method, which is this one.

Returns

Collected messages
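
Connection-wise, the method re-uses the .session file written during query validation, so no new security code is needed. A minimal sketch of that reconnection, assuming a 4CAT checkout on the Python path and placeholder credentials:

    import asyncio
    from telethon import TelegramClient
    from datasources.telegram.search_telegram import SearchTelegram

    async def collect(session_path, api_id, api_hash, entity):
        # re-use the existing .session file; cancel_start() raises instead of
        # prompting for a phone number if the session is no longer authorised
        client = TelegramClient(str(session_path), api_id, api_hash)
        try:
            await client.start(phone=SearchTelegram.cancel_start)
            async for message in client.iter_messages(entity=entity, limit=5):
                print(message.id, message.date)
        except RuntimeError:
            print("Session expired; the security code needs to be re-entered via 4CAT")
        finally:
            await client.disconnect()

    # asyncio.run(collect("abcdef.session", 123456, "0123456789abcdef", "examplechannel"))  # placeholder values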

async def gather_posts(self, client, queries, max_items, min_date, max_date):
347    async def gather_posts(self, client, queries, max_items, min_date, max_date):
348        """
349        Gather messages for each entity for which messages are requested
350
351        :param TelegramClient client:  Telegram Client
352        :param list queries:  List of entities to query (as string)
353        :param int max_items:  Messages to scrape per entity
354        :param int min_date:  Datetime date to get posts after
355        :param int max_date:  Datetime date to get posts before
356        :return list:  List of messages, each message a dictionary.
357        """
358        resolve_refs = self.parameters.get("resolve-entities")
359
360        # Adding flag to stop; using for rate limits
361        no_additional_queries = False
362
363        # This is used for the 'crawl' feature so we know at which depth a
364        # given entity was discovered
365        depth_map = {
366            entity: 0 for entity in queries
367        }
368
369        crawl_max_depth = self.parameters.get("crawl-depth", 0)
370        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
371        crawl_via_links = self.parameters.get("crawl-via-links", False)
372
373        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
374        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
375
376        # this keeps track of how often an entity not in the original query
377        # has been mentioned. When crawling is enabled and this exceeds the
378        # given threshold, the entity is added to the query
379        crawl_references = {}
380        full_query = set(queries)
381        num_queries = len(queries)
382
383        # we may not always know the 'entity username' for an entity ID, so
384        # keep a reference map as we go
385        entity_id_map = {}
386        query_id_map= {}
387
388        # Collect queries
389        # Use while instead of for so we can change queries during iteration
390        # this is needed for the 'crawl' feature which can discover new
391        # entities during crawl
392        processed = 0
393        total_messages = 0
394        while queries:
395            query = queries.pop(0)
396
397            delay = 10
398            retries = 0
399            processed += 1
400            self.dataset.update_progress(processed / num_queries)
401
402            if no_additional_queries:
403                # Note that we are not completing this query
404                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
405                continue
406
407            while True:
408                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
409                entity_posts = 0
410                discovered = 0
411                try:
412                    async for message in client.iter_messages(entity=query, offset_date=max_date):
413                        entity_posts += 1
414                        total_messages += 1
415                        if self.interrupted:
416                            raise ProcessorInterruptedException(
417                                "Interrupted while fetching message data from the Telegram API")
418
419                        if entity_posts % 100 == 0:
420                            self.dataset.update_status(
421                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
422
423                        if message.action is not None:
424                            # e.g. someone joins the channel - not an actual message
425                            continue
426
427                        # todo: possibly enrich object with e.g. the name of
428                        # the channel a message was forwarded from (but that
429                        # needs extra API requests...)
430                        serialized_message = SearchTelegram.serialize_obj(message)
431                        if "_chat" in serialized_message:
432                            # Add query ID to check if queries have been crawled previously
433                            full_query.add(serialized_message["_chat"]["id"])
434                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
435                                # once we know what a channel ID resolves to, use the username instead so it is easier to
436                                # understand for the user
437                                entity_id_map[query] = serialized_message["_chat"]["username"]
438                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
439
440                        if resolve_refs:
441                            serialized_message = await self.resolve_groups(client, serialized_message)
442
443                        # Stop if we're below the min date
444                        if min_date and serialized_message.get("date") < min_date:
445                            break
446
447                        # if crawling is enabled, see if we found something to add to the query
448                        linked_entities = set()
449                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
450                            message_fwd = serialized_message.get("fwd_from")
451                            fwd_from = None
452                            fwd_source_type = None
453                            if message_fwd and message_fwd.get("from_id"):
454                                if message_fwd["from_id"].get("_type") == "PeerChannel":
455                                    # Legacy(?) data structure (pre 2024/7/22)
456                                    # even if we haven't resolved the ID, we can feed the numeric ID
457                                    # to Telethon! this is nice because it means we don't have to
458                                    # resolve entities to crawl iteratively
459                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
460                                    fwd_source_type = "channel"
461                                elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}):
462                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
463                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
464                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
465                                    fwd_source_type = "channel"
466                                elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
467                                    # forwards can also come from users
468                                    # these can never be followed, so don't add these to the crawl, but do document them
469                                    fwd_source_type = "user"
470                                else:
471                                    print(json.dumps(message_fwd))
472                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
473                                    fwd_source_type = "unknown"
474
475                                if fwd_from:
476                                    linked_entities.add(fwd_from)
477
478
479                            if crawl_via_links:
480                                # t.me links
481                                all_links = ural.urls_from_text(serialized_message["message"])
482                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
483                                for link in all_links:
484                                    if link.startswith("+"):
485                                        # invite links
486                                        continue
487
488                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
489                                    linked_entities.add(entity_name)
490
491                                # @references
492                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
493                                for reference in references:
494                                    if reference.startswith("@"):
495                                        reference = reference[1:]
496
497                                    reference = reference.split("/")[0]
498
499                                    linked_entities.add(reference)
500
501                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
502                            for link in linked_entities:
503                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
504                                    # new entity discovered!
505                                    # might be discovered (before collection) multiple times, so retain lowest depth
506                                    # print(f"Potentially crawling {link}")
507                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
508                                    if link not in crawl_references:
509                                        crawl_references[link] = 0
510                                    crawl_references[link] += 1
511
512                                    # Add to queries if it has been referenced enough times
513                                    if crawl_references[link] >= crawl_msg_threshold:
514                                        queries.append(link)
515                                        full_query.add(link)
516                                        num_queries += 1
517                                        discovered += 1
518                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
519
520
521
522                        serialized_message["4CAT_metadata"] = {
523                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
524                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
525                            "query_depth": depth_map.get(query, 0)
526                        }
527                        yield serialized_message
528
529                        if entity_posts >= max_items:
530                            break
531
532                except ChannelPrivateError:
533                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
534                    self.flawless += 1
535
536                except (UsernameInvalidError,):
537                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
538                    self.flawless += 1
539
540                except FloodWaitError as e:
541                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
542                    if e.seconds < self.end_if_rate_limited:
543                        time.sleep(e.seconds)
544                        continue
545                    else:
546                        self.flawless += 1
547                        no_additional_queries = True
548                        self.dataset.update_status(
549                            f"Telegram wait time has grown larger than {int(e.seconds / 60)} minutes, ending")
550                        break
551
552                except BadRequestError as e:
553                    self.dataset.update_status(
554                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
555                    self.flawless += 1
556
557                except ValueError as e:
558                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
559                    self.flawless += 1
560
561                except ChannelPrivateError as e:
562                    self.dataset.update_status(
563                        f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.")
564                    break
565
566                except TimeoutError:
567                    if retries < 3:
568                        self.dataset.update_status(
569                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
570                        self.flawless += 1
571                        break
572
573                    self.dataset.update_status(
574                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
575                    time.sleep(delay)
576                    delay *= 2
577                    continue
578
579                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
580                break

Gather messages for each entity for which messages are requested

Parameters
  • TelegramClient client: Telegram Client
  • list queries: List of entities to query (as string)
  • int max_items: Messages to scrape per entity
  • int min_date: Datetime date to get posts after
  • int max_date: Datetime date to get posts before
Returns

List of messages, each message a dictionary.
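
The crawl bookkeeping used above (a depth map plus a reference counter) can be illustrated in isolation: a discovered entity is only promoted into the query once it has been referenced at least the threshold number of times, and only while the referencing entity is still below the maximum crawl depth. A self-contained sketch with invented entity names:

    # hypothetical starting entities and crawl settings
    queries = ["channel_a", "channel_b"]
    crawl_max_depth = 1
    crawl_msg_threshold = 2

    depth_map = {entity: 0 for entity in queries}
    crawl_references = {}
    full_query = set(queries)

    def register_reference(source, discovered):
        """Count one reference from `source` to `discovered`; promote it once the threshold is met."""
        if discovered in full_query or depth_map[source] >= crawl_max_depth:
            return
        # keep the lowest depth at which the entity was seen
        depth_map[discovered] = min(depth_map.get(discovered, crawl_max_depth), depth_map[source] + 1)
        crawl_references[discovered] = crawl_references.get(discovered, 0) + 1
        if crawl_references[discovered] >= crawl_msg_threshold:
            queries.append(discovered)
            full_query.add(discovered)

    register_reference("channel_a", "channel_c")
    register_reference("channel_b", "channel_c")
    print(queries)  # ['channel_a', 'channel_b', 'channel_c']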

async def resolve_groups(self, client, message):
582    async def resolve_groups(self, client, message):
583        """
584        Recursively resolve references to groups and users
585
586        :param client:  Telethon client instance
587        :param dict message:  Message, as already mapped by serialize_obj
588        :return:  Resolved dictionary
589        """
590        resolved_message = message.copy()
591        for key, value in message.items():
592            try:
593                if type(value) is not dict:
594                    # if it's not a dict, we never have to resolve it, as it
595                    # does not represent an entity
596                    continue
597
598                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
599                    # forwarded from a channel!
600                    if value["channel_id"] in self.failures_cache:
601                        continue
602
603                    if value["channel_id"] not in self.details_cache:
604                        channel = await client(GetFullChannelRequest(value["channel_id"]))
605                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
606
607                    resolved_message[key] = self.details_cache[value["channel_id"]]
608                    resolved_message[key]["channel_id"] = value["channel_id"]
609
610                elif "_type" in value and value["_type"] == "PeerUser":
611                    # a user!
612                    if value["user_id"] in self.failures_cache:
613                        continue
614
615                    if value["user_id"] not in self.details_cache:
616                        user = await client(GetFullUserRequest(value["user_id"]))
617                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
618
619                    resolved_message[key] = self.details_cache[value["user_id"]]
620                    resolved_message[key]["user_id"] = value["user_id"]
621                else:
622                    resolved_message[key] = await self.resolve_groups(client, value)
623
624            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
625                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
626                if type(e) in (ChannelPrivateError, UsernameInvalidError):
627                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
628                else:
629                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
630
631        return resolved_message

Recursively resolve references to groups and users

Parameters
  • client: Telethon client instance
  • dict message: Message, as already mapped by serialize_obj
Returns

Resolved dictionary
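
Resolution is cached per channel or user ID (details_cache), and IDs that failed once are remembered (failures_cache) so they are not retried. A minimal sketch of that caching pattern, with a stand-in fetch coroutine instead of the real GetFullChannelRequest / GetFullUserRequest calls:

    import asyncio

    details_cache = {}
    failures_cache = set()

    async def resolve_channel(fetch, channel_id):
        """Return channel details, fetching each ID at most once and remembering failures."""
        if channel_id in failures_cache:
            return None
        if channel_id not in details_cache:
            try:
                # in the real code: SearchTelegram.serialize_obj(await client(GetFullChannelRequest(channel_id)))
                details_cache[channel_id] = await fetch(channel_id)
            except Exception:
                failures_cache.add(channel_id)
                return None
        return details_cache[channel_id]

    async def fake_fetch(channel_id):
        # stand-in for the Telethon request, for illustration only
        return {"id": channel_id, "title": "Example channel"}

    print(asyncio.run(resolve_channel(fake_fetch, 123456789)))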

@staticmethod
def cancel_start():
633    @staticmethod
634    def cancel_start():
635        """
636        Replace interactive phone number input in Telethon
637
638        By default, if Telethon cannot use the given session file to
639        authenticate, it will interactively prompt the user for a phone
640        number on the command line. That is not useful here, so instead
641        raise a RuntimeError. This will be caught below and the user will
642        be told they need to re-authenticate via 4CAT.
643        """
644        raise RuntimeError("Connection cancelled")

Replace interactive phone number input in Telethon

By default, if Telethon cannot use the given session file to authenticate, it will interactively prompt the user for a phone number on the command line. That is not useful here, so instead raise a RuntimeError. This will be caught below and the user will be told they need to re-authenticate via 4CAT.
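
Telethon's start() accepts a callable for its phone argument and only invokes it when the session is not yet authorised, so a callback that raises turns the would-be interactive prompt into a catchable error. A compact sketch of the pattern, with placeholder session path and credentials:

    from telethon import TelegramClient

    def cancel_start():
        # raised instead of asking for a phone number on stdin
        raise RuntimeError("Connection cancelled")

    async def session_is_valid(session_path, api_id, api_hash):
        client = TelegramClient(str(session_path), api_id, api_hash)
        try:
            await client.start(phone=cancel_start)
            return True               # the session file was still authorised
        except RuntimeError:
            return False              # session expired: user must re-authenticate via 4CAT
        finally:
            await client.disconnect()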

@staticmethod
def map_item(message):
646    @staticmethod
647    def map_item(message):
648        """
649        Convert Message object to 4CAT-ready data object
650
651        :param Message message:  Message to parse
652        :return dict:  4CAT-compatible item object
653        """
654        if message["_chat"]["username"]:
655            # chats can apparently not have usernames???
656            # truly telegram objects are way too lenient for their own good
657            thread = message["_chat"]["username"]
658        elif message["_chat"]["title"]:
659            thread = re.sub(r"\s", "", message["_chat"]["title"])
660        else:
661            # just give up
662            thread = "unknown"
663
664        # determine username
665        # API responses only include the user *ID*, not the username, and to
666        # complicate things further not everyone is a user and not everyone
667        # has a username. If no username is available, try the first and
668        # last name someone has supplied
669        fullname = ""
670        username = ""
671        user_id = message["_sender"]["id"] if message.get("_sender") else ""
672        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
673
674        if message.get("_sender") and message["_sender"].get("username"):
675            username = message["_sender"]["username"]
676
677        if message.get("_sender") and message["_sender"].get("first_name"):
678            fullname += message["_sender"]["first_name"]
679
680        if message.get("_sender") and message["_sender"].get("last_name"):
681            fullname += " " + message["_sender"]["last_name"]
682
683        fullname = fullname.strip()
684
685        # determine media type
686        # these store some extra information of the attachment in
687        # attachment_data. Since the final result will be serialised as a csv
688        # file, we can only store text content. As such some media data is
689        # serialised as JSON.
690        attachment_type = SearchTelegram.get_media_type(message["media"])
691        attachment_filename = ""
692
693        if attachment_type == "contact":
694            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
695            if message["media"].get('contact', False):
696                # Old datastructure
697                attachment = message["media"]["contact"]
698            elif all([property in message["media"].keys() for property in contact_data]):
699                # New datastructure 2022/7/25
700                attachment = message["media"]
701            else:
702                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
703            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})
704
705        elif attachment_type == "document":
706            # videos, etc
707            # This could add a separate routine for videos to make them a
708            # separate type, which could then be scraped later, etc
709            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
710            if attachment_type == "video":
711                attachment = message["media"]["document"]
712                attachment_data = json.dumps({
713                    "id": attachment["id"],
714                    "dc_id": attachment["dc_id"],
715                    "file_reference": attachment["file_reference"],
716                })
717            else:
718                attachment_data = ""
719
720        # elif attachment_type in ("geo", "geo_live"):
721        # untested whether geo_live is significantly different from geo
722        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
723
724        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
725            # we don't actually store any metadata about the photo, since very
726            # little of the metadata attached is of interest. Instead, the
727            # actual photos may be downloaded via a processor that is run on the
728            # search results
729            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
730            attachment_data = json.dumps({
731                "id": attachment["id"],
732                "dc_id": attachment["dc_id"],
733                "file_reference": attachment["file_reference"],
734            })
735            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
736
737        elif attachment_type == "poll":
738            # unfortunately poll results are only available when someone has
739            # actually voted on the poll - that will usually not be the case,
740            # so we store -1 as the vote count
741            attachment = message["media"]
742            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
743            attachment_data = json.dumps({
744                "question": attachment["poll"]["question"],
745                "voters": attachment["results"]["total_voters"],
746                "answers": [{
747                    "answer": options[answer["option"]],
748                    "votes": answer["voters"]
749                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
750                    "answer": options[option],
751                    "votes": -1
752                } for option in options]
753            })
754
755        else:
756            attachment_data = ""
757
758        # was the message forwarded from somewhere and if so when?
759        forwarded_timestamp = ""
760        forwarded_name = ""
761        forwarded_id = ""
762        forwarded_username = ""
763        if message.get("fwd_from") and "from_id" in message["fwd_from"] and not (
764                type(message["fwd_from"]["from_id"]) is int):
765            # forward information is spread out over a lot of places
766            # we can identify, in order of usefulness: username, full name,
767            # and ID. But not all of these are always available, and not
768            # always in the same place either
769            forwarded_timestamp = int(message["fwd_from"]["date"])
770            from_data = message["fwd_from"]["from_id"]
771
772            if from_data:
773                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
774
775            if message["fwd_from"].get("from_name"):
776                forwarded_name = message["fwd_from"].get("from_name")
777
778            if from_data and from_data.get("from_name"):
779                forwarded_name = message["fwd_from"]["from_name"]
780
781            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
782                from_data["user"] = from_data["users"][0]
783
784            if from_data and ("user" in from_data or "chats" in from_data):
785                # 'resolve entities' was enabled for this dataset
786                if "user" in from_data:
787                    if from_data["user"].get("username"):
788                        forwarded_username = from_data["user"]["username"]
789
790                    if from_data["user"].get("first_name"):
791                        forwarded_name = from_data["user"]["first_name"]
792                    if message["fwd_from"].get("last_name"):
793                        forwarded_name += "  " + from_data["user"]["last_name"]
794
795                    forwarded_name = forwarded_name.strip()
796
797                elif "chats" in from_data:
798                    channel_id = from_data.get("channel_id")
799                    for chat in from_data["chats"]:
800                        if chat["id"] == channel_id or channel_id is None:
801                            forwarded_username = chat["username"]
802
803            elif message.get("_forward") and message["_forward"].get("_chat"):
804                if message["_forward"]["_chat"].get("username"):
805                    forwarded_username = message["_forward"]["_chat"]["username"]
806
807                if message["_forward"]["_chat"].get("title"):
808                    forwarded_name = message["_forward"]["_chat"]["title"]
809
810        link_title = ""
811        link_attached = ""
812        link_description = ""
813        reactions = ""
814
815        if message.get("media") and message["media"].get("webpage"):
816            link_title = message["media"]["webpage"].get("title")
817            link_attached = message["media"]["webpage"].get("url")
818            link_description = message["media"]["webpage"].get("description")
819
820        if message.get("reactions") and message["reactions"].get("results"):
821            for reaction in message["reactions"]["results"]:
822                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
823                    # Updated to support new reaction datastructure
824                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
825                elif type(reaction["reaction"]) is str and "count" in reaction:
826                    reactions += reaction["reaction"] * reaction["count"]
827                else:
828                    # Failsafe; can be updated to support formatting of new datastructures in the future
829                    reactions += f"{reaction}, "
830
831        # t.me links
832        linked_entities = set()
833        all_links = ural.urls_from_text(message["message"])
834        all_links = [link.split("t.me/")[1] for link in all_links if
835                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
836
837        for link in all_links:
838            if link.startswith("+"):
839                # invite links
840                continue
841
842            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
843            linked_entities.add(entity_name)
844
845        # @references
846        # in execute_queries we use MessageEntityMention to get these
847        # however, after serializing these objects we only have the offsets of
848        # the mentioned username, and telegram does weird unicode things to its
849        # offsets meaning we can't just substring the message. So use a regex
850        # as a 'good enough' solution
851        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
852
853        # make this case-insensitive since people may use different casing in
854        # messages than the 'official' username for example
855        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
856        all_ci_connections = set()
857        seen = set()
858        for connection in all_connections:
859            if connection.lower() not in seen:
860                all_ci_connections.add(connection)
861                seen.add(connection.lower())
862
863        return MappedItem({
864            "id": f"{message['_chat']['username']}-{message['id']}",
865            "thread_id": thread,
866            "chat": message["_chat"]["username"],
867            "author": user_id,
868            "author_username": username,
869            "author_name": fullname,
870            "author_is_bot": "yes" if user_is_bot else "no",
871            "body": message["message"],
872            "reply_to": message.get("reply_to_msg_id", ""),
873            "views": message["views"] if message["views"] else "",
874            # "forwards": message.get("forwards", MissingMappedField(0)),
875            "reactions": reactions,
876            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
877            "unix_timestamp": int(message["date"]),
878            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
879                "edit_date"] else "",
880            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
881            "author_forwarded_from_name": forwarded_name,
882            "author_forwarded_from_username": forwarded_username,
883            "author_forwarded_from_id": forwarded_id,
884            "entities_linked": ",".join(linked_entities),
885            "entities_mentioned": ",".join(all_mentions),
886            "all_connections": ",".join(all_ci_connections),
887            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
888                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
889            "unix_timestamp_forwarded_from": forwarded_timestamp,
890            "link_title": link_title,
891            "link_description": link_description,
892            "link_attached": link_attached,
893            "attachment_type": attachment_type,
894            "attachment_data": attachment_data,
895            "attachment_filename": attachment_filename
896        })

Convert Message object to 4CAT-ready data object

Parameters
  • Message message: Message to parse
Returns

4CAT-compatible item object
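
One small detail of the mapping is the thread identifier: the chat's username is preferred, a whitespace-stripped title is the fallback, and "unknown" the last resort. That fallback in isolation, with made-up chat dictionaries:

    import re

    def thread_id(chat):
        """Mirror map_item()'s fallback chain for the thread identifier."""
        if chat.get("username"):
            return chat["username"]
        if chat.get("title"):
            return re.sub(r"\s", "", chat["title"])
        return "unknown"

    print(thread_id({"username": "examplechannel", "title": "Example Channel"}))  # examplechannel
    print(thread_id({"username": None, "title": "Example Channel"}))              # ExampleChannel
    print(thread_id({"username": None, "title": None}))                           # unknown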

@staticmethod
def get_media_type(media):
898    @staticmethod
899    def get_media_type(media):
900        """
901        Get media type for a Telegram attachment
902
903        :param media:  Media object
904        :return str:  Textual identifier of the media type
905        """
906        try:
907            return {
908                "NoneType": "",
909                "MessageMediaContact": "contact",
910                "MessageMediaDocument": "document",
911                "MessageMediaEmpty": "",
912                "MessageMediaGame": "game",
913                "MessageMediaGeo": "geo",
914                "MessageMediaGeoLive": "geo_live",
915                "MessageMediaInvoice": "invoice",
916                "MessageMediaPhoto": "photo",
917                "MessageMediaPoll": "poll",
918                "MessageMediaUnsupported": "unsupported",
919                "MessageMediaVenue": "venue",
920                "MessageMediaWebPage": "url"
921            }[media.get("_type", None)]
922        except (AttributeError, KeyError):
923            return ""

Get media type for a Telegram attachment

Parameters
  • media: Media object
Returns

Textual identifier of the media type
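
Because serialize_obj() records the original Telethon class name under the _type key, this lookup works directly on serialized message["media"] dictionaries. A usage sketch, assuming the module is importable from a 4CAT checkout and using illustrative media dictionaries:

    from datasources.telegram.search_telegram import SearchTelegram

    # illustrative media dictionaries in the shape produced by serialize_obj()
    photo_media = {"_type": "MessageMediaPhoto", "photo": {"id": 1}}
    future_media = {"_type": "MessageMediaSomethingNew"}

    print(SearchTelegram.get_media_type(photo_media))   # "photo"
    print(SearchTelegram.get_media_type(future_media))  # "" (unknown types fall back to an empty string)
    print(SearchTelegram.get_media_type(None))          # "" (the AttributeError is caught)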

@staticmethod
def serialize_obj(input_obj):
925    @staticmethod
926    def serialize_obj(input_obj):
927        """
928        Serialize an object as a dictionary
929
930        Telethon message objects are not serializable by themselves, but most
931        relevant attributes are simply struct classes. This function replaces
932        those that are not with placeholders and then returns a dictionary that
933        can be serialized as JSON.
934
935        :param obj:  Object to serialize
936        :return:  Serialized object
937        """
938        scalars = (int, str, float, list, tuple, set, bool)
939
940        if type(input_obj) in scalars or input_obj is None:
941            return input_obj
942
943        if type(input_obj) is not dict:
944            obj = input_obj.__dict__
945        else:
946            obj = input_obj.copy()
947
948        mapped_obj = {}
949        for item, value in obj.items():
950            if type(value) is datetime:
951                mapped_obj[item] = value.timestamp()
952            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
953                mapped_obj[item] = SearchTelegram.serialize_obj(value)
954            elif type(value) is list:
955                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
956            elif type(value) is bytes:
957                mapped_obj[item] = value.hex()
958            elif type(value) not in scalars and value is not None:
959                # type we can't make sense of here
960                continue
961            else:
962                mapped_obj[item] = value
963
964        # Add the _type if the original object was a telethon type
965        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
966            mapped_obj["_type"] = type(input_obj).__name__
967        return mapped_obj

Serialize an object as a dictionary

Telethon message objects are not serializable by themselves, but most relevant attributes are simply struct classes. This function replaces those that are not with placeholders and then returns a dictionary that can be serialized as JSON.

Parameters
  • obj: Object to serialize
Returns

Serialized object
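
Outside of actual Telethon objects, the rules are easy to see with a plain dictionary: datetimes become Unix timestamps, bytes become hex strings, scalars pass through, and unrecognised values are dropped. A small sketch, assuming the module is importable from a 4CAT checkout:

    from datetime import datetime
    from datasources.telegram.search_telegram import SearchTelegram

    sample = {
        "id": 42,
        "date": datetime(2024, 1, 1),    # converted to a (timezone-dependent) Unix timestamp
        "file_reference": b"\x00\x01",   # converted to the hex string "0001"
        "message": "hello",
        "media": object(),               # not a recognised type, silently dropped
    }

    print(SearchTelegram.serialize_obj(sample))
    # {'id': 42, 'date': 1704..., 'file_reference': '0001', 'message': 'hello'}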

@staticmethod
def validate_query(query, request, user):
 969    @staticmethod
 970    def validate_query(query, request, user):
 971        """
 972        Validate Telegram query
 973
 974        :param dict query:  Query parameters, from client-side.
 975        :param request:  Flask request
 976        :param User user:  User object of user who has submitted the query
 977        :return dict:  Safe query parameters
 978        """
 979        # no query 4 u
 980        if not query.get("query", "").strip():
 981            raise QueryParametersException("You must provide a search query.")
 982
 983        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
 984            raise QueryParametersException("You need to provide valid Telegram API credentials first.")
 985
 986        all_posts = config.get("telegram-search.can_query_all_messages", False, user=user)
 987        max_entities = config.get("telegram-search.max_entities", 25, user=user)
 988
 989        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options()["max_posts"]["max"])
 990
 991        # reformat queries to be a comma-separated list with no wrapping
 992        # whitespace
 993        whitespace = re.compile(r"\s+")
 994        items = whitespace.sub("", query.get("query").replace("\n", ","))
 995        if max_entities > 0 and len(items.split(",")) > max_entities:
 996            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
 997
 998        sanitized_items = []
 999        # handle telegram URLs
1000        for item in items.split(","):
1001            if not item.strip():
1002                continue
1003            item = re.sub(r"^https?://t\.me/", "", item)
1004            item = re.sub(r"^/?s/", "", item)
1005            item = re.sub(r"[/]*$", "", item)
1006            sanitized_items.append(item)
1007
1008        # the dates need to make sense as a range to search within
1009        min_date, max_date = query.get("daterange")
1010
1011        # now check if there is an active API session
1012        if not user or not user.is_authenticated or user.is_anonymous:
1013            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1014                                           "accounts.")
1015
1016        # check for the information we need
1017        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1018                                                      query.get("api_hash"))
1019        user.set_value("telegram.session", session_id)
1020        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")
1021
1022        client = None
1023
1024        # API ID is always a number, if it's not, we can immediately fail
1025        try:
1026            api_id = int(query.get("api_id"))
1027        except ValueError:
1028            raise QueryParametersException("Invalid API ID.")
1029
1030        # maybe we've entered a code already and submitted it with the request
1031        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1032            code_callback = lambda: request.form.get("option-security-code")
1033            max_attempts = 1
1034        else:
1035            code_callback = lambda: -1
1036            # max_attempts = 0 because authing will always fail: we can't wait for
1037            # the code to be entered interactively, we'll need to do a new request
1038            # but we can't just immediately return, we still need to call start()
1039            # to get telegram to send us a code
1040            max_attempts = 0
1041
1042        # now try authenticating
1043        needs_code = False
1044        try:
1045            loop = asyncio.new_event_loop()
1046            asyncio.set_event_loop(loop)
1047            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1048
1049            try:
1050                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1051
1052            except ValueError as e:
1053                # this happens if 2FA is required
1054                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
1055                                               f"does not support this authentication mode for Telegram. ({e})")
1056            except RuntimeError as e:
1057                # A code was sent to the given phone number
1058                needs_code = True
1059        except FloodWaitError as e:
1060            # uh oh, we got rate-limited
1061            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
1062                                           str(e).split("(")[0] + ".")
1063        except ApiIdInvalidError as e:
1064            # wrong credentials
1065            raise QueryParametersException("Your API credentials are invalid.")
1066        except PhoneNumberInvalidError as e:
1067            # wrong phone number
1068            raise QueryParametersException(
1069                "The phone number provided is not a valid phone number for these credentials.")
1070        except RPCError as e:
1071            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
1072            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
1073                                           f"Telegram app(s) to the latest version to proceed ({e}).")
1074        except Exception as e:
1075            # catch-all for anything else that went wrong during authentication
1076            raise QueryParametersException(
1077                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
1078        finally:
1079            if client:
1080                client.disconnect()
1081
1082        if needs_code:
1083            raise QueryNeedsFurtherInputException(config={
1084                "code-info": {
1085                    "type": UserInput.OPTION_INFO,
1086                    "help": "Please enter the security code that was sent to your Telegram app to continue."
1087                },
1088                "security-code": {
1089                    "type": UserInput.OPTION_TEXT,
1090                    "help": "Security code",
1091                    "sensitive": True
1092                }})
1093
1094        # simple!
1095        return {
1096            "items": num_items,
1097            "query": ",".join(sanitized_items),
1098            "api_id": query.get("api_id"),
1099            "api_hash": query.get("api_hash"),
1100            "api_phone": query.get("api_phone"),
1101            "save-session": query.get("save-session"),
1102            "resolve-entities": query.get("resolve-entities"),
1103            "min_date": min_date,
1104            "max_date": max_date,
1105            "crawl-depth": query.get("crawl-depth"),
1106            "crawl-threshold": query.get("crawl-threshold"),
1107            "crawl-via-links": query.get("crawl-via-links")
1108        }

Validate Telegram query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters
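
As an illustration of the entity normalisation performed above (source lines 1000-1006), the sketch below applies the same regular expressions to a pasted t.me link. The helper name normalise_entity and the example input are hypothetical and not part of 4CAT.

    import re

    def normalise_entity(item):
        # strip a leading t.me URL, an optional "s/" web-preview path segment,
        # and any trailing slashes, leaving only the bare entity name
        item = re.sub(r"^https?://t\.me/", "", item)
        item = re.sub(r"^/?s/", "", item)
        item = re.sub(r"[/]*$", "", item)
        return item

    print(normalise_entity("https://t.me/s/example_channel/"))  # example_channel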

@staticmethod
def create_session_id(api_phone, api_id, api_hash):
1110    @staticmethod
1111    def create_session_id(api_phone, api_id, api_hash):
1112        """
1113        Generate a filename for the session file
1114
1115        This is a combination of phone number and API credentials, but hashed
1116        so that one cannot actually derive someone's phone number from it.
1117
1118        :param str api_phone:  Phone number for API ID
1119        :param int api_id:  Telegram API ID
1120        :param str api_hash:  Telegram API Hash
1121        :return str: A hash value derived from the input
1122        """
1123        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1124        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Generate a filename for the session file

This is a combination of phone number and API credentials, but hashed so that one cannot actually derive someone's phone number from it.

Parameters
  • str api_phone: Phone number for API ID
  • int api_id: Telegram API ID
  • str api_hash: Telegram API Hash
Returns

A hash value derived from the input
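
For reference, the derivation above can be reproduced outside 4CAT as in the minimal sketch below; the credential values are hypothetical placeholders.

    import hashlib

    api_phone = "+15551234567"                        # hypothetical example values
    api_id = 1234567
    api_hash = "0123456789abcdef0123456789abcdef"

    # same construction as create_session_id(): strip the "+" from the phone
    # number, concatenate with the API ID and hash, then take a blake2b digest
    hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
    session_id = hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
    # the hex digest becomes the session file name, "<session_id>.session"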