
datasources.telegram.search_telegram

Search Telegram via API
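
At its core, this datasource wraps Telethon's TelegramClient: it authenticates with user-supplied API credentials and then iterates over each requested entity's message history in reverse chronological order. A minimal sketch of the underlying Telethon calls (the credentials, session name and entity below are illustrative placeholders):

    # minimal sketch of the Telethon calls wrapped by this datasource
    # (API ID/hash, session name and entity are placeholders)
    import asyncio
    from telethon import TelegramClient

    async def fetch_sample(api_id: int, api_hash: str):
        client = TelegramClient("example.session", api_id, api_hash)
        await client.start()  # prompts for a login code on first connection
        # messages arrive newest-first, as in gather_posts() below
        async for message in client.iter_messages(entity="telegram", limit=5):
            print(message.id, message.date, (message.message or "")[:60])
        await client.disconnect()

    # asyncio.run(fetch_sample(12345, "0123456789abcdef0123456789abcdef"))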

   1"""
   2Search Telegram via API
   3"""
   4import traceback
   5import hashlib
   6import asyncio
   7import json
   8import ural
   9import time
  10import re
  11
  12from backend.lib.search import Search
  13from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException, \
  14    QueryNeedsFurtherInputException
  15from common.lib.helpers import convert_to_int, UserInput
  16from common.lib.item_mapping import MappedItem, MissingMappedField
  17
  18from datetime import datetime
  19from telethon import TelegramClient
  20from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \
  21    FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError
  22from telethon.tl.functions.channels import GetFullChannelRequest
  23from telethon.tl.functions.users import GetFullUserRequest
  24from telethon.tl.types import MessageEntityMention
  25
  26
  27
  28class SearchTelegram(Search):
    """
    Search Telegram via API
    """
    type = "telegram-search"  # job ID
    category = "Search"  # category
    title = "Telegram API search"  # title displayed in UI
    description = "Scrapes messages from open Telegram groups via its API."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    # cache
    details_cache = None
    failures_cache = None
    eventloop = None
    import_issues = 0
    end_if_rate_limited = 600  # break if Telegram requires a wait time above this many seconds

    max_workers = 1
    max_retries = 3
    flawless = 0

    config = {
        "telegram-search.can_query_all_messages": {
            "type": UserInput.OPTION_TOGGLE,
            "help": "Remove message amount limit",
            "default": False,
            "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!"
        },
        "telegram-search.max_entities": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max entities to query",
            "coerce_type": int,
            "min": 0,
            "default": 25,
            "tooltip": "Number of entities that can be queried at a time. Entities are groups or channels. 0 to "
                       "disable the limit."
        },
        "telegram-search.max_crawl_depth": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max crawl depth",
            "coerce_type": int,
            "min": 0,
            "default": 0,
            "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded "
                       "messages. Recommended to leave at 0 for most users, since this can exponentially increase "
                       "dataset sizes."
        }
    }

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get processor options

        Just updates the description of the entities field based on the
        configured max entities.

        :param DataSet parent_dataset:  An object representing the dataset that
          the processor would be run on
        :param ConfigManager|None config:  Configuration reader (context-aware)
        """

        max_entities = config.get("telegram-search.max_entities", 25)
        options = {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
                        "authentication for Telegram."
            },
            "api_id": {
                "type": UserInput.OPTION_TEXT,
                "help": "API ID",
                "cache": True,
            },
            "api_hash": {
                "type": UserInput.OPTION_TEXT,
                "help": "API Hash",
                "cache": True,
            },
            "api_phone": {
                "type": UserInput.OPTION_TEXT,
                "help": "Phone number",
                "cache": True,
                "default": "+xxxxxxxxxx"
            },
            "divider": {
                "type": UserInput.OPTION_DIVIDER
            },
            "query-intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Separate with commas or line breaks."
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Entities to scrape",
                "tooltip": "Separate with commas or line breaks."
            },
            "max_posts": {
                "type": UserInput.OPTION_TEXT,
                "help": "Messages per group",
                "min": 1,
                "max": 50000,
                "default": 10
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range"
            },
            "divider-2": {
                "type": UserInput.OPTION_DIVIDER
            },
            "info-sensitive": {
                "type": UserInput.OPTION_INFO,
                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
                        "there while data is fetched. After the dataset has been created your credentials will be "
                        "deleted from the server, unless you enable the option below. If you want to download images "
                        "attached to the messages in your collected data, you need to enable this option. Your "
                        "credentials will never be visible to other users and can be erased later via the result page."
            },
            "save-session": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Save session:",
                "default": False
            },
            "resolve-entities-intro": {
                "type": UserInput.OPTION_INFO,
                "help": "4CAT can resolve references to channels and users, replacing the numeric ID with the full "
                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time "
                        "and, for large datasets, the chance that you will be rate-limited and your dataset cannot "
                        "finish capturing. It will also dramatically increase the disk space needed to store the "
                        "data, so only enable this if you really need it!"
            },
            "resolve-entities": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Resolve references",
                "default": False,
            }
        }

        if max_entities:
            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
                                              f"(channels or groups) at a time. Separate with line breaks or commas.")

        all_messages = config.get("telegram-search.can_query_all_messages", False)
        if all_messages:
            if "max" in options["max_posts"]:
                del options["max_posts"]["max"]
        else:
            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
                                            f"{options['max_posts']['max']:,} messages per entity.")

        if config.get("telegram-search.max_crawl_depth", 0) > 0:
            options["crawl_intro"] = {
                "type": UserInput.OPTION_INFO,
                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
                        "progress cannot be accurately tracked when you use this feature."
            }
            options["crawl-depth"] = {
                "type": UserInput.OPTION_TEXT,
                "coerce_type": int,
                "min": 0,
                "max": config.get("telegram-search.max_crawl_depth"),
                "default": 0,
                "help": "Crawl depth",
                "tooltip": "How many 'hops' to make when crawling messages, i.e. the maximum distance between a "
                           "crawled entity and one of the starting entities."
            }
            options["crawl-threshold"] = {
                "type": UserInput.OPTION_TEXT,
                "coerce_type": int,
                "min": 0,
                "default": 5,
                "help": "Crawl threshold",
                "tooltip": "Entities need to be referenced at least this many times to be added to the query. Only "
                           "references discovered below the max crawl depth are taken into account."
            }
            options["crawl-via-links"] = {
                "type": UserInput.OPTION_TOGGLE,
                "default": False,
                "help": "Extract new groups from links",
                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
                           "groups that would otherwise remain undetected."
            }

        return options

    def get_items(self, query):
        """
        Execute a query; get messages for given parameters

        Basically a wrapper around execute_queries() to call it with asyncio.

        :param dict query:  Query parameters, as part of the DataSet object
        :return list:  Posts, sorted by thread and post ID, in ascending order
        """
        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
                                       "creating it again from scratch.", is_final=True)
            return None

        self.details_cache = {}
        self.failures_cache = set()
        # TODO: this ought to yield, as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
        results = asyncio.run(self.execute_queries())

        if not query.get("save-session"):
            self.dataset.delete_parameter("api_hash", instant=True)
            self.dataset.delete_parameter("api_phone", instant=True)
            self.dataset.delete_parameter("api_id", instant=True)

        if self.flawless:
            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
                                       "been private). View the log file for details.", is_final=True)

        return results

    async def execute_queries(self):
        """
        Get messages for queries

        This is basically what would be done in get_items(), except that due
        to Telethon's architecture it needs to be called in an async method,
        which is this one.

        :return list:  Collected messages
        """
        # session file has been created earlier, and we can re-use it here in
        # order to avoid having to re-enter the security code
        query = self.parameters

        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
                                                      query["api_id"].strip(),
                                                      query["api_hash"].strip())
        self.dataset.log(f"Telegram session id: {session_id}")
        session_path = self.config.get("PATH_SESSIONS").joinpath(session_id + ".session")

        client = None

        try:
            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
                                    loop=self.eventloop)
            await client.start(phone=SearchTelegram.cancel_start)
        except RuntimeError:
            # session is no longer usable; delete the file so the user will be
            # asked for a security code again. The RuntimeError is raised by
            # cancel_start()
            self.dataset.update_status(
                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
                is_final=True)

            if client and hasattr(client, "disconnect"):
                await client.disconnect()

            if session_path.exists():
                session_path.unlink()

            return []
        except Exception as e:
            # not sure what exception specifically is triggered here, but it
            # always means the connection failed
            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
            if client and hasattr(client, "disconnect"):
                await client.disconnect()
            return []

        # ready our parameters
        parameters = self.dataset.get_parameters()
        queries = [query.strip() for query in parameters.get("query", "").split(",")]
        max_items = convert_to_int(parameters.get("items", 10), 10)

        # Telethon requires the offset date to be a datetime
        max_date = parameters.get("max_date")
        if max_date:
            try:
                max_date = datetime.fromtimestamp(int(max_date))
            except ValueError:
                max_date = None

        # min_date can remain an integer
        min_date = parameters.get("min_date")
        if min_date:
            try:
                min_date = int(min_date)
            except ValueError:
                min_date = None

        posts = []
        try:
            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
                posts.append(post)
            return posts
        except ProcessorInterruptedException as e:
            raise e
        except Exception:
            # catch-all so we can disconnect properly
            # ...should we?
            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
            # may as well return what was captured so far
            return posts
        finally:
            await client.disconnect()

    async def gather_posts(self, client, queries, max_items, min_date, max_date):
        """
        Gather messages for each entity for which messages are requested

        :param TelegramClient client:  Telegram Client
        :param list queries:  List of entities to query (as string)
        :param int max_items:  Messages to scrape per entity
        :param int min_date:  Unix timestamp; only collect messages after this date
        :param datetime max_date:  Only collect messages before this date
        :return:  Yields messages, each message a dictionary.
        """
        resolve_refs = self.parameters.get("resolve-entities")

        # flag to stop adding queries; used when rate-limited
        no_additional_queries = False

        # this is used for the 'crawl' feature so we know at which depth a
        # given entity was discovered
        depth_map = {
            entity: 0 for entity in queries
        }

        crawl_max_depth = self.parameters.get("crawl-depth", 0)
        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
        crawl_via_links = self.parameters.get("crawl-via-links", False)

        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")

        # this keeps track of how often an entity not in the original query
        # has been mentioned. When crawling is enabled and this exceeds the
        # given threshold, the entity is added to the query
        crawl_references = {}
        full_query = set(queries)
        num_queries = len(queries)

        # we may not always know the 'entity username' for an entity ID, so
        # keep a reference map as we go
        entity_id_map = {}

        # collect queries
        # use while instead of for, so we can change the query list during
        # iteration; this is needed for the 'crawl' feature, which can
        # discover new entities during the crawl
        processed = 0
        total_messages = 0
        while queries:
            query = queries.pop(0)

            delay = 10
            retries = 0
            processed += 1
            self.dataset.update_progress(processed / num_queries)

            if no_additional_queries:
                # note that we are not completing this query
                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
                continue

            while True:
                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
                entity_posts = 0
                discovered = 0
                try:
                    async for message in client.iter_messages(entity=query, offset_date=max_date):
                        entity_posts += 1
                        total_messages += 1
                        if self.interrupted:
                            raise ProcessorInterruptedException(
                                "Interrupted while fetching message data from the Telegram API")

                        if entity_posts % 100 == 0:
                            self.dataset.update_status(
                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")

                        if message.action is not None:
                            # e.g. someone joins the channel - not an actual message
                            continue

                        # todo: possibly enrich object with e.g. the name of
                        # the channel a message was forwarded from (but that
                        # needs extra API requests...)
                        serialized_message = SearchTelegram.serialize_obj(message)
                        if "_chat" in serialized_message:
                            # add query ID to check if queries have been crawled previously
                            full_query.add(serialized_message["_chat"]["id"])
                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
                                # once we know what a channel ID resolves to, use the username
                                # instead so it is easier to understand for the user
                                entity_id_map[query] = serialized_message["_chat"]["username"]
                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")

                        if resolve_refs:
                            serialized_message = await self.resolve_groups(client, serialized_message)

                        # stop if we're below the min date
                        if min_date and serialized_message.get("date") < min_date:
                            break

                        # if crawling is enabled, see if we found something to add to the query
                        linked_entities = set()
                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
                            message_fwd = serialized_message.get("fwd_from")
                            fwd_from = None
                            fwd_source_type = None
                            if message_fwd and message_fwd.get("from_id"):
                                if message_fwd["from_id"].get("_type") == "PeerChannel":
                                    # legacy(?) data structure (pre 2024/7/22)
                                    # even if we haven't resolved the ID, we can feed the numeric ID
                                    # to Telethon! this is nice because it means we don't have to
                                    # resolve entities to crawl iteratively
                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
                                    fwd_source_type = "channel"
                                elif message_fwd["from_id"].get("full_chat"):
                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
                                    # note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
                                    fwd_source_type = "channel"
                                elif message_fwd["from_id"].get("full_user") or message_fwd["from_id"].get("_type") == "PeerUser":
                                    # forwards can also come from users; these can never be followed,
                                    # so don't add them to the crawl, but do document them
                                    fwd_source_type = "user"
                                else:
                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): unknown fwd_from data structure; unable to crawl: {json.dumps(message_fwd)}")
                                    fwd_source_type = "unknown"

                                if fwd_from:
                                    linked_entities.add(fwd_from)

                            if crawl_via_links:
                                # t.me links
                                all_links = ural.urls_from_text(serialized_message["message"])
                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
                                for link in all_links:
                                    if link.startswith("+"):
                                        # invite links
                                        continue

                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
                                    linked_entities.add(entity_name)

                                # @references
                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
                                for reference in references:
                                    if reference.startswith("@"):
                                        reference = reference[1:]

                                    reference = reference.split("/")[0]

                                    linked_entities.add(reference)

                            # check if fwd_from or the resolved entity ID is already queued or has been queried
                            for link in linked_entities:
                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
                                    # new entity discovered!
                                    # it might be discovered (before collection) multiple times, so retain the lowest depth
                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
                                    if link not in crawl_references:
                                        crawl_references[link] = 0
                                    crawl_references[link] += 1

                                    # add to queries if it has been referenced enough times
                                    if crawl_references[link] >= crawl_msg_threshold:
                                        queries.append(link)
                                        full_query.add(link)
                                        num_queries += 1
                                        discovered += 1
                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")

                        serialized_message["4CAT_metadata"] = {
                            "collected_at": datetime.now().isoformat(),  # this is relevant for rather long crawls
                            "query": query,  # possibly redundant, but crawling adds queries the user did not define, so it is useful to know which query collected a message
                            "query_depth": depth_map.get(query, 0)
                        }
                        yield serialized_message

                        if entity_posts >= max_items:
                            break

                except ChannelPrivateError:
                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
                    self.flawless += 1

                except (UsernameInvalidError,):
                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}': it does not seem to exist, skipping")
                    self.flawless += 1

                except FloodWaitError as e:
                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
                    if e.seconds < self.end_if_rate_limited:
                        time.sleep(e.seconds)
                        continue
                    else:
                        self.flawless += 1
                        no_additional_queries = True
                        self.dataset.update_status(
                            f"Telegram-requested wait time of {int(e.seconds / 60)} minutes is too long, ending")
                        break

                except BadRequestError as e:
                    self.dataset.update_status(
                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except ValueError as e:
                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except TimeoutError:
                    retries += 1
                    if retries >= self.max_retries:
                        self.dataset.update_status(
                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
                        self.flawless += 1
                        break

                    self.dataset.update_status(
                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
                    time.sleep(delay)
                    delay *= 2
                    continue

                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
                break

    async def resolve_groups(self, client, message):
        """
        Recursively resolve references to groups and users

        :param client:  Telethon client instance
        :param dict message:  Message, as already mapped by serialize_obj
        :return:  Resolved dictionary
        """
        resolved_message = message.copy()
        for key, value in message.items():
            try:
                if type(value) is not dict:
                    # if it's not a dict, we never have to resolve it, as it
                    # does not represent an entity
                    continue

                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
                    # forwarded from a channel!
                    if value["channel_id"] in self.failures_cache:
                        continue

                    if value["channel_id"] not in self.details_cache:
                        channel = await client(GetFullChannelRequest(value["channel_id"]))
                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)

                    resolved_message[key] = self.details_cache[value["channel_id"]]
                    resolved_message[key]["channel_id"] = value["channel_id"]

                elif "_type" in value and value["_type"] == "PeerUser":
                    # a user!
                    if value["user_id"] in self.failures_cache:
                        continue

                    if value["user_id"] not in self.details_cache:
                        user = await client(GetFullUserRequest(value["user_id"]))
                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)

                    resolved_message[key] = self.details_cache[value["user_id"]]
                    resolved_message[key]["user_id"] = value["user_id"]
                else:
                    resolved_message[key] = await self.resolve_groups(client, value)

            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
                if type(e) in (ChannelPrivateError, UsernameInvalidError):
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
                else:
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")

        return resolved_message

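    # Illustrative example (not part of the original module): with resolve-entities
    # enabled, resolve_groups() replaces a bare reference such as
    #
    #   {"_type": "PeerChannel", "channel_id": 1234567}
    #
    # with the serialized result of GetFullChannelRequest(1234567), re-attaching
    # the original channel_id to the resolved dictionary.
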
    @staticmethod
    def cancel_start():
        """
        Replace interactive phone number input in Telethon

        By default, if Telethon cannot use the given session file to
        authenticate, it will interactively prompt the user for a phone
        number on the command line. That is not useful here, so instead
        raise a RuntimeError. This is caught in execute_queries() and the
        user is told they need to re-authenticate via 4CAT.
        """
        raise RuntimeError("Connection cancelled")

    @staticmethod
    def map_item(message):
        """
        Convert Message object to 4CAT-ready data object

        :param Message message:  Message to parse
        :return dict:  4CAT-compatible item object
        """
        if message["_chat"]["username"]:
            # chats can apparently not have usernames???
            # truly telegram objects are way too lenient for their own good
            thread = message["_chat"]["username"]
        elif message["_chat"]["title"]:
            thread = re.sub(r"\s", "", message["_chat"]["title"])
        else:
            # just give up
            thread = "unknown"

        # determine username
        # API responses only include the user *ID*, not the username, and to
        # complicate things further not everyone is a user and not everyone
        # has a username. If no username is available, try the first and
        # last name someone has supplied
        fullname = ""
        username = ""
        user_id = message["_sender"]["id"] if message.get("_sender") else ""
        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""

        if message.get("_sender") and message["_sender"].get("username"):
            username = message["_sender"]["username"]

        if message.get("_sender") and message["_sender"].get("first_name"):
            fullname += message["_sender"]["first_name"]

        if message.get("_sender") and message["_sender"].get("last_name"):
            fullname += " " + message["_sender"]["last_name"]

        fullname = fullname.strip()

        # determine media type
        # these store some extra information about the attachment in
        # attachment_data. Since the final result will be serialised as a csv
        # file, we can only store text content. As such, some media data is
        # serialised as JSON.
        attachment_type = SearchTelegram.get_media_type(message["media"])
        attachment_filename = ""

        if attachment_type == "contact":
            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
            if message["media"].get("contact", False):
                # old data structure
                attachment = message["media"]["contact"]
            elif all(field in message["media"] for field in contact_data):
                # new data structure, 2022/7/25
                attachment = message["media"]
            else:
                raise ProcessorException("Cannot find contact data; Telegram data structure may have changed")
            attachment_data = json.dumps({field: attachment.get(field) for field in contact_data})

        elif attachment_type == "document":
            # videos, etc
            # this could add a separate routine for videos to make them a
            # separate type, which could then be scraped later, etc
            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
            if attachment_type == "video":
                attachment = message["media"]["document"]
                attachment_data = json.dumps({
                    "id": attachment["id"],
                    "dc_id": attachment["dc_id"],
                    "file_reference": attachment["file_reference"],
                })
            else:
                attachment_data = ""

        # elif attachment_type in ("geo", "geo_live"):
        # untested whether geo_live is significantly different from geo
        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])

        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
            # we don't actually store any metadata about the photo, since very
            # little of the metadata attached is of interest. Instead, the
            # actual photos may be downloaded via a processor that is run on the
            # search results
            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
            attachment_data = json.dumps({
                "id": attachment["id"],
                "dc_id": attachment["dc_id"],
                "file_reference": attachment["file_reference"],
            })
            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"

        elif attachment_type == "poll":
            # unfortunately poll results are only available when someone has
            # actually voted on the poll - that will usually not be the case,
            # so we store -1 as the vote count
            attachment = message["media"]
            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
            attachment_data = json.dumps({
                "question": attachment["poll"]["question"],
                "voters": attachment["results"]["total_voters"],
                "answers": [{
                    "answer": options[answer["option"]],
                    "votes": answer["voters"]
                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
                    "answer": options[option],
                    "votes": -1
                } for option in options]
            })

        else:
            attachment_data = ""

        # was the message forwarded from somewhere and if so, when?
        forwarded_timestamp = ""
        forwarded_name = ""
        forwarded_id = ""
        forwarded_username = ""
        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
            # forward information is spread out over a lot of places
            # we can identify, in order of usefulness: username, full name,
            # and ID. But not all of these are always available, and not
            # always in the same place either
            forwarded_timestamp = int(message["fwd_from"]["date"])
            from_data = message["fwd_from"]["from_id"]

            if from_data:
                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))

            if message["fwd_from"].get("from_name"):
                forwarded_name = message["fwd_from"].get("from_name")

            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
                from_data["user"] = from_data["users"][0]

            if from_data and ("user" in from_data or "chats" in from_data):
                # 'resolve entities' was enabled for this dataset
                if "user" in from_data:
                    if from_data["user"].get("username"):
                        forwarded_username = from_data["user"]["username"]

                    if from_data["user"].get("first_name"):
                        forwarded_name = from_data["user"]["first_name"]
                    if from_data["user"].get("last_name"):
                        forwarded_name += " " + from_data["user"]["last_name"]

                    forwarded_name = forwarded_name.strip()

                elif "chats" in from_data:
                    channel_id = from_data.get("channel_id")
                    for chat in from_data["chats"]:
                        if chat["id"] == channel_id or channel_id is None:
                            forwarded_username = chat["username"]

            elif message.get("_forward") and message["_forward"].get("_chat"):
                if message["_forward"]["_chat"].get("username"):
                    forwarded_username = message["_forward"]["_chat"]["username"]

                if message["_forward"]["_chat"].get("title"):
                    forwarded_name = message["_forward"]["_chat"]["title"]

        link_title = ""
        link_attached = ""
        link_description = ""
        reactions = ""

        if message.get("media") and message["media"].get("webpage"):
            link_title = message["media"]["webpage"].get("title")
            link_attached = message["media"]["webpage"].get("url")
            link_description = message["media"]["webpage"].get("description")

        if message.get("reactions") and message["reactions"].get("results"):
            for reaction in message["reactions"]["results"]:
                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
                    # updated to support the new reaction data structure
                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
                elif type(reaction["reaction"]) is str and "count" in reaction:
                    reactions += reaction["reaction"] * reaction["count"]
                else:
                    # failsafe; can be updated to support formatting of new data structures in the future
                    reactions += f"{reaction}, "

        is_reply = False
        reply_to = ""
        if message.get("reply_to"):
            is_reply = True
            reply_to = message["reply_to"].get("reply_to_msg_id", "")

        # t.me links
        linked_entities = set()
        all_links = ural.urls_from_text(message["message"])
        all_links = [link.split("t.me/")[1] for link in all_links if
                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]

        for link in all_links:
            if link.startswith("+"):
                # invite links
                continue

            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
            linked_entities.add(entity_name)

        # @references
        # in execute_queries we use MessageEntityMention to get these;
        # however, after serializing these objects we only have the offsets of
        # the mentioned username, and telegram does weird unicode things to its
        # offsets, meaning we can't just substring the message. So use a regex
        # as a 'good enough' solution
        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))

        # make this case-insensitive, since people may use different casing in
        # messages than the 'official' username, for example
        all_connections = {v for v in [forwarded_username, *linked_entities, *all_mentions] if v}
        all_ci_connections = set()
        seen = set()
        for connection in all_connections:
            if connection.lower() not in seen:
                all_ci_connections.add(connection)
                seen.add(connection.lower())

        return MappedItem({
            "id": f"{message['_chat']['username']}-{message['id']}",
            "thread_id": thread,
            "chat": message["_chat"]["username"],
            "author": user_id,
            "author_username": username,
            "author_name": fullname,
            "author_is_bot": "yes" if user_is_bot else "no",
            "body": message["message"],
            "body_markdown": message.get("message_markdown", MissingMappedField("")),
            "is_reply": is_reply,
            "reply_to": reply_to,
            "views": message["views"] if message["views"] else "",
            # "forwards": message.get("forwards", MissingMappedField(0)),
            "reactions": reactions,
            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(message["date"]),
            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
                "edit_date"] else "",
            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
            "author_forwarded_from_name": forwarded_name,
            "author_forwarded_from_username": forwarded_username,
            "author_forwarded_from_id": forwarded_id,
            "entities_linked": ",".join(linked_entities),
            "entities_mentioned": ",".join(all_mentions),
            "all_connections": ",".join(all_ci_connections),
            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
            "unix_timestamp_forwarded_from": forwarded_timestamp,
            "link_title": link_title,
            "link_description": link_description,
            "link_attached": link_attached,
            "attachment_type": attachment_type,
            "attachment_data": attachment_data,
            "attachment_filename": attachment_filename
        })

    @staticmethod
    def get_media_type(media):
        """
        Get media type for a Telegram attachment

        :param media:  Media object
        :return str:  Textual identifier of the media type
        """
        try:
            return {
                "NoneType": "",
                "MessageMediaContact": "contact",
                "MessageMediaDocument": "document",
                "MessageMediaEmpty": "",
                "MessageMediaGame": "game",
                "MessageMediaGeo": "geo",
                "MessageMediaGeoLive": "geo_live",
                "MessageMediaInvoice": "invoice",
                "MessageMediaPhoto": "photo",
                "MessageMediaPoll": "poll",
                "MessageMediaUnsupported": "unsupported",
                "MessageMediaVenue": "venue",
                "MessageMediaWebPage": "url"
            }[media.get("_type", None)]
        except (AttributeError, KeyError):
            return ""

    @staticmethod
    def serialize_obj(input_obj):
        """
        Serialize an object as a dictionary

        Telethon message objects are not serializable by themselves, but most
        relevant attributes are simply struct classes. This function replaces
        those that are not with placeholders and then returns a dictionary that
        can be serialized as JSON.

        :param input_obj:  Object to serialize
        :return:  Serialized object
        """
        scalars = (int, str, float, list, tuple, set, bool)

        if type(input_obj) in scalars or input_obj is None:
            return input_obj

        if type(input_obj) is not dict:
            obj = input_obj.__dict__
        else:
            obj = input_obj.copy()

        mapped_obj = {}
        for item, value in obj.items():
            if type(value) is datetime:
                mapped_obj[item] = value.timestamp()
            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
                mapped_obj[item] = SearchTelegram.serialize_obj(value)
            elif type(value) is list:
                mapped_obj[item] = [SearchTelegram.serialize_obj(list_item) for list_item in value]
            elif type(value) is bytes:
                mapped_obj[item] = value.hex()
            elif type(value) not in scalars and value is not None:
                # type we can't make sense of here
                continue
            else:
                mapped_obj[item] = value

        # add the _type if the original object was a telethon type
        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
            mapped_obj["_type"] = type(input_obj).__name__

        # store the markdown-formatted text
        if type(input_obj).__name__ == "Message":
            mapped_obj["message_markdown"] = input_obj.text

        return mapped_obj

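    # Illustrative example (not part of the original module) of what
    # serialize_obj() produces for common value types; the datetime result
    # assumes a UTC environment:
    #
    #   SearchTelegram.serialize_obj({"when": datetime(2024, 1, 1), "raw": b"\x00\xff"})
    #   -> {"when": 1704067200.0, "raw": "00ff"}
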
 976    @staticmethod
 977    def validate_query(query, request, config):
 978        """
 979        Validate Telegram query
 980
 981        :param config:
 982        :param dict query:  Query parameters, from client-side.
 983        :param request:  Flask request
 984        :param User user:  User object of user who has submitted the query
 985        :param ConfigManager config:  Configuration reader (context-aware)
 986        :return dict:  Safe query parameters
 987        """
 988        # no query 4 u
 989        if not query.get("query", "").strip():
 990            raise QueryParametersException("You must provide a search query.")
 991
 992        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
 993            raise QueryParametersException("You need to provide valid Telegram API credentials first.")
 994
 995        all_posts = config.get("telegram-search.can_query_all_messages", False)
 996        max_entities = config.get("telegram-search.max_entities", 25)
 997
 998        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
 999            config=config)["max_posts"]["max"])
1000
1001        # reformat queries to be a comma-separated list with no wrapping
1002        # whitespace
1003        whitespace = re.compile(r"\s+")
1004        items = whitespace.sub("", query.get("query").replace("\n", ","))
1005        if max_entities > 0 and len(items.split(",")) > max_entities:
1006            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
1007
1008        sanitized_items = []
1009        # handle telegram URLs
1010        for item in items.split(","):
1011            if not item.strip():
1012                continue
1013            item = re.sub(r"^https?://t\.me/", "", item)
1014            item = re.sub(r"^/?s/", "", item)
1015            item = re.sub(r"[/]*$", "", item)
1016            sanitized_items.append(item)
1017
1018        # the dates need to make sense as a range to search within
1019        min_date, max_date = query.get("daterange")
1020
1021        # now check if there is an active API session
1022        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
1023            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1024                                           "accounts.")
1025
1026        # check for the information we need
1027        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1028                                                      query.get("api_hash"))
1029        config.user.set_value("telegram.session", session_id)
1030        session_path = config.get('PATH_SESSIONS').joinpath(session_id + ".session")
1031
1032        client = None
1033
1034        # API ID is always a number, if it's not, we can immediately fail
1035        try:
1036            api_id = int(query.get("api_id"))
1037        except ValueError:
1038            raise QueryParametersException("Invalid API ID.")
1039
1040        # maybe we've entered a code already and submitted it with the request
1041        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1042            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
1043            max_attempts = 1
1044        else:
1045            code_callback = lambda: -1  # noqa: E731
1046            # max_attempts = 0 because authing will always fail: we can't wait for
1047            # the code to be entered interactively, we'll need to do a new request
1048            # but we can't just immediately return, we still need to call start()
1049            # to get telegram to send us a code
1050            max_attempts = 0
1051
1052        # now try authenticating
1053        needs_code = False
1054        try:
1055            loop = asyncio.new_event_loop()
1056            asyncio.set_event_loop(loop)
1057            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1058
1059            try:
1060                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1061
1062            except ValueError as e:
1063                # this happens if 2FA is required
1064                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
1065                                               f"does not support this authentication mode for Telegram. ({e})")
1066            except RuntimeError:
1067                # A code was sent to the given phone number
1068                needs_code = True
1069        except FloodWaitError as e:
1070            # uh oh, we got rate-limited
1071            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
1072                                           str(e).split("(")[0] + ".")
1073        except ApiIdInvalidError:
1074            # wrong credentials
1075            raise QueryParametersException("Your API credentials are invalid.")
1076        except PhoneNumberInvalidError:
1077            # wrong phone number
1078            raise QueryParametersException(
1079                "The phone number provided is not a valid phone number for these credentials.")
1080        except RPCError as e:
1081            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
1082            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
1083                                           f"Telegram app(s) to the latest version to proceed ({e}).")
1084        except Exception as e:
1085            # unknown error; pass the message on to the user
1086            raise QueryParametersException(
1087                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
1088        finally:
1089            if client:
1090                client.disconnect()
1091
1092        if needs_code:
1093            raise QueryNeedsFurtherInputException(config={
1094                "code-info": {
1095                    "type": UserInput.OPTION_INFO,
1096                    "help": "Please enter the security code that was sent to your Telegram app to continue."
1097                },
1098                "security-code": {
1099                    "type": UserInput.OPTION_TEXT,
1100                    "help": "Security code",
1101                    "sensitive": True
1102                }})
1103
1104        # simple!
1105        return {
1106            "items": num_items,
1107            "query": ",".join(sanitized_items),
1108            "api_id": query.get("api_id"),
1109            "api_hash": query.get("api_hash"),
1110            "api_phone": query.get("api_phone"),
1111            "save-session": query.get("save-session"),
1112            "resolve-entities": query.get("resolve-entities"),
1113            "min_date": min_date,
1114            "max_date": max_date,
1115            "crawl-depth": query.get("crawl-depth"),
1116            "crawl-threshold": query.get("crawl-threshold"),
1117            "crawl-via-links": query.get("crawl-via-links")
1118        }
1119
1120    @staticmethod
1121    def create_session_id(api_phone, api_id, api_hash):
1122        """
1123        Generate a filename for the session file
1124
1125        This is a combination of phone number and API credentials, but hashed
1126        so that one cannot actually derive someone's phone number from it.
1127
1128        :param str api_phone:  Phone number for API ID
1129        :param int api_id:  Telegram API ID
1130        :param str api_hash:  Telegram API Hash
1131        :return str: A hash value derived from the input
1132        """
1133        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1134        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
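
For illustration, the session file name can be reproduced outside of 4CAT with the short sketch below; the phone number, API ID and hash shown are made-up placeholders, not working credentials.

    import hashlib

    def session_id(api_phone, api_id, api_hash):
        # same derivation as SearchTelegram.create_session_id() above
        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

    # hypothetical credentials, for illustration only
    print(session_id("+10000000000", 12345, "0123456789abcdef") + ".session")
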
class SearchTelegram(backend.lib.search.Search):
  29class SearchTelegram(Search):
  30    """
  31    Search Telegram via API
  32    """
  33    type = "telegram-search"  # job ID
  34    category = "Search"  # category
  35    title = "Telegram API search"  # title displayed in UI
  36    description = "Scrapes messages from open Telegram groups via its API."  # description displayed in UI
  37    extension = "ndjson"  # extension of result file, used internally and in UI
  38    is_local = False  # Whether this datasource is locally scraped
  39    is_static = False  # Whether this datasource is still updated
  40
  41    # cache
  42    details_cache = None
  43    failures_cache = None
  44    eventloop = None
  45    import_issues = 0
  46    end_if_rate_limited = 600  # break if Telegram requires wait time above number of seconds
  47
  48    max_workers = 1
  49    max_retries = 3
  50    flawless = 0
  51
  52    config = {
  53        "telegram-search.can_query_all_messages": {
  54            "type": UserInput.OPTION_TOGGLE,
  55            "help": "Remove message amount limit",
  56            "default": False,
  57            "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!"
  58        },
  59        "telegram-search.max_entities": {
  60            "type": UserInput.OPTION_TEXT,
  61            "help": "Max entities to query",
  62            "coerce_type": int,
  63            "min": 0,
  64            "default": 25,
  65            "tooltip": "Number of entities that can be queried at a time. Entities are groups or channels. "
  66                       "Set to 0 to disable the limit."
  67        },
  68        "telegram-search.max_crawl_depth": {
  69            "type": UserInput.OPTION_TEXT,
  70            "help": "Max crawl depth",
  71            "coerce_type": int,
  72            "min": 0,
  73            "default": 0,
  74            "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded "
  75                       "messages. Recommended to leave at 0 for most users since this can exponentially increase "
  76                       "dataset sizes."
  77        }
  78    }
  79
  80    @classmethod
  81    def get_options(cls, parent_dataset=None, config=None):
  82        """
  83        Get processor options
  84
  85        Just updates the description of the entities field based on the
  86        configured max entities.
  87
  88        :param DataSet parent_dataset:  An object representing the dataset that
  89          the processor would be run on
  90        :param ConfigManager|None config:  Configuration reader (context-aware)
  91        """
  92
  93        max_entities = config.get("telegram-search.max_entities", 25)
  94        options = {
  95            "intro": {
  96                "type": UserInput.OPTION_INFO,
  97                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
  98                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
  99                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
 100                        "authentication for Telegram."
 101            },
 102            "api_id": {
 103                "type": UserInput.OPTION_TEXT,
 104                "help": "API ID",
 105                "cache": True,
 106            },
 107            "api_hash": {
 108                "type": UserInput.OPTION_TEXT,
 109                "help": "API Hash",
 110                "cache": True,
 111            },
 112            "api_phone": {
 113                "type": UserInput.OPTION_TEXT,
 114                "help": "Phone number",
 115                "cache": True,
 116                "default": "+xxxxxxxxxx"
 117            },
 118            "divider": {
 119                "type": UserInput.OPTION_DIVIDER
 120            },
 121            "query-intro": {
 122                "type": UserInput.OPTION_INFO,
 123                "help": "Separate with commas or line breaks."
 124            },
 125            "query": {
 126                "type": UserInput.OPTION_TEXT_LARGE,
 127                "help": "Entities to scrape",
 128                "tooltip": "Separate with commas or line breaks."
 129            },
 130            "max_posts": {
 131                "type": UserInput.OPTION_TEXT,
 132                "help": "Messages per group",
 133                "min": 1,
 134                "max": 50000,
 135                "default": 10
 136            },
 137            "daterange": {
 138                "type": UserInput.OPTION_DATERANGE,
 139                "help": "Date range"
 140            },
 141            "divider-2": {
 142                "type": UserInput.OPTION_DIVIDER
 143            },
 144            "info-sensitive": {
 145                "type": UserInput.OPTION_INFO,
 146                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
 147                        "there while data is fetched. After the dataset has been created your credentials will be "
 148                        "deleted from the server, unless you enable the option below. If you want to download images "
 149                        "attached to the messages in your collected data, you need to enable this option. Your "
 150                        "credentials will never be visible to other users and can be erased later via the result page."
 151            },
 152            "save-session": {
 153                "type": UserInput.OPTION_TOGGLE,
 154                "help": "Save session:",
 155                "default": False
 156            },
 157            "resolve-entities-intro": {
 158                "type": UserInput.OPTION_INFO,
 159                "help": "4CAT can resolve references to channels and users and replace the numeric ID with the full "
 160                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
 161                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time "
 162                        "and, for large datasets, the chance that you will be rate-limited and that your dataset "
 163                        "cannot finish capturing. It will also dramatically increase the disk space needed to store the "
 164                        "data, so only enable this if you really need it!"
 165            },
 166            "resolve-entities": {
 167                "type": UserInput.OPTION_TOGGLE,
 168                "help": "Resolve references",
 169                "default": False,
 170            }
 171        }
 172
 173        if max_entities:
 174            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
 175                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
 176
 177        all_messages = config.get("telegram-search.can_query_all_messages", False)
 178        if all_messages:
 179            if "max" in options["max_posts"]:
 180                del options["max_posts"]["max"]
 181        else:
 182            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
 183                                             f"{options['max_posts']['max']:,} messages per entity.")
 184
 185        if config.get("telegram-search.max_crawl_depth", 0) > 0:
 186            options["crawl_intro"] = {
 187                "type": UserInput.OPTION_INFO,
 188                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
 189                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
 190                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
 191                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
 192                        "progress cannot be accurately tracked when you use this feature."
 193            }
 194            options["crawl-depth"] = {
 195                "type": UserInput.OPTION_TEXT,
 196                "coerce_type": int,
 197                "min": 0,
 198                "max": config.get("telegram-search.max_crawl_depth"),
 199                "default": 0,
 200                "help": "Crawl depth",
 201                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
 202                           "query: an entity is only added if it can be reached from one of the starting entities "
 203                           "in at most this many hops."
 204            }
 205            options["crawl-threshold"] = {
 206                "type": UserInput.OPTION_TEXT,
 207                "coerce_type": int,
 208                "min": 0,
 209                "default": 5,
 210                "help": "Crawl threshold",
 211                "tooltip": "Entities need to be referenced at least this many times to be added to the query. Only "
 212                           "references discovered below the max crawl depth are taken into account."
 213            }
 214            options["crawl-via-links"] = {
 215                "type": UserInput.OPTION_TOGGLE,
 216                "default": False,
 217                "help": "Extract new groups from links",
 218                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
 219                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
 220                           "links that would otherwise remain undetected."
 221            }
 222
 223        return options
 224
 225
 226    def get_items(self, query):
 227        """
 228        Execute a query; get messages for given parameters
 229
 230        Basically a wrapper around execute_queries() to call it with asyncio.
 231
 232        :param dict query:  Query parameters, as part of the DataSet object
 233        :return list:  Posts, sorted by thread and post ID, in ascending order
 234        """
 235        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
 236            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
 237                                       "creating it again from scratch.", is_final=True)
 238            return None
 239
 240        self.details_cache = {}
 241        self.failures_cache = set()
 242        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
 243        results = asyncio.run(self.execute_queries())
 244
 245        if not query.get("save-session"):
 246            self.dataset.delete_parameter("api_hash", instant=True)
 247            self.dataset.delete_parameter("api_phone", instant=True)
 248            self.dataset.delete_parameter("api_id", instant=True)
 249
 250        if self.flawless:
 251            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
 252                                       "been private). View the log file for details.", is_final=True)
 253
 254        return results
 255
 256    async def execute_queries(self):
 257        """
 258        Get messages for queries
 259
 260        This is basically what would be done in get_items(), except due to
 261        Telethon's architecture this needs to be called in an async method,
 262        which is this one.
 263
 264        :return list:  Collected messages
 265        """
 266        # session file has been created earlier, and we can re-use it here in
 267        # order to avoid having to re-enter the security code
 268        query = self.parameters
 269
 270        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
 271                                                      query["api_id"].strip(),
 272                                                      query["api_hash"].strip())
 273        self.dataset.log(f'Telegram session id: {session_id}')
 274        session_path = self.config.get("PATH_SESSIONS").joinpath(session_id + ".session")
 275
 276        client = None
 277
 278        try:
 279            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
 280                                    loop=self.eventloop)
 281            await client.start(phone=SearchTelegram.cancel_start)
 282        except RuntimeError:
 283            # session is no longer useable, delete file so user will be asked
 284            # for security code again. The RuntimeError is raised by
 285            # `cancel_start()`
 286            self.dataset.update_status(
 287                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
 288                is_final=True)
 289
 290            if client and hasattr(client, "disconnect"):
 291                await client.disconnect()
 292
 293            if session_path.exists():
 294                session_path.unlink()
 295
 296            return []
 297        except Exception as e:
 298            # not sure what exception specifically is triggered here, but it
 299            # always means the connection failed
 300            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
 301            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
 302            if client and hasattr(client, "disconnect"):
 303                await client.disconnect()
 304            return []
 305
 306        # ready our parameters
 307        parameters = self.dataset.get_parameters()
 308        queries = [query.strip() for query in parameters.get("query", "").split(",")]
 309        max_items = convert_to_int(parameters.get("items", 10), 10)
 310
 311        # Telethon requires the offset date to be a datetime date
 312        max_date = parameters.get("max_date")
 313        if max_date:
 314            try:
 315                max_date = datetime.fromtimestamp(int(max_date))
 316            except ValueError:
 317                max_date = None
 318
 319        # min_date can remain an integer
 320        min_date = parameters.get("min_date")
 321        if min_date:
 322            try:
 323                min_date = int(min_date)
 324            except ValueError:
 325                min_date = None
 326
 327        posts = []
 328        try:
 329            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
 330                posts.append(post)
 331            return posts
 332        except ProcessorInterruptedException:
 333            raise
 334        except Exception:
 335            # catch-all so we can disconnect properly
 336            # ...should we?
 337            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
 338            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
 339            # May as well return what was captured, yes?
 340            return posts
 341        finally:
 342            await client.disconnect()
 343
 344    async def gather_posts(self, client, queries, max_items, min_date, max_date):
 345        """
 346        Gather messages for each entity for which messages are requested
 347
 348        :param TelegramClient client:  Telegram Client
 349        :param list queries:  List of entities to query (as string)
 350        :param int max_items:  Messages to scrape per entity
 351        :param int min_date:  Unix timestamp; only yield posts from after this time
 352        :param datetime max_date:  Only collect posts from before this datetime
 353        :return:  Yields messages, each message a dictionary.
 354        """
 355        resolve_refs = self.parameters.get("resolve-entities")
 356
 357        # Adding flag to stop; using for rate limits
 358        no_additional_queries = False
 359
 360        # This is used for the 'crawl' feature so we know at which depth a
 361        # given entity was discovered
 362        depth_map = {
 363            entity: 0 for entity in queries
 364        }
 365
 366        crawl_max_depth = self.parameters.get("crawl-depth", 0)
 367        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
 368        crawl_via_links = self.parameters.get("crawl-via-links", False)
 369
 370        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
 371        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
 372
 373        # this keeps track of how often an entity not in the original query
 374        # has been mentioned. When crawling is enabled and this exceeds the
 375        # given threshold, the entity is added to the query
 376        crawl_references = {}
 377        full_query = set(queries)
 378        num_queries = len(queries)
 379
 380        # we may not always know the 'entity username' for an entity ID, so
 381        # keep a reference map as we go
 382        entity_id_map = {}
 383
 384        # Collect queries
 385        # Use while instead of for so we can change queries during iteration
 386        # this is needed for the 'crawl' feature which can discover new
 387        # entities during crawl
 388        processed = 0
 389        total_messages = 0
 390        while queries:
 391            query = queries.pop(0)
 392
 393            delay = 10
 394            retries = 0
 395            processed += 1
 396            self.dataset.update_progress(processed / num_queries)
 397
 398            if no_additional_queries:
 399                # Note that we are not completing this query
 400                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
 401                continue
 402
 403            while True:
 404                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
 405                entity_posts = 0
 406                discovered = 0
 407                try:
 408                    async for message in client.iter_messages(entity=query, offset_date=max_date):
 409                        entity_posts += 1
 410                        total_messages += 1
 411                        if self.interrupted:
 412                            raise ProcessorInterruptedException(
 413                                "Interrupted while fetching message data from the Telegram API")
 414
 415                        if entity_posts % 100 == 0:
 416                            self.dataset.update_status(
 417                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
 418
 419                        if message.action is not None:
 420                            # e.g. someone joins the channel - not an actual message
 421                            continue
 422
 423                        # todo: possibly enrich object with e.g. the name of
 424                        # the channel a message was forwarded from (but that
 425                        # needs extra API requests...)
 426                        serialized_message = SearchTelegram.serialize_obj(message)
 427                        if "_chat" in serialized_message:
 428                            # Add query ID to check if queries have been crawled previously
 429                            full_query.add(serialized_message["_chat"]["id"])
 430                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
 431                                # once we know what a channel ID resolves to, use the username instead so it is easier to
 432                                # understand for the user
 433                                entity_id_map[query] = serialized_message["_chat"]["username"]
 434                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
 435
 436                        if resolve_refs:
 437                            serialized_message = await self.resolve_groups(client, serialized_message)
 438
 439                        # Stop if we're below the min date
 440                        if min_date and serialized_message.get("date") < min_date:
 441                            break
 442
 443                        # if crawling is enabled, see if we found something to add to the query
 444                        linked_entities = set()
 445                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
 446                            message_fwd = serialized_message.get("fwd_from")
 447                            fwd_from = None
 448                            fwd_source_type = None
 449                            if message_fwd and message_fwd.get("from_id"):
 450                                if message_fwd["from_id"].get("_type") == "PeerChannel":
 451                                    # Legacy(?) data structure (pre 2024/7/22)
 452                                    # even if we haven't resolved the ID, we can feed the numeric ID
 453                                    # to Telethon! this is nice because it means we don't have to
 454                                    # resolve entities to crawl iteratively
 455                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
 456                                    fwd_source_type = "channel"
 457                                elif message_fwd["from_id"].get("full_chat"):
 458                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
 459                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
 460                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
 461                                    fwd_source_type = "channel"
 462                                elif message_fwd["from_id"].get("full_user") or message_fwd["from_id"].get("_type") == "PeerUser":
 463                                    # forwards can also come from users
 464                                    # these can never be followed, so don't add these to the crawl, but do document them
 465                                    fwd_source_type = "user"
 466                                else:
 467                                    self.dataset.log(f"Unrecognised fwd_from data structure: {json.dumps(message_fwd)}")
 468                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
 469                                    fwd_source_type = "unknown"
 470
 471                                if fwd_from:
 472                                    linked_entities.add(fwd_from)
 473
 474
 475                            if crawl_via_links:
 476                                # t.me links
 477                                all_links = ural.urls_from_text(serialized_message["message"])
 478                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 479                                for link in all_links:
 480                                    if link.startswith("+"):
 481                                        # invite links
 482                                        continue
 483
 484                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 485                                    linked_entities.add(entity_name)
 486
 487                                # @references
 488                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
 489                                for reference in references:
 490                                    if reference.startswith("@"):
 491                                        reference = reference[1:]
 492
 493                                    reference = reference.split("/")[0]
 494
 495                                    linked_entities.add(reference)
 496
 497                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
 498                            for link in linked_entities:
 499                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
 500                                    # new entity discovered!
 501                                    # might be discovered (before collection) multiple times, so retain lowest depth
 502                                    # print(f"Potentially crawling {link}")
 503                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
 504                                    if link not in crawl_references:
 505                                        crawl_references[link] = 0
 506                                    crawl_references[link] += 1
 507
 508                                    # Add to queries if it has been referenced enough times
 509                                    if crawl_references[link] >= crawl_msg_threshold:
 510                                        queries.append(link)
 511                                        full_query.add(link)
 512                                        num_queries += 1
 513                                        discovered += 1
 514                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
 515
 516
 517
 518                        serialized_message["4CAT_metadata"] = {
 519                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
 520                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
 521                            "query_depth": depth_map.get(query, 0)
 522                        }
 523                        yield serialized_message
 524
 525                        if entity_posts >= max_items:
 526                            break
 527
 528                except ChannelPrivateError:
 529                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
 530                    self.flawless += 1
 531
 532                except UsernameInvalidError:
 533                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
 534                    self.flawless += 1
 535
 536                except FloodWaitError as e:
 537                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
 538                    if e.seconds < self.end_if_rate_limited:
 539                        time.sleep(e.seconds)
 540                        continue
 541                    else:
 542                        self.flawless += 1
 543                        no_additional_queries = True
 544                        self.dataset.update_status(
 545                            f"Telegram requires a wait of {int(e.seconds / 60)} minutes, which is too long; ending")
 546                        break
 547
 548                except BadRequestError as e:
 549                    self.dataset.update_status(
 550                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 551                    self.flawless += 1
 552
 553                except ValueError as e:
 554                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 555                    self.flawless += 1
 556
 562                except TimeoutError:
 563                    if retries >= self.max_retries:
 564                        self.dataset.update_status(
 565                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
 566                        self.flawless += 1
 567                        break
 568                    retries += 1
 569                    self.dataset.update_status(
 570                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
 571                    time.sleep(delay)
 572                    delay *= 2
 573                    continue
 574
 575                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
 576                break
 577
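        # A worked example of the crawl bookkeeping above, with hypothetical
        # entities, crawl-threshold 2 and crawl-depth 1: forwards from
        # "channelB" seen while collecting "channelA" increment a counter, and
        # once it reaches the threshold, "channelB" is queued one hop deeper
        # than "channelA". At the maximum depth it is collected but not
        # crawled further.
        #
        #     depth_map = {"channelA": 0}
        #     crawl_references = {}
        #     crawl_references["channelB"] = 1  # first forward: below threshold
        #     crawl_references["channelB"] = 2  # second forward: threshold reached
        #     depth_map["channelB"] = 1         # queued at depth 1 == max depth
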
 578    async def resolve_groups(self, client, message):
 579        """
 580        Recursively resolve references to groups and users
 581
 582        :param client:  Telethon client instance
 583        :param dict message:  Message, as already mapped by serialize_obj
 584        :return:  Resolved dictionary
 585        """
 586        resolved_message = message.copy()
 587        for key, value in message.items():
 588            try:
 589                if type(value) is not dict:
 590                    # if it's not a dict, we never have to resolve it, as it
 591                    # does not represent an entity
 592                    continue
 593
 594                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
 595                    # forwarded from a channel!
 596                    if value["channel_id"] in self.failures_cache:
 597                        continue
 598
 599                    if value["channel_id"] not in self.details_cache:
 600                        channel = await client(GetFullChannelRequest(value["channel_id"]))
 601                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
 602
 603                    resolved_message[key] = self.details_cache[value["channel_id"]]
 604                    resolved_message[key]["channel_id"] = value["channel_id"]
 605
 606                elif "_type" in value and value["_type"] == "PeerUser":
 607                    # a user!
 608                    if value["user_id"] in self.failures_cache:
 609                        continue
 610
 611                    if value["user_id"] not in self.details_cache:
 612                        user = await client(GetFullUserRequest(value["user_id"]))
 613                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
 614
 615                    resolved_message[key] = self.details_cache[value["user_id"]]
 616                    resolved_message[key]["user_id"] = value["user_id"]
 617                else:
 618                    resolved_message[key] = await self.resolve_groups(client, value)
 619
 620            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
 621                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
 622                if type(e) in (ChannelPrivateError, UsernameInvalidError):
 623                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
 624                else:
 625                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
 626
 627        return resolved_message
 628
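        # The shape of the transformation above, with hypothetical data: a
        # nested {"_type": "PeerChannel", "channel_id": N} reference is
        # replaced with the (cached) serialized result of
        # GetFullChannelRequest(N), with the original channel_id retained:
        #
        #     before: {"from_id": {"_type": "PeerChannel", "channel_id": 42}}
        #     after:  {"from_id": {...full channel metadata..., "channel_id": 42}}
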
 629    @staticmethod
 630    def cancel_start():
 631        """
 632        Replace interactive phone number input in Telethon
 633
 634        By default, if Telethon cannot use the given session file to
 635        authenticate, it will interactively prompt the user for a phone
 636        number on the command line. That is not useful here, so instead
 637        raise a RuntimeError. This will be caught below and the user will
 638        be told they need to re-authenticate via 4CAT.
 639        """
 640        raise RuntimeError("Connection cancelled")
 641
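        # A minimal sketch of the pattern this enables, as used in
        # execute_queries() above: because `phone` is a callable, Telethon
        # only invokes it when the session file cannot authenticate by
        # itself, at which point the RuntimeError aborts the login attempt.
        #
        #     try:
        #         await client.start(phone=SearchTelegram.cancel_start)
        #     except RuntimeError:
        #         pass  # session invalid; user must re-authenticate via 4CAT
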
 642    @staticmethod
 643    def map_item(message):
 644        """
 645        Convert Message object to 4CAT-ready data object
 646
 647        :param Message message:  Message to parse
 648        :return dict:  4CAT-compatible item object
 649        """
 650        if message["_chat"]["username"]:
 651            # chats can apparently not have usernames???
 652            # truly telegram objects are way too lenient for their own good
 653            thread = message["_chat"]["username"]
 654        elif message["_chat"]["title"]:
 655            thread = re.sub(r"\s", "", message["_chat"]["title"])
 656        else:
 657            # just give up
 658            thread = "unknown"
 659
 660        # determine username
 661        # API responses only include the user *ID*, not the username, and to
 662        # complicate things further not everyone is a user and not everyone
 663        # has a username. If no username is available, try the first and
 664        # last name someone has supplied
 665        fullname = ""
 666        username = ""
 667        user_id = message["_sender"]["id"] if message.get("_sender") else ""
 668        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
 669
 670        if message.get("_sender") and message["_sender"].get("username"):
 671            username = message["_sender"]["username"]
 672
 673        if message.get("_sender") and message["_sender"].get("first_name"):
 674            fullname += message["_sender"]["first_name"]
 675
 676        if message.get("_sender") and message["_sender"].get("last_name"):
 677            fullname += " " + message["_sender"]["last_name"]
 678
 679        fullname = fullname.strip()
 680
 681        # determine media type
 682        # some media types store extra information about the attachment in
 683        # attachment_data. Since the final result will be serialised as a CSV
 684        # file, we can only store text content, so some media data is
 685        # serialised as JSON.
 686        attachment_type = SearchTelegram.get_media_type(message["media"])
 687        attachment_filename = ""
 688
 689        if attachment_type == "contact":
 690            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
 691            if message["media"].get('contact', False):
 692                # Old datastructure
 693                attachment = message["media"]["contact"]
 694            elif all(field in message["media"] for field in contact_data):
 695                # New datastructure 2022/7/25
 696                attachment = message["media"]
 697            else:
 698                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
 699            attachment_data = json.dumps({field: attachment.get(field) for field in contact_data})
 700
 701        elif attachment_type == "document":
 702            # videos, etc
 703            # This could add a separate routine for videos to make them a
 704            # separate type, which could then be scraped later, etc
 705            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
 706            if attachment_type == "video":
 707                attachment = message["media"]["document"]
 708                attachment_data = json.dumps({
 709                    "id": attachment["id"],
 710                    "dc_id": attachment["dc_id"],
 711                    "file_reference": attachment["file_reference"],
 712                })
 713            else:
 714                attachment_data = ""
 715
 716        # elif attachment_type in ("geo", "geo_live"):
 717        # untested whether geo_live is significantly different from geo
 718        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
 719
 720        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
 721            # we don't actually store any metadata about the photo, since very
 722            # little of the metadata attached is of interest. Instead, the
 723            # actual photos may be downloaded via a processor that is run on the
 724            # search results
 725            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
 726            attachment_data = json.dumps({
 727                "id": attachment["id"],
 728                "dc_id": attachment["dc_id"],
 729                "file_reference": attachment["file_reference"],
 730            })
 731            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
 732
 733        elif attachment_type == "poll":
 734            # unfortunately poll results are only available when someone has
 735            # actually voted on the poll - that will usually not be the case,
 736            # so we store -1 as the vote count
 737            attachment = message["media"]
 738            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
 739            attachment_data = json.dumps({
 740                "question": attachment["poll"]["question"],
 741                "voters": attachment["results"]["total_voters"],
 742                "answers": [{
 743                    "answer": options[answer["option"]],
 744                    "votes": answer["voters"]
 745                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
 746                    "answer": options[option],
 747                    "votes": -1
 748                } for option in options]
 749            })
 750
 751        else:
 752            attachment_data = ""
 753
 754        # was the message forwarded from somewhere and if so when?
 755        forwarded_timestamp = ""
 756        forwarded_name = ""
 757        forwarded_id = ""
 758        forwarded_username = ""
 759        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
 760            # forward information is spread out over a lot of places
 761            # we can identify, in order of usefulness: username, full name,
 762            # and ID. But not all of these are always available, and not
 763            # always in the same place either
 764            forwarded_timestamp = int(message["fwd_from"]["date"])
 765            from_data = message["fwd_from"]["from_id"]
 766
 767            if from_data:
 768                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
 769
 770            if message["fwd_from"].get("from_name"):
 771                forwarded_name = message["fwd_from"].get("from_name")
 772
 773            if from_data and from_data.get("from_name"):
 774                forwarded_name = from_data["from_name"]
 775
 776            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
 777                from_data["user"] = from_data["users"][0]
 778
 779            if from_data and ("user" in from_data or "chats" in from_data):
 780                # 'resolve entities' was enabled for this dataset
 781                if "user" in from_data:
 782                    if from_data["user"].get("username"):
 783                        forwarded_username = from_data["user"]["username"]
 784
 785                    if from_data["user"].get("first_name"):
 786                        forwarded_name = from_data["user"]["first_name"]
 787                    if from_data["user"].get("last_name"):
 788                        forwarded_name += " " + from_data["user"]["last_name"]
 789
 790                    forwarded_name = forwarded_name.strip()
 791
 792                elif "chats" in from_data:
 793                    channel_id = from_data.get("channel_id")
 794                    for chat in from_data["chats"]:
 795                        if chat["id"] == channel_id or channel_id is None:
 796                            forwarded_username = chat["username"]
 797
 798            elif message.get("_forward") and message["_forward"].get("_chat"):
 799                if message["_forward"]["_chat"].get("username"):
 800                    forwarded_username = message["_forward"]["_chat"]["username"]
 801
 802                if message["_forward"]["_chat"].get("title"):
 803                    forwarded_name = message["_forward"]["_chat"]["title"]
 804
 805        link_title = ""
 806        link_attached = ""
 807        link_description = ""
 808        reactions = ""
 809
 810        if message.get("media") and message["media"].get("webpage"):
 811            link_title = message["media"]["webpage"].get("title")
 812            link_attached = message["media"]["webpage"].get("url")
 813            link_description = message["media"]["webpage"].get("description")
 814
 815        if message.get("reactions") and message["reactions"].get("results"):
 816            for reaction in message["reactions"]["results"]:
 817                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
 818                    # Updated to support new reaction datastructure
 819                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
 820                elif type(reaction["reaction"]) is str and "count" in reaction:
 821                    reactions += reaction["reaction"] * reaction["count"]
 822                else:
 823                    # Failsafe; can be updated to support formatting of new datastructures in the future
 824                    reactions += f"{reaction}, "
 825
 826        is_reply = False
 827        reply_to = ""
 828        if message.get("reply_to"):
 829            is_reply = True
 830            reply_to = message["reply_to"].get("reply_to_msg_id", "")
 831
 832        # t.me links
 833        linked_entities = set()
 834        all_links = ural.urls_from_text(message["message"])
 835        all_links = [link.split("t.me/")[1] for link in all_links if
 836                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 837
 838        for link in all_links:
 839            if link.startswith("+"):
 840                # invite links
 841                continue
 842
 843            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 844            linked_entities.add(entity_name)
 845
 846        # @references
 847        # in execute_queries we use MessageEntityMention to get these
 848        # however, after serializing these objects we only have the offsets of
 849        # the mentioned username, and telegram does weird unicode things to its
 850        # offsets meaning we can't just substring the message. So use a regex
 851        # as a 'good enough' solution
 852        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
 853
 854        # make this case-insensitive since people may use different casing in
 855        # messages than the 'official' username for example
 856        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
 857        all_ci_connections = set()
 858        seen = set()
 859        for connection in all_connections:
 860            if connection.lower() not in seen:
 861                all_ci_connections.add(connection)
 862                seen.add(connection.lower())
 863
 864        return MappedItem({
 865            "id": f"{message['_chat']['username']}-{message['id']}",
 866            "thread_id": thread,
 867            "chat": message["_chat"]["username"],
 868            "author": user_id,
 869            "author_username": username,
 870            "author_name": fullname,
 871            "author_is_bot": "yes" if user_is_bot else "no",
 872            "body": message["message"],
 873            "body_markdown": message.get("message_markdown", MissingMappedField("")),
 874            "is_reply": is_reply,
 875            "reply_to": reply_to,
 876            "views": message["views"] if message["views"] else "",
 877            # "forwards": message.get("forwards", MissingMappedField(0)),
 878            "reactions": reactions,
 879            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
 880            "unix_timestamp": int(message["date"]),
 881            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
 882                "edit_date"] else "",
 883            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
 884            "author_forwarded_from_name": forwarded_name,
 885            "author_forwarded_from_username": forwarded_username,
 886            "author_forwarded_from_id": forwarded_id,
 887            "entities_linked": ",".join(linked_entities),
 888            "entities_mentioned": ",".join(all_mentions),
 889            "all_connections": ",".join(all_ci_connections),
 890            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
 891                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
 892            "unix_timestamp_forwarded_from": forwarded_timestamp,
 893            "link_title": link_title,
 894            "link_description": link_description,
 895            "link_attached": link_attached,
 896            "attachment_type": attachment_type,
 897            "attachment_data": attachment_data,
 898            "attachment_filename": attachment_filename
 899        })
 900
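        # For illustration, the @reference regex used above applied to a
        # made-up message:
        #
        #     >>> re.findall(r"@([^\s\W]+)", "forwarded via @someChannel, cc @some_user")
        #     ['someChannel', 'some_user']
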
 901    @staticmethod
 902    def get_media_type(media):
 903        """
 904        Get media type for a Telegram attachment
 905
 906        :param media:  Media object
 907        :return str:  Textual identifier of the media type
 908        """
 909        try:
 910            return {
 911                "NoneType": "",
 912                "MessageMediaContact": "contact",
 913                "MessageMediaDocument": "document",
 914                "MessageMediaEmpty": "",
 915                "MessageMediaGame": "game",
 916                "MessageMediaGeo": "geo",
 917                "MessageMediaGeoLive": "geo_live",
 918                "MessageMediaInvoice": "invoice",
 919                "MessageMediaPhoto": "photo",
 920                "MessageMediaPoll": "poll",
 921                "MessageMediaUnsupported": "unsupported",
 922                "MessageMediaVenue": "venue",
 923                "MessageMediaWebPage": "url"
 924            }[media.get("_type", None)]
 925        except (AttributeError, KeyError):
 926            return ""
 927
 928    @staticmethod
 929    def serialize_obj(input_obj):
 930        """
 931        Serialize an object as a dictionary
 932
 933        Telethon message objects are not serializable by themselves, but most
 934        relevant attributes are simply struct classes. This function replaces
 935        those that are not with placeholders and then returns a dictionary that
 936        can be serialized as JSON.
 937
 938        :param input_obj:  Object to serialize
 939        :return:  Serialized object
 940        """
 941        scalars = (int, str, float, list, tuple, set, bool)
 942
 943        if type(input_obj) in scalars or input_obj is None:
 944            return input_obj
 945
 946        if type(input_obj) is not dict:
 947            obj = input_obj.__dict__
 948        else:
 949            obj = input_obj.copy()
 950
 951        mapped_obj = {}
 952        for item, value in obj.items():
 953            if type(value) is datetime:
 954                mapped_obj[item] = value.timestamp()
 955            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 956                mapped_obj[item] = SearchTelegram.serialize_obj(value)
 957            elif type(value) is list:
 958                mapped_obj[item] = [SearchTelegram.serialize_obj(sub_value) for sub_value in value]
 959            elif type(value) is bytes:
 960                mapped_obj[item] = value.hex()
 961            elif type(value) not in scalars and value is not None:
 962                # type we can't make sense of here
 963                continue
 964            else:
 965                mapped_obj[item] = value
 966
 967        # Add the _type if the original object was a telethon type
 968        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 969            mapped_obj["_type"] = type(input_obj).__name__
 970
 971        # Store the markdown-formatted text
 972        if type(input_obj).__name__ == "Message":
 973            mapped_obj["message_markdown"] = input_obj.text
 974
 975        return mapped_obj
 976
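        # A sketch of how serialize_obj() flattens values; plain dict input,
        # so no _type key is added (hypothetical fields, for illustration):
        #
        #     SearchTelegram.serialize_obj({
        #         "date": datetime(2023, 1, 1),   # datetime -> unix timestamp (float)
        #         "file_reference": b"\x00\xff",  # bytes -> hex string "00ff"
        #         "views": 42,                    # scalar, kept as-is
        #     })
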
 977    @staticmethod
 978    def validate_query(query, request, config):
 979        """
 980        Validate Telegram query
 981
 982        :param dict query:  Query parameters, from client-side.
 983        :param request:  Flask request through which the query was
 984          submitted
 985        :param ConfigManager config:  Configuration reader
 986          (context-aware)
 987        :return dict:  Safe query parameters
 988        """
 989        # no query 4 u
 990        if not query.get("query", "").strip():
 991            raise QueryParametersException("You must provide a search query.")
 992
 993        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
 994            raise QueryParametersException("You need to provide valid Telegram API credentials first.")
 995
 996        all_posts = config.get("telegram-search.can_query_all_messages", False)
 997        max_entities = config.get("telegram-search.max_entities", 25)
 998
 999        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
1000            config=config)["max_posts"]["max"])
1001
1002        # reformat queries to be a comma-separated list with all
1003        # whitespace removed
1004        whitespace = re.compile(r"\s+")
1005        items = whitespace.sub("", query.get("query").replace("\n", ","))
1006        if max_entities > 0 and len(items.split(",")) > max_entities:
1007            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
1008
1009        sanitized_items = []
1010        # handle telegram URLs
1011        for item in items.split(","):
1012            if not item.strip():
1013                continue
1014            item = re.sub(r"^https?://t\.me/", "", item)
1015            item = re.sub(r"^/?s/", "", item)
1016            item = re.sub(r"[/]*$", "", item)
1017            sanitized_items.append(item)
1018
1019        # the dates need to make sense as a range to search within
1020        min_date, max_date = query.get("daterange")
1021
1022        # now check if there is an active API session
1023        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
1024            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1025                                           "accounts.")
1026
1027        # check for the information we need
1028        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1029                                                      query.get("api_hash"))
1030        config.user.set_value("telegram.session", session_id)
1031        session_path = config.get('PATH_SESSIONS').joinpath(session_id + ".session")
1032
1033        client = None
1034
1035        # the API ID is always numeric; if it is not, we can fail immediately
1036        try:
1037            api_id = int(query.get("api_id"))
1038        except ValueError:
1039            raise QueryParametersException("Invalid API ID.")
1040
1041        # maybe we've entered a code already and submitted it with the request
1042        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1043            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
1044            max_attempts = 1
1045        else:
1046            code_callback = lambda: -1  # noqa: E731
1047            # max_attempts = 0 because authing will always fail: we can't wait
1048            # for the code to be entered interactively, so a new request will
1049            # be needed. We can't just return immediately either: start() still
1050            # needs to be called so that Telegram sends us a code
1051            max_attempts = 0
1052
1053        # now try authenticating
1054        needs_code = False
1055        try:
1056            loop = asyncio.new_event_loop()
1057            asyncio.set_event_loop(loop)
1058            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1059
1060            try:
1061                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1062
1063            except ValueError as e:
1064                # this happens if 2FA is required
1065                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
1066                                               f"does not support this authentication mode for Telegram. ({e})")
1067            except RuntimeError:
1068                # A code was sent to the given phone number
1069                needs_code = True
1070        except FloodWaitError as e:
1071            # uh oh, we got rate-limited
1072            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
1073                                           str(e).split("(")[0] + ".")
1074        except ApiIdInvalidError:
1075            # wrong credentials
1076            raise QueryParametersException("Your API credentials are invalid.")
1077        except PhoneNumberInvalidError:
1078            # wrong phone number
1079            raise QueryParametersException(
1080                "The phone number provided is not a valid phone number for these credentials.")
1081        except RPCError as e:
1082            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
1083            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
1084                                           f"Telegram app(s) to the latest version to proceed ({e}).")
1085        except Exception as e:
1086            # ?
1087            raise QueryParametersException(
1088                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
1089        finally:
1090            if client:
1091                client.disconnect()
1092
1093        if needs_code:
1094            raise QueryNeedsFurtherInputException(config={
1095                "code-info": {
1096                    "type": UserInput.OPTION_INFO,
1097                    "help": "Please enter the security code that was sent to your Telegram app to continue."
1098                },
1099                "security-code": {
1100                    "type": UserInput.OPTION_TEXT,
1101                    "help": "Security code",
1102                    "sensitive": True
1103                }})
1104
1105        # simple!
1106        return {
1107            "items": num_items,
1108            "query": ",".join(sanitized_items),
1109            "api_id": query.get("api_id"),
1110            "api_hash": query.get("api_hash"),
1111            "api_phone": query.get("api_phone"),
1112            "save-session": query.get("save-session"),
1113            "resolve-entities": query.get("resolve-entities"),
1114            "min_date": min_date,
1115            "max_date": max_date,
1116            "crawl-depth": query.get("crawl-depth"),
1117            "crawl-threshold": query.get("crawl-threshold"),
1118            "crawl-via-links": query.get("crawl-via-links")
1119        }
1120
1121    @staticmethod
1122    def create_session_id(api_phone, api_id, api_hash):
1123        """
1124        Generate a filename for the session file
1125
1126        This is a combination of phone number and API credentials, but hashed
1127        so that one cannot actually derive someone's phone number from it.
1128
1129        :param str api_phone:  Phone number for API ID
1130        :param int api_id:  Telegram API ID
1131        :param str api_hash:  Telegram API Hash
1132        :return str: A hash value derived from the input
1133        """
1134        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1135        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
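
As a quick illustration of the hashing scheme in create_session_id() above (with made-up credentials, for illustration only): the same inputs always yield the same session filename, while the phone number cannot be recovered from it.

import hashlib

# made-up example credentials, not real ones
api_phone = "+31201234567"
api_id = 123456
api_hash = "0123456789abcdef0123456789abcdef"

hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
session_id = hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
print(session_id)  # deterministic 128-character hex digest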

Search Telegram via API

type = 'telegram-search'
category = 'Search'
title = 'Telegram API search'
description = 'Scrapes messages from open Telegram groups via its API.'
extension = 'ndjson'
is_local = False
is_static = False
details_cache = None
failures_cache = None
eventloop = None
import_issues = 0
end_if_rate_limited = 600
max_workers = 1
max_retries = 3
flawless = 0
config = {'telegram-search.can_query_all_messages': {'type': 'toggle', 'help': 'Remove message amount limit', 'default': False, 'tooltip': 'Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!'}, 'telegram-search.max_entities': {'type': 'string', 'help': 'Max entities to query', 'coerce_type': <class 'int'>, 'min': 0, 'default': 25, 'tooltip': 'Amount of entities that can be queried at a time. Entities are groups or channels. 0 to disable limit.'}, 'telegram-search.max_crawl_depth': {'type': 'string', 'help': 'Max crawl depth', 'coerce_type': <class 'int'>, 'min': 0, 'default': 0, 'tooltip': 'If higher than 0, 4CAT can automatically add new entities to the query based on forwarded messages. Recommended to leave at 0 for most users since this can exponentially increase dataset sizes.'}}
@classmethod
def get_options(cls, parent_dataset=None, config=None):
 80    @classmethod
 81    def get_options(cls, parent_dataset=None, config=None):
 82        """
 83        Get processor options
 84
 85        Just updates the description of the entities field based on the
 86        configured max entities.
 87
 88        :param DataSet parent_dataset:  An object representing the dataset that
 89          the processor would be run on
 90        :param ConfigManager|None config:  Configuration reader (context-aware)
 91        """
 92
 93        max_entities = config.get("telegram-search.max_entities", 25)
 94        options = {
 95            "intro": {
 96                "type": UserInput.OPTION_INFO,
 97                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
 98                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
 99                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
100                        "authentication for Telegram."
101            },
102            "api_id": {
103                "type": UserInput.OPTION_TEXT,
104                "help": "API ID",
105                "cache": True,
106            },
107            "api_hash": {
108                "type": UserInput.OPTION_TEXT,
109                "help": "API Hash",
110                "cache": True,
111            },
112            "api_phone": {
113                "type": UserInput.OPTION_TEXT,
114                "help": "Phone number",
115                "cache": True,
116                "default": "+xxxxxxxxxx"
117            },
118            "divider": {
119                "type": UserInput.OPTION_DIVIDER
120            },
121            "query-intro": {
122                "type": UserInput.OPTION_INFO,
123                "help": "Separate with commas or line breaks."
124            },
125            "query": {
126                "type": UserInput.OPTION_TEXT_LARGE,
127                "help": "Entities to scrape",
128                "tooltip": "Separate with commas or line breaks."
129            },
130            "max_posts": {
131                "type": UserInput.OPTION_TEXT,
132                "help": "Messages per group",
133                "min": 1,
134                "max": 50000,
135                "default": 10
136            },
137            "daterange": {
138                "type": UserInput.OPTION_DATERANGE,
139                "help": "Date range"
140            },
141            "divider-2": {
142                "type": UserInput.OPTION_DIVIDER
143            },
144            "info-sensitive": {
145                "type": UserInput.OPTION_INFO,
146                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
147                        "there while data is fetched. After the dataset has been created your credentials will be "
148                        "deleted from the server, unless you enable the option below. If you want to download images "
149                        "attached to the messages in your collected data, you need to enable this option. Your "
150                        "credentials will never be visible to other users and can be erased later via the result page."
151            },
152            "save-session": {
153                "type": UserInput.OPTION_TOGGLE,
154                "help": "Save session:",
155                "default": False
156            },
157            "resolve-entities-intro": {
158                "type": UserInput.OPTION_INFO,
159                "help": "4CAT can resolve references to channels and users and replace the numeric ID with the full "
160                        "user, channel or group metadata. Doing so allows one to e.g. discover new relevant groups and "
161                        "figure out where or from whom a message was forwarded.\n\nHowever, this increases query time "
162                        "and, for large datasets, the chance that you will be rate-limited and that your dataset cannot "
163                        "finish capturing. It will also dramatically increase the disk space needed to store the "
164                        "data, so only enable this if you really need it!"
165            },
166            "resolve-entities": {
167                "type": UserInput.OPTION_TOGGLE,
168                "help": "Resolve references",
169                "default": False,
170            }
171        }
172
173        if max_entities:
174            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
175                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
176
177        all_messages = config.get("telegram-search.can_query_all_messages", False)
178        if all_messages:
179            if "max" in options["max_posts"]:
180                del options["max_posts"]["max"]
181        else:
182            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
183                                             f"{options['max_posts']['max']:,} messages per entity.")
184
185        if config.get("telegram-search.max_crawl_depth", 0) > 0:
186            options["crawl_intro"] = {
187                "type": UserInput.OPTION_INFO,
188                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
189                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
190                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
191                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
192                        "progress cannot be accurately tracked when you use this feature."
193            }
194            options["crawl-depth"] = {
195                "type": UserInput.OPTION_TEXT,
196                "coerce_type": int,
197                "min": 0,
198                "max": config.get("telegram-search.max_crawl_depth"),
199                "default": 0,
200                "help": "Crawl depth",
201                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
202                           "query, i.e. only entities reachable within this many hops from one of the starting "
203                           "entities will be added to the crawl."
204            }
205            options["crawl-threshold"] = {
206                "type": UserInput.OPTION_TEXT,
207                "coerce_type": int,
208                "min": 0,
209                "default": 5,
210                "help": "Crawl threshold",
211                "tooltip": "Entities need to be referenced at least this many times to be added to the query. Only "
212                           "references discovered below the max crawl depth are taken into account."
213            }
214            options["crawl-via-links"] = {
215                "type": UserInput.OPTION_TOGGLE,
216                "default": False,
217                "help": "Extract new groups from links",
218                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
219                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
220                           "links that would otherwise remain undetected."
221            }
222
223        return options

Get processor options

Just updates the description of the entities field based on the configured max entities.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
  • ConfigManager|None config: Configuration reader (context-aware)
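
A minimal sketch of how the returned options react to configuration values; StubConfig is a hypothetical stand-in for the ConfigManager, implementing only the get() calls used above:

from datasources.telegram.search_telegram import SearchTelegram

class StubConfig:
    """Hypothetical stand-in for the ConfigManager; only implements get()"""
    def __init__(self, values):
        self.values = values

    def get(self, key, default=None):
        return self.values.get(key, default)

options = SearchTelegram.get_options(config=StubConfig({
    "telegram-search.can_query_all_messages": True,
    "telegram-search.max_crawl_depth": 3,
}))
assert "max" not in options["max_posts"]  # message cap removed
assert "crawl-depth" in options           # crawl options exposed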
def get_items(self, query):
226    def get_items(self, query):
227        """
228        Execute a query; get messages for given parameters
229
230        Basically a wrapper around execute_queries() to call it with asyncio.
231
232        :param dict query:  Query parameters, as part of the DataSet object
233        :return list:  Posts, sorted by thread and post ID, in ascending order
234        """
235        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
236            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
237                                       "creating it again from scratch.", is_final=True)
238            return None
239
240        self.details_cache = {}
241        self.failures_cache = set()
242        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
243        results = asyncio.run(self.execute_queries())
244
245        if not query.get("save-session"):
246            self.dataset.delete_parameter("api_hash", instant=True)
247            self.dataset.delete_parameter("api_phone", instant=True)
248            self.dataset.delete_parameter("api_id", instant=True)
249
250        if self.flawless:
251            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
252                                       "been private). View the log file for details.", is_final=True)
253
254        return results

Execute a query; get messages for given parameters

Basically a wrapper around execute_queries() to call it with asyncio.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Posts, sorted by thread and post ID, in ascending order

async def execute_queries(self):
256    async def execute_queries(self):
257        """
258        Get messages for queries
259
260        This is basically what would be done in get_items(), except due to
261        Telethon's architecture this needs to be called in an async method,
262        which is this one.
263
264        :return list:  Collected messages
265        """
266        # session file has been created earlier, and we can re-use it here in
267        # order to avoid having to re-enter the security code
268        query = self.parameters
269
270        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
271                                                      query["api_id"].strip(),
272                                                      query["api_hash"].strip())
273        self.dataset.log(f'Telegram session id: {session_id}')
274        session_path = self.config.get("PATH_SESSIONS").joinpath(session_id + ".session")
275
276        client = None
277
278        try:
279            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
280                                    loop=self.eventloop)
281            await client.start(phone=SearchTelegram.cancel_start)
282        except RuntimeError:
283            # session is no longer useable, delete file so user will be asked
284            # for security code again. The RuntimeError is raised by
285            # `cancel_start()`
286            self.dataset.update_status(
287                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
288                is_final=True)
289
290            if client and hasattr(client, "disconnect"):
291                await client.disconnect()
292
293            if session_path.exists():
294                session_path.unlink()
295
296            return []
297        except Exception as e:
298            # not sure what exception specifically is triggered here, but it
299            # always means the connection failed
300            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
301            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
302            if client and hasattr(client, "disconnect"):
303                await client.disconnect()
304            return []
305
306        # ready our parameters
307        parameters = self.dataset.get_parameters()
308        queries = [query.strip() for query in parameters.get("query", "").split(",")]
309        max_items = convert_to_int(parameters.get("items", 10), 10)
310
311        # Telethon requires the offset date to be a datetime date
312        max_date = parameters.get("max_date")
313        if max_date:
314            try:
315                max_date = datetime.fromtimestamp(int(max_date))
316            except ValueError:
317                max_date = None
318
319        # min_date can remain an integer
320        min_date = parameters.get("min_date")
321        if min_date:
322            try:
323                min_date = int(min_date)
324            except ValueError:
325                min_date = None
326
327        posts = []
328        try:
329            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
330                posts.append(post)
331            return posts
332        except ProcessorInterruptedException as e:
333            raise e
334        except Exception:
335            # catch-all so we can disconnect properly
336            # ...should we?
337            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
338            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
339            # May as well return what was captured, yes?
340            return posts
341        finally:
342            await client.disconnect()

Get messages for queries

This is basically what would be done in get_items(), except due to Telethon's architecture this needs to be called in an async method, which is this one.

Returns

Collected messages
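
The date normalization above, in isolation: Telethon's offset_date must be a datetime, while the lower bound can stay a Unix timestamp so it can be compared directly against each serialized message's "date" field. A sketch with arbitrary timestamps:

from datetime import datetime

max_date = datetime.fromtimestamp(1704067200)  # datetime, passed as iter_messages(offset_date=...)
min_date = 1672531200                          # int, compared against serialized_message["date"]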

async def gather_posts(self, client, queries, max_items, min_date, max_date):
344    async def gather_posts(self, client, queries, max_items, min_date, max_date):
345        """
346        Gather messages for each entity for which messages are requested
347
348        :param TelegramClient client:  Telegram Client
349        :param TelegramClient client:  Telegram Client
350        :param list queries:  List of entities to query (as strings)
351        :param int max_items:  Messages to scrape per entity
352        :param int min_date:  Unix timestamp; only yield posts from after this date
353        :param datetime max_date:  Datetime; only fetch posts from before this date
354        :return:  Async generator yielding messages, each message a dictionary.
354        """
355        resolve_refs = self.parameters.get("resolve-entities")
356
357        # flag to stop issuing additional queries; set when rate-limited
358        no_additional_queries = False
359
360        # This is used for the 'crawl' feature so we know at which depth a
361        # given entity was discovered
362        depth_map = {
363            entity: 0 for entity in queries
364        }
365
366        crawl_max_depth = self.parameters.get("crawl-depth", 0)
367        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
368        crawl_via_links = self.parameters.get("crawl-via-links", False)
369
370        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
371        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
372
373        # this keeps track of how often an entity not in the original query
374        # has been mentioned. When crawling is enabled and this exceeds the
375        # given threshold, the entity is added to the query
376        crawl_references = {}
377        full_query = set(queries)
378        num_queries = len(queries)
379
380        # we may not always know the 'entity username' for an entity ID, so
381        # keep a reference map as we go
382        entity_id_map = {}
383
384        # Collect queries
385        # Use while instead of for so we can change queries during iteration
386        # this is needed for the 'crawl' feature which can discover new
387        # entities during crawl
388        processed = 0
389        total_messages = 0
390        while queries:
391            query = queries.pop(0)
392
393            delay = 10
394            retries = 0
395            processed += 1
396            self.dataset.update_progress(processed / num_queries)
397
398            if no_additional_queries:
399                # Note that we are not completing this query
400                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
401                continue
402
403            while True:
404                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
405                entity_posts = 0
406                discovered = 0
407                try:
408                    async for message in client.iter_messages(entity=query, offset_date=max_date):
409                        entity_posts += 1
410                        total_messages += 1
411                        if self.interrupted:
412                            raise ProcessorInterruptedException(
413                                "Interrupted while fetching message data from the Telegram API")
414
415                        if entity_posts % 100 == 0:
416                            self.dataset.update_status(
417                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
418
419                        if message.action is not None:
420                            # e.g. someone joins the channel - not an actual message
421                            continue
422
423                        # todo: possibly enrich object with e.g. the name of
424                        # the channel a message was forwarded from (but that
425                        # needs extra API requests...)
426                        serialized_message = SearchTelegram.serialize_obj(message)
427                        if "_chat" in serialized_message:
428                            # Add query ID to check if queries have been crawled previously
429                            full_query.add(serialized_message["_chat"]["id"])
430                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
431                                # once we know what a channel ID resolves to, use the username instead so it is easier to
432                                # understand for the user
433                                entity_id_map[query] = serialized_message["_chat"]["username"]
434                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
435
436                        if resolve_refs:
437                            serialized_message = await self.resolve_groups(client, serialized_message)
438
439                        # Stop if we're below the min date
440                        if min_date and serialized_message.get("date") < min_date:
441                            break
442
443                        # if crawling is enabled, see if we found something to add to the query
444                        linked_entities = set()
445                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
446                            message_fwd = serialized_message.get("fwd_from")
447                            fwd_from = None
448                            fwd_source_type = None
449                            if message_fwd and message_fwd.get("from_id"):
450                                if message_fwd["from_id"].get("_type") == "PeerChannel":
451                                    # Legacy(?) data structure (pre 2024/7/22)
452                                    # even if we haven't resolved the ID, we can feed the numeric ID
453                                    # to Telethon! this is nice because it means we don't have to
454                                    # resolve entities to crawl iteratively
455                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
456                                    fwd_source_type = "channel"
457                                elif message_fwd["from_id"].get("full_chat"):
458                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
459                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
460                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
461                                    fwd_source_type = "channel"
462                                elif message_fwd["from_id"].get("full_user") or message_fwd["from_id"].get("_type") == "PeerUser":
463                                    # forwards can also come from users
464                                    # these can never be followed, so don't add these to the crawl, but do document them
465                                    fwd_source_type = "user"
466                                else:
467                                    # log the unrecognised structure instead of printing it to stdout
468                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl: {json.dumps(message_fwd)}")
469                                    fwd_source_type = "unknown"
470
471                                if fwd_from:
472                                    linked_entities.add(fwd_from)
473
474
475                            if crawl_via_links:
476                                # t.me links
477                                all_links = ural.urls_from_text(serialized_message["message"])
478                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
479                                for link in all_links:
480                                    if link.startswith("+"):
481                                        # invite links
482                                        continue
483
484                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
485                                    linked_entities.add(entity_name)
486
487                                # @references
488                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
489                                for reference in references:
490                                    if reference.startswith("@"):
491                                        reference = reference[1:]
492
493                                    reference = reference.split("/")[0]
494
495                                    linked_entities.add(reference)
496
497                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
498                            for link in linked_entities:
499                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
500                                    # new entity discovered!
501                                    # might be discovered (before collection) multiple times, so retain lowest depth
502                                    # print(f"Potentially crawling {link}")
503                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
504                                    if link not in crawl_references:
505                                        crawl_references[link] = 0
506                                    crawl_references[link] += 1
507
508                                    # Add to queries if it has been referenced enough times
509                                    if crawl_references[link] >= crawl_msg_threshold:
510                                        queries.append(link)
511                                        full_query.add(link)
512                                        num_queries += 1
513                                        discovered += 1
514                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
515
516
517
518                        serialized_message["4CAT_metadata"] = {
519                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
520                            "query": query, # possibly redundant, but crawling adds queries the user did not define, so it is useful to know exactly which query collected an entity
521                            "query_depth": depth_map.get(query, 0)
522                        }
523                        yield serialized_message
524
525                        if entity_posts >= max_items:
526                            break
527
528                except ChannelPrivateError:
529                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
530                    self.flawless += 1
531
532                except (UsernameInvalidError,):
533                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}'; it does not seem to exist, skipping")
534                    self.flawless += 1
535
536                except FloodWaitError as e:
537                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
538                    if e.seconds < self.end_if_rate_limited:
539                        time.sleep(e.seconds)
540                        continue
541                    else:
542                        self.flawless += 1
543                        no_additional_queries = True
544                        self.dataset.update_status(
545                            f"Telegram-requested wait of {int(e.seconds / 60)} minutes exceeds the limit, ending")
546                        break
547
548                except BadRequestError as e:
549                    self.dataset.update_status(
550                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
551                    self.flawless += 1
552
553                except ValueError as e:
554                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
555                    self.flawless += 1
556
562                except TimeoutError:
563                    if retries >= self.max_retries:
564                        self.dataset.update_status(
565                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
566                        self.flawless += 1
567                        break
568
569                    retries += 1
570                    self.dataset.update_status(f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
571                    time.sleep(delay)
572                    delay *= 2
573                    continue
574
575                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
576                break

Gather messages for each entity for which messages are requested

Parameters
  • TelegramClient client: Telegram Client
  • list queries: List of entities to query (as strings)
  • int max_items: Messages to scrape per entity
  • int min_date: Unix timestamp; only yield posts from after this date
  • datetime max_date: Datetime; only fetch posts from before this date
Returns

Async generator yielding messages, each message a dictionary.
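
A toy sketch of the crawl bookkeeping in gather_posts(): an entity discovered via forwards or links is only queued once it has been referenced often enough, and it inherits the referencing query's depth plus one. The names and thresholds below are arbitrary:

queries = ["channel_a"]
depth_map = {"channel_a": 0}
crawl_references = {}
crawl_max_depth = 2
crawl_msg_threshold = 2

def register_reference(link, source_query):
    if depth_map.get(source_query, 0) >= crawl_max_depth:
        return  # discovered too deep, ignore
    # retain the lowest depth at which the entity was seen
    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[source_query] + 1)
    crawl_references[link] = crawl_references.get(link, 0) + 1
    if crawl_references[link] == crawl_msg_threshold and link not in queries:
        queries.append(link)  # referenced often enough: crawl it too

register_reference("channel_b", "channel_a")
register_reference("channel_b", "channel_a")
print(queries)  # ['channel_a', 'channel_b']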

async def resolve_groups(self, client, message):
578    async def resolve_groups(self, client, message):
579        """
580        Recursively resolve references to groups and users
581
582        :param client:  Telethon client instance
583        :param dict message:  Message, as already mapped by serialize_obj
584        :return:  Resolved dictionary
585        """
586        resolved_message = message.copy()
587        for key, value in message.items():
588            try:
589                if type(value) is not dict:
590                    # if it's not a dict, we never have to resolve it, as it
591                    # does not represent an entity
592                    continue
593
594                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
595                    # forwarded from a channel!
596                    if value["channel_id"] in self.failures_cache:
597                        continue
598
599                    if value["channel_id"] not in self.details_cache:
600                        channel = await client(GetFullChannelRequest(value["channel_id"]))
601                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
602
603                    resolved_message[key] = self.details_cache[value["channel_id"]]
604                    resolved_message[key]["channel_id"] = value["channel_id"]
605
606                elif "_type" in value and value["_type"] == "PeerUser":
607                    # a user!
608                    if value["user_id"] in self.failures_cache:
609                        continue
610
611                    if value["user_id"] not in self.details_cache:
612                        user = await client(GetFullUserRequest(value["user_id"]))
613                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
614
615                    resolved_message[key] = self.details_cache[value["user_id"]]
616                    resolved_message[key]["user_id"] = value["user_id"]
617                else:
618                    resolved_message[key] = await self.resolve_groups(client, value)
619
620            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
621                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
622                if type(e) in (ChannelPrivateError, UsernameInvalidError):
623                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
624                else:
625                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
626
627        return resolved_message

Recursively resolve references to groups and users

Parameters
  • client: Telethon client instance
  • dict message: Message, as already mapped by serialize_obj
Returns

Resolved dictionary
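
The recursion pattern used by resolve_groups(), stripped of the API calls and caching: walk a nested dictionary, rewrite any sub-dictionary matching a predicate, and recurse into the rest. A minimal sketch with a stand-in transform:

def rewrite(message, predicate, transform):
    resolved = message.copy()
    for key, value in message.items():
        if not isinstance(value, dict):
            continue  # scalars never represent an entity
        resolved[key] = transform(value) if predicate(value) else rewrite(value, predicate, transform)
    return resolved

msg = {"fwd_from": {"from_id": {"_type": "PeerChannel", "channel_id": 1234}}}
resolved = rewrite(msg, lambda v: v.get("_type") == "PeerChannel",
                   lambda v: {**v, "resolved": True})
print(resolved)  # the nested from_id dict gains "resolved": True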

@staticmethod
def cancel_start():
629    @staticmethod
630    def cancel_start():
631        """
632        Replace interactive phone number input in Telethon
633
634        By default, if Telethon cannot use the given session file to
635        authenticate, it will interactively prompt the user for a phone
636        number on the command line. That is not useful here, so instead
637        raise a RuntimeError. This will be caught below and the user will
638        be told they need to re-authenticate via 4CAT.
639        """
640        raise RuntimeError("Connection cancelled")

Replace interactive phone number input in Telethon

By default, if Telethon cannot use the given session file to authenticate, it will interactively prompt the user for a phone number on the command line. That is not useful here, so instead raise a RuntimeError. This will be caught below and the user will be told they need to re-authenticate via 4CAT.
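
A minimal sketch of how this is used: passing cancel_start as the phone callback means an unauthenticated session fails fast with a RuntimeError instead of blocking the worker on interactive input. session_is_authenticated is a hypothetical helper, not part of this module:

from telethon import TelegramClient
from datasources.telegram.search_telegram import SearchTelegram

async def session_is_authenticated(session_path, api_id, api_hash):
    client = TelegramClient(session_path, api_id, api_hash)
    try:
        await client.start(phone=SearchTelegram.cancel_start)
        return True   # session file is still valid
    except RuntimeError:
        return False  # cancel_start() fired: re-authentication is needed
    finally:
        await client.disconnect()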

@staticmethod
def map_item(message):
642    @staticmethod
643    def map_item(message):
644        """
645        Convert Message object to 4CAT-ready data object
646
647        :param Message message:  Message to parse
648        :return dict:  4CAT-compatible item object
649        """
650        if message["_chat"]["username"]:
651            # chats apparently don't always have usernames???
652            # truly telegram objects are way too lenient for their own good
653            thread = message["_chat"]["username"]
654        elif message["_chat"]["title"]:
655            thread = re.sub(r"\s", "", message["_chat"]["title"])
656        else:
657            # just give up
658            thread = "unknown"
659
660        # determine username
661        # API responses only include the user *ID*, not the username, and to
662        # complicate things further not everyone is a user and not everyone
663        # has a username. If no username is available, try the first and
664        # last name someone has supplied
665        fullname = ""
666        username = ""
667        user_id = message["_sender"]["id"] if message.get("_sender") else ""
668        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
669
670        if message.get("_sender") and message["_sender"].get("username"):
671            username = message["_sender"]["username"]
672
673        if message.get("_sender") and message["_sender"].get("first_name"):
674            fullname += message["_sender"]["first_name"]
675
676        if message.get("_sender") and message["_sender"].get("last_name"):
677            fullname += " " + message["_sender"]["last_name"]
678
679        fullname = fullname.strip()
680
681        # determine media type
682        # these store some extra information of the attachment in
683        # attachment_data. Since the final result will be serialised as a csv
684        # file, we can only store text content. As such some media data is
685        # serialised as JSON.
686        attachment_type = SearchTelegram.get_media_type(message["media"])
687        attachment_filename = ""
688
689        if attachment_type == "contact":
690            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
691            if message["media"].get('contact', False):
692                # Old datastructure
693                attachment = message["media"]["contact"]
694            elif all(field in message["media"] for field in contact_data):
695                # New datastructure 2022/7/25
696                attachment = message["media"]
697            else:
698                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
699            attachment_data = json.dumps({field: attachment.get(field) for field in contact_data})
700
701        elif attachment_type == "document":
702            # videos, etc
703            # This could add a separate routine for videos to make them a
704            # separate type, which could then be scraped later, etc
705            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
706            if attachment_type == "video":
707                attachment = message["media"]["document"]
708                attachment_data = json.dumps({
709                    "id": attachment["id"],
710                    "dc_id": attachment["dc_id"],
711                    "file_reference": attachment["file_reference"],
712                })
713            else:
714                attachment_data = ""
715
716        # elif attachment_type in ("geo", "geo_live"):
717        # untested whether geo_live is significantly different from geo
718        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
719
720        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
721            # we don't actually store any metadata about the photo, since very
722            # little of the metadata attached is of interest. Instead, the
723            # actual photos may be downloaded via a processor that is run on the
724            # search results
725            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
726            attachment_data = json.dumps({
727                "id": attachment["id"],
728                "dc_id": attachment["dc_id"],
729                "file_reference": attachment["file_reference"],
730            })
731            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
732
733        elif attachment_type == "poll":
734            # unfortunately poll results are only available when someone has
735            # actually voted on the poll - that will usually not be the case,
736            # so we store -1 as the vote count
737            attachment = message["media"]
738            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
739            attachment_data = json.dumps({
740                "question": attachment["poll"]["question"],
741                "voters": attachment["results"]["total_voters"],
742                "answers": [{
743                    "answer": options[answer["option"]],
744                    "votes": answer["voters"]
745                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
746                    "answer": options[option],
747                    "votes": -1
748                } for option in options]
749            })
750
751        else:
752            attachment_data = ""
753
754        # was the message forwarded from somewhere and if so when?
755        forwarded_timestamp = ""
756        forwarded_name = ""
757        forwarded_id = ""
758        forwarded_username = ""
759        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
760            # forward information is spread out over a lot of places
761            # we can identify, in order of usefulness: username, full name,
762            # and ID. But not all of these are always available, and not
763            # always in the same place either
764            forwarded_timestamp = int(message["fwd_from"]["date"])
765            from_data = message["fwd_from"]["from_id"]
766
767            if from_data:
768                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
769
770            if message["fwd_from"].get("from_name"):
771                forwarded_name = message["fwd_from"].get("from_name")
772
773            if from_data and from_data.get("from_name"):
774                forwarded_name = message["fwd_from"]["from_name"]
775
776            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
777                from_data["user"] = from_data["users"][0]
778
779            if from_data and ("user" in from_data or "chats" in from_data):
780                # 'resolve entities' was enabled for this dataset
781                if "user" in from_data:
782                    if from_data["user"].get("username"):
783                        forwarded_username = from_data["user"]["username"]
784
785                    if from_data["user"].get("first_name"):
786                        forwarded_name = from_data["user"]["first_name"]
787                    if from_data["user"].get("last_name"):
788                        forwarded_name += " " + from_data["user"]["last_name"]
789
790                    forwarded_name = forwarded_name.strip()
791
792                elif "chats" in from_data:
793                    channel_id = from_data.get("channel_id")
794                    for chat in from_data["chats"]:
795                        if chat["id"] == channel_id or channel_id is None:
796                            forwarded_username = chat["username"]
797
798            elif message.get("_forward") and message["_forward"].get("_chat"):
799                if message["_forward"]["_chat"].get("username"):
800                    forwarded_username = message["_forward"]["_chat"]["username"]
801
802                if message["_forward"]["_chat"].get("title"):
803                    forwarded_name = message["_forward"]["_chat"]["title"]
804
805        link_title = ""
806        link_attached = ""
807        link_description = ""
808        reactions = ""
809
810        if message.get("media") and message["media"].get("webpage"):
811            link_title = message["media"]["webpage"].get("title")
812            link_attached = message["media"]["webpage"].get("url")
813            link_description = message["media"]["webpage"].get("description")
814
815        if message.get("reactions") and message["reactions"].get("results"):
816            for reaction in message["reactions"]["results"]:
817                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
818                    # Updated to support new reaction datastructure
819                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
820                elif type(reaction["reaction"]) is str and "count" in reaction:
821                    reactions += reaction["reaction"] * reaction["count"]
822                else:
823                    # Failsafe; can be updated to support formatting of new datastructures in the future
824                    reactions += f"{reaction}, "
825
826        is_reply = False
827        reply_to = ""
828        if message.get("reply_to"):
829            is_reply = True
830            reply_to = message["reply_to"].get("reply_to_msg_id", "")
831
832        # t.me links
833        linked_entities = set()
834        all_links = ural.urls_from_text(message["message"])
835        all_links = [link.split("t.me/")[1] for link in all_links if
836                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
837
838        for link in all_links:
839            if link.startswith("+"):
840                # invite links
841                continue
842
843            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
844            linked_entities.add(entity_name)
845
846        # @references
847        # in execute_queries we use MessageEntityMention to get these
848        # however, after serializing these objects we only have the offsets of
849        # the mentioned username, and telegram does weird unicode things to its
850        # offsets meaning we can't just substring the message. So use a regex
851        # as a 'good enough' solution
852        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
853
854        # make this case-insensitive since people may use different casing in
855        # messages than the 'official' username for example
856        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
857        all_ci_connections = set()
858        seen = set()
859        for connection in all_connections:
860            if connection.lower() not in seen:
861                all_ci_connections.add(connection)
862                seen.add(connection.lower())
863
864        return MappedItem({
865            "id": f"{message['_chat']['username']}-{message['id']}",
866            "thread_id": thread,
867            "chat": message["_chat"]["username"],
868            "author": user_id,
869            "author_username": username,
870            "author_name": fullname,
871            "author_is_bot": "yes" if user_is_bot else "no",
872            "body": message["message"],
873            "body_markdown": message.get("message_markdown", MissingMappedField("")),
874            "is_reply": is_reply,
875            "reply_to": reply_to,
876            "views": message["views"] if message["views"] else "",
877            # "forwards": message.get("forwards", MissingMappedField(0)),
878            "reactions": reactions,
879            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
880            "unix_timestamp": int(message["date"]),
881            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
882                "edit_date"] else "",
883            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
884            "author_forwarded_from_name": forwarded_name,
885            "author_forwarded_from_username": forwarded_username,
886            "author_forwarded_from_id": forwarded_id,
887            "entities_linked": ",".join(linked_entities),
888            "entities_mentioned": ",".join(all_mentions),
889            "all_connections": ",".join(all_ci_connections),
890            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
891                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
892            "unix_timestamp_forwarded_from": forwarded_timestamp,
893            "link_title": link_title,
894            "link_description": link_description,
895            "link_attached": link_attached,
896            "attachment_type": attachment_type,
897            "attachment_data": attachment_data,
898            "attachment_filename": attachment_filename
899        })

Convert Message object to 4CAT-ready data object

Parameters
  • Message message: Message to parse
Returns

4CAT-compatible item object
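
The t.me link extraction used above, in isolation: keep only URLs whose hostname is t.me, skip "+..." invite links, and strip paths, query strings and anchors to get the bare entity name. The example text is made up:

import ural

text = "see https://t.me/examplechannel/123 and https://t.me/+AbCdEf and https://example.com/x"
links = [link.split("t.me/")[1] for link in ural.urls_from_text(text)
         if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
entities = {link.split("/")[0].split("?")[0].split("#")[0]
            for link in links if not link.startswith("+")}
print(entities)  # {'examplechannel'}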

@staticmethod
def get_media_type(media):
901    @staticmethod
902    def get_media_type(media):
903        """
904        Get media type for a Telegram attachment
905
906        :param media:  Media object
907        :return str:  Textual identifier of the media type
908        """
909        try:
910            return {
911                "NoneType": "",
912                "MessageMediaContact": "contact",
913                "MessageMediaDocument": "document",
914                "MessageMediaEmpty": "",
915                "MessageMediaGame": "game",
916                "MessageMediaGeo": "geo",
917                "MessageMediaGeoLive": "geo_live",
918                "MessageMediaInvoice": "invoice",
919                "MessageMediaPhoto": "photo",
920                "MessageMediaPoll": "poll",
921                "MessageMediaUnsupported": "unsupported",
922                "MessageMediaVenue": "venue",
923                "MessageMediaWebPage": "url"
924            }[media.get("_type", None)]
925        except (AttributeError, KeyError):
926            return ""

Get media type for a Telegram attachment

Parameters
  • media: Media object
Returns

Textual identifier of the media type
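
Usage on serialized media dictionaries (as produced by serialize_obj()); unknown or missing types fall through to an empty string:

from datasources.telegram.search_telegram import SearchTelegram

SearchTelegram.get_media_type({"_type": "MessageMediaPhoto"})    # "photo"
SearchTelegram.get_media_type({"_type": "MessageMediaWebPage"})  # "url"
SearchTelegram.get_media_type({"_type": "SomethingNew"})         # "" (KeyError caught)
SearchTelegram.get_media_type(None)                              # "" (AttributeError caught)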

@staticmethod
def serialize_obj(input_obj):
928    @staticmethod
929    def serialize_obj(input_obj):
930        """
931        Serialize an object as a dictionary
932
933        Telethon message objects are not serializable by themselves, but most
934        relevant attributes are simply struct classes. This function replaces
935        those that are not with placeholders and then returns a dictionary that
936        can be serialized as JSON.
937
938        :param input_obj:  Object to serialize
939        :return:  Serialized object
940        """
941        scalars = (int, str, float, list, tuple, set, bool)
942
943        if type(input_obj) in scalars or input_obj is None:
944            return input_obj
945
946        if type(input_obj) is not dict:
947            obj = input_obj.__dict__
948        else:
949            obj = input_obj.copy()
950
951        mapped_obj = {}
952        for item, value in obj.items():
953            if type(value) is datetime:
954                mapped_obj[item] = value.timestamp()
955            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
956                mapped_obj[item] = SearchTelegram.serialize_obj(value)
957            elif type(value) is list:
958                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
959            elif type(value) is bytes:
960                mapped_obj[item] = value.hex()
961            elif type(value) not in scalars and value is not None:
962                # type we can't make sense of here
963                continue
964            else:
965                mapped_obj[item] = value
966
967        # Add the _type if the original object was a telethon type
968        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
969            mapped_obj["_type"] = type(input_obj).__name__
970
971        # Store the markdown-formatted text
972        if type(input_obj).__name__ == "Message":
973            mapped_obj["message_markdown"] = input_obj.text
974
975        return mapped_obj

Serialize an object as a dictionary

Telethon message objects are not serializable by themselves, but most relevant attributes are simply struct classes. This function replaces those that are not with placeholders and then returns a dictionary that can be serialized as JSON.

Parameters
  • input_obj: Object to serialize
Returns

Serialized object
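
serialize_obj() also accepts plain dictionaries, which makes the conversion rules easy to see in isolation: datetimes become Unix timestamps and bytes become hex strings, so the result can be dumped as JSON. A small sketch:

from datetime import datetime
from datasources.telegram.search_telegram import SearchTelegram

raw = {"date": datetime(2024, 1, 1), "file_reference": b"\x00\xff", "views": 10}
SearchTelegram.serialize_obj(raw)
# {'date': 1704063600.0, 'file_reference': '00ff', 'views': 10}
# (the exact timestamp depends on the local timezone)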

@staticmethod
def validate_query(query, request, config):
 977    @staticmethod
 978    def validate_query(query, request, config):
 979        """
 980        Validate Telegram query
 981
 982        :param dict query:  Query parameters, from client-side.
 983        :param request:  Flask request
 984        :param ConfigManager config:  Configuration reader (context-aware)
 987        :return dict:  Safe query parameters
 988        """
 989        # no query 4 u
 990        if not query.get("query", "").strip():
 991            raise QueryParametersException("You must provide a search query.")
 992
 993        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
 994            raise QueryParametersException("You need to provide valid Telegram API credentials first.")
 995
 996        all_posts = config.get("telegram-search.can_query_all_messages", False)
 997        max_entities = config.get("telegram-search.max_entities", 25)
 998
 999        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
1000            config=config)["max_posts"]["max"])
1001
1002        # reformat queries to be a comma-separated list with no wrapping
1003        # whitespace
1004        whitespace = re.compile(r"\s+")
1005        items = whitespace.sub("", query.get("query").replace("\n", ","))
1006        if max_entities > 0 and len(items.split(",")) > max_entities:
1007            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
1008
1009        sanitized_items = []
1010        # handle telegram URLs
1011        for item in items.split(","):
1012            if not item.strip():
1013                continue
1014            item = re.sub(r"^https?://t\.me/", "", item)
1015            item = re.sub(r"^/?s/", "", item)
1016            item = re.sub(r"[/]*$", "", item)
1017            sanitized_items.append(item)
1018
1019        # the dates need to make sense as a range to search within
1020        min_date, max_date = query.get("daterange")
1021
1022        # scraping is only available to logged-in users
1023        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
1024            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1025                                           "accounts.")
1026
1027        # check for the information we need
1028        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1029                                                      query.get("api_hash"))
1030        config.user.set_value("telegram.session", session_id)
1031        session_path = config.get('PATH_SESSIONS').joinpath(session_id + ".session")
1032
1033        client = None
1034
1035        # the API ID is always a number; if it is not, we can fail immediately
1036        try:
1037            api_id = int(query.get("api_id"))
1038        except ValueError:
1039            raise QueryParametersException("Invalid API ID.")
1040
1041        # maybe we've entered a code already and submitted it with the request
1042        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1043            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
1044            max_attempts = 1
1045        else:
1046            code_callback = lambda: -1  # noqa: E731
1047            # max_attempts = 0 because authing will always fail: we can't wait for
1048            # the code to be entered interactively, we'll need to do a new request
1049            # but we can't just immediately return, we still need to call start()
1050            # to get telegram to send us a code
1051            max_attempts = 0
1052
1053        # now try authenticating
1054        needs_code = False
1055        try:
1056            loop = asyncio.new_event_loop()
1057            asyncio.set_event_loop(loop)
1058            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1059
1060            try:
1061                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1062
1063            except ValueError as e:
1064                # this happens if 2FA is required
1065                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
1066                                               f"does not support this authentication mode for Telegram. ({e})")
1067            except RuntimeError:
1068                # A code was sent to the given phone number
1069                needs_code = True
1070        except FloodWaitError as e:
1071            # uh oh, we got rate-limited
1072            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
1073                                           str(e).split("(")[0] + ".")
1074        except ApiIdInvalidError:
1075            # wrong credentials
1076            raise QueryParametersException("Your API credentials are invalid.")
1077        except PhoneNumberInvalidError:
1078            # wrong phone number
1079            raise QueryParametersException(
1080                "The phone number provided is not a valid phone number for these credentials.")
1081        except RPCError as e:
1082            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
1083            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
1084                                           f"Telegram app(s) to the latest version to proceed ({e}).")
1085        except Exception as e:
1086            # catch-all: anything else also means we could not authenticate
1087            raise QueryParametersException(
1088                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
1089        finally:
1090            if client:
1091                client.disconnect()
1092
1093        if needs_code:
1094            raise QueryNeedsFurtherInputException(config={
1095                "code-info": {
1096                    "type": UserInput.OPTION_INFO,
1097                    "help": "Please enter the security code that was sent to your Telegram app to continue."
1098                },
1099                "security-code": {
1100                    "type": UserInput.OPTION_TEXT,
1101                    "help": "Security code",
1102                    "sensitive": True
1103                }})
1104
1105        # simple!
1106        return {
1107            "items": num_items,
1108            "query": ",".join(sanitized_items),
1109            "api_id": query.get("api_id"),
1110            "api_hash": query.get("api_hash"),
1111            "api_phone": query.get("api_phone"),
1112            "save-session": query.get("save-session"),
1113            "resolve-entities": query.get("resolve-entities"),
1114            "min_date": min_date,
1115            "max_date": max_date,
1116            "crawl-depth": query.get("crawl-depth"),
1117            "crawl-threshold": query.get("crawl-threshold"),
1118            "crawl-via-links": query.get("crawl-via-links")
1119        }

Validate Telegram query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request with which the query was submitted, used to read the security code if one was entered
  • ConfigManager config: Configuration reader (context-aware)
Returns
  • Safe query parameters
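
The sanitisation loop in the listing above accepts whatever users are likely to paste — full t.me links, /s/ web-preview links, names with trailing slashes — and reduces each to a bare entity name. A standalone sketch of those three substitutions, with made-up entity names:

    import re

    def sanitize_entity(item):
        # strip a leading t.me URL, the web-preview /s/ path, and any
        # trailing slashes, leaving just the channel or group name
        item = re.sub(r"^https?://t\.me/", "", item)
        item = re.sub(r"^/?s/", "", item)
        item = re.sub(r"[/]*$", "", item)
        return item

    for raw in ("https://t.me/s/examplechannel/", "https://t.me/examplegroup", "plainname"):
        print(sanitize_entity(raw))
    # -> examplechannel, examplegroup, plainname

Bare names pass through unchanged, so URLs and plain channel names can be mixed freely in a single query.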

@staticmethod
def create_session_id(api_phone, api_id, api_hash):
1121    @staticmethod
1122    def create_session_id(api_phone, api_id, api_hash):
1123        """
1124        Generate a filename for the session file
1125
1126        This is a combination of phone number and API credentials, but hashed
1127        so that one cannot actually derive someone's phone number from it.
1128
1129        :param str api_phone:  Phone number for API ID
1130        :param int api_id:  Telegram API ID
1131        :param str api_hash:  Telegram API Hash
1132        :return str: A hash value derived from the input
1133        """
1134        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1135        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Generate a filename for the session file

This is a combination of phone number and API credentials, but hashed so that one cannot actually derive someone's phone number from it.

Parameters
  • str api_phone: Phone number for API ID
  • int api_id: Telegram API ID
  • str api_hash: Telegram API Hash
Returns
  • A hash value derived from the input
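
For illustration, the snippet below reproduces the hashing step with obviously fake credentials; any real values behave the same way. Because blake2b is deterministic, the same credentials always map to the same session file, while the phone number cannot feasibly be recovered from the file name.

    import hashlib

    api_phone, api_id, api_hash = "+15550100000", 12345, "0123456789abcdef"  # dummy values
    hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
    session_id = hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
    # 4CAT stores the session as <hash>.session in its sessions folder
    print(session_id + ".session")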