Edit on GitHub

datasources.telegram.search_telegram

Search Telegram via API

   1"""
   2Search Telegram via API
   3"""
   4import traceback
   5import hashlib
   6import asyncio
   7import json
   8import ural
   9import time
  10import re
  11
  12from pathlib import Path
  13
  14from backend.lib.search import Search
  15from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException, \
  16    QueryNeedsFurtherInputException
  17from common.lib.helpers import convert_to_int, UserInput
  18from common.lib.item_mapping import MappedItem, MissingMappedField
  19
  20from datetime import datetime
  21from telethon import TelegramClient
  22from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \
  23    FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError
  24from telethon.tl.functions.channels import GetFullChannelRequest
  25from telethon.tl.functions.users import GetFullUserRequest
  26from telethon.tl.types import MessageEntityMention
  27
  28
  29
class SearchTelegram(Search):
    """
    Search Telegram via API
    """
    type = "telegram-search"  # job ID
    category = "Search"  # category
    title = "Telegram API search"  # title displayed in UI
    description = "Scrapes messages from open Telegram groups via its API."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    # cache
    details_cache = None  # maps entity IDs to serialized channel/user metadata (filled per dataset run)
    failures_cache = None  # entity IDs that could not be resolved, so we don't retry them
    eventloop = None  # asyncio event loop handed to the Telethon client
    import_issues = 0
    end_if_rate_limited = 600  # break if Telegram requires wait time above number of seconds

    max_workers = 1
    max_retries = 3  # attempts per entity before giving up on timeouts
    flawless = 0  # counts entities that could not be (fully) collected, despite the name

    # 4CAT admin-configurable settings for this datasource
    config = {
        "telegram-search.can_query_all_messages": {
            "type": UserInput.OPTION_TOGGLE,
            "help": "Remove message amount limit",
            "default": False,
            "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!"
        },
        "telegram-search.max_entities": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max entities to query",
            "coerce_type": int,
            "min": 0,
            "default": 25,
            "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to "
                       "disable limit."
        },
        "telegram-search.max_crawl_depth": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max crawl depth",
            "coerce_type": int,
            "min": 0,
            "default": 0,
            "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded "
                       "messages. Recommended to leave at 0 for most users since this can exponentially increase "
                       "dataset sizes."
        }
    }
  80
  81    @classmethod
  82    def get_options(cls, parent_dataset=None, config=None):
  83        """
  84        Get processor options
  85
  86        Just updates the description of the entities field based on the
  87        configured max entities.
  88
  89        :param DataSet parent_dataset:  An object representing the dataset that
  90          the processor would be run on
  91        :param ConfigManager|None config:  Configuration reader (context-aware)
  92        """
  93
  94        max_entities = config.get("telegram-search.max_entities", 25)
  95        options = {
  96            "intro": {
  97                "type": UserInput.OPTION_INFO,
  98                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
  99                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
 100                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
 101                        "authentication for Telegram."
 102            },
 103            "api_id": {
 104                "type": UserInput.OPTION_TEXT,
 105                "help": "API ID",
 106                "cache": True,
 107            },
 108            "api_hash": {
 109                "type": UserInput.OPTION_TEXT,
 110                "help": "API Hash",
 111                "cache": True,
 112            },
 113            "api_phone": {
 114                "type": UserInput.OPTION_TEXT,
 115                "help": "Phone number",
 116                "cache": True,
 117                "default": "+xxxxxxxxxx"
 118            },
 119            "divider": {
 120                "type": UserInput.OPTION_DIVIDER
 121            },
 122            "query-intro": {
 123                "type": UserInput.OPTION_INFO,
 124                "help": "Separate with commas or line breaks."
 125            },
 126            "query": {
 127                "type": UserInput.OPTION_TEXT_LARGE,
 128                "help": "Entities to scrape",
 129                "tooltip": "Separate with commas or line breaks."
 130            },
 131            "max_posts": {
 132                "type": UserInput.OPTION_TEXT,
 133                "help": "Messages per group",
 134                "min": 1,
 135                "max": 50000,
 136                "default": 10
 137            },
 138            "daterange": {
 139                "type": UserInput.OPTION_DATERANGE,
 140                "help": "Date range"
 141            },
 142            "divider-2": {
 143                "type": UserInput.OPTION_DIVIDER
 144            },
 145            "info-sensitive": {
 146                "type": UserInput.OPTION_INFO,
 147                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
 148                        "there while data is fetched. After the dataset has been created your credentials will be "
 149                        "deleted from the server, unless you enable the option below. If you want to download images "
 150                        "attached to the messages in your collected data, you need to enable this option. Your "
 151                        "credentials will never be visible to other users and can be erased later via the result page."
 152            },
 153            "save-session": {
 154                "type": UserInput.OPTION_TOGGLE,
 155                "help": "Save session:",
 156                "default": False
 157            },
 158            "resolve-entities-intro": {
 159                "type": UserInput.OPTION_INFO,
 160                "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full "
 161                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
 162                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and "
 163                        "for large datasets, increases the chance you will be rate-limited and your dataset isn't able "
 164                        "to finish capturing. It will also dramatically increase the disk space needed to store the "
 165                        "data, so only enable this if you really need it!"
 166            },
 167            "resolve-entities": {
 168                "type": UserInput.OPTION_TOGGLE,
 169                "help": "Resolve references",
 170                "default": False,
 171            }
 172        }
 173
 174        if max_entities:
 175            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
 176                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
 177
 178        all_messages = config.get("telegram-search.can_query_all_messages", False)
 179        if all_messages:
 180            if "max" in options["max_posts"]:
 181                del options["max_posts"]["max"]
 182        else:
 183            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
 184                                             f"{options['max_posts']['max']:,} messages per entity.")
 185
 186        if config.get("telegram-search.max_crawl_depth", 0) > 0:
 187            options["crawl_intro"] = {
 188                "type": UserInput.OPTION_INFO,
 189                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
 190                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
 191                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
 192                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
 193                        "progress cannot be accurately tracked when you use this feature."
 194            }
 195            options["crawl-depth"] = {
 196                "type": UserInput.OPTION_TEXT,
 197                "coerce_type": int,
 198                "min": 0,
 199                "max": config.get("telegram-search.max_crawl_depth"),
 200                "default": 0,
 201                "help": "Crawl depth",
 202                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
 203                           "query, i.e. at most this many hops can be needed to reach the entity from one of the "
 204                           "starting entities."
 205            }
 206            options["crawl-threshold"] = {
 207                "type": UserInput.OPTION_TEXT,
 208                "coerce_type": int,
 209                "min": 0,
 210                "default": 5,
 211                "help": "Crawl threshold",
 212                "tooltip": "Entities need to be references at least this many times to be added to the query. Only "
 213                           "references discovered below the max crawl depth are taken into account."
 214            }
 215            options["crawl-via-links"] = {
 216                "type": UserInput.OPTION_TOGGLE,
 217                "default": False,
 218                "help": "Extract new groups from links",
 219                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
 220                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
 221                           "links that would otherwise remain undetected."
 222            }
 223
 224        return options
 225
 226
 227    def get_items(self, query):
 228        """
 229        Execute a query; get messages for given parameters
 230
 231        Basically a wrapper around execute_queries() to call it with asyncio.
 232
 233        :param dict query:  Query parameters, as part of the DataSet object
 234        :return list:  Posts, sorted by thread and post ID, in ascending order
 235        """
 236        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
 237            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
 238                                       "creating it again from scratch.", is_final=True)
 239            return None
 240
 241        self.details_cache = {}
 242        self.failures_cache = set()
 243        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
 244        results = asyncio.run(self.execute_queries())
 245
 246        if not query.get("save-session"):
 247            self.dataset.delete_parameter("api_hash", instant=True)
 248            self.dataset.delete_parameter("api_phone", instant=True)
 249            self.dataset.delete_parameter("api_id", instant=True)
 250
 251        if self.flawless:
 252            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
 253                                       "been private). View the log file for details.", is_final=True)
 254
 255        return results
 256
 257    async def execute_queries(self):
 258        """
 259        Get messages for queries
 260
 261        This is basically what would be done in get_items(), except due to
 262        Telethon's architecture this needs to be called in an async method,
 263        which is this one.
 264
 265        :return list:  Collected messages
 266        """
 267        # session file has been created earlier, and we can re-use it here in
 268        # order to avoid having to re-enter the security code
 269        query = self.parameters
 270
 271        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
 272                                                      query["api_id"].strip(),
 273                                                      query["api_hash"].strip())
 274        self.dataset.log(f'Telegram session id: {session_id}')
 275        session_path = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), session_id + ".session")
 276
 277        client = None
 278
 279        try:
 280            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
 281                                    loop=self.eventloop)
 282            await client.start(phone=SearchTelegram.cancel_start)
 283        except RuntimeError:
 284            # session is no longer useable, delete file so user will be asked
 285            # for security code again. The RuntimeError is raised by
 286            # `cancel_start()`
 287            self.dataset.update_status(
 288                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
 289                is_final=True)
 290
 291            if client and hasattr(client, "disconnect"):
 292                await client.disconnect()
 293
 294            if session_path.exists():
 295                session_path.unlink()
 296
 297            return []
 298        except Exception as e:
 299            # not sure what exception specifically is triggered here, but it
 300            # always means the connection failed
 301            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
 302            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
 303            if client and hasattr(client, "disconnect"):
 304                await client.disconnect()
 305            return []
 306
 307        # ready our parameters
 308        parameters = self.dataset.get_parameters()
 309        queries = [query.strip() for query in parameters.get("query", "").split(",")]
 310        max_items = convert_to_int(parameters.get("items", 10), 10)
 311
 312        # Telethon requires the offset date to be a datetime date
 313        max_date = parameters.get("max_date")
 314        if max_date:
 315            try:
 316                max_date = datetime.fromtimestamp(int(max_date))
 317            except ValueError:
 318                max_date = None
 319
 320        # min_date can remain an integer
 321        min_date = parameters.get("min_date")
 322        if min_date:
 323            try:
 324                min_date = int(min_date)
 325            except ValueError:
 326                min_date = None
 327
 328        posts = []
 329        try:
 330            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
 331                posts.append(post)
 332            return posts
 333        except ProcessorInterruptedException as e:
 334            raise e
 335        except Exception:
 336            # catch-all so we can disconnect properly
 337            # ...should we?
 338            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
 339            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
 340            # May as well return what was captured, yes?
 341            return posts
 342        finally:
 343            await client.disconnect()
 344
 345    async def gather_posts(self, client, queries, max_items, min_date, max_date):
 346        """
 347        Gather messages for each entity for which messages are requested
 348
 349        :param TelegramClient client:  Telegram Client
 350        :param list queries:  List of entities to query (as string)
 351        :param int max_items:  Messages to scrape per entity
 352        :param int min_date:  Datetime date to get posts after
 353        :param int max_date:  Datetime date to get posts before
 354        :return list:  List of messages, each message a dictionary.
 355        """
 356        resolve_refs = self.parameters.get("resolve-entities")
 357
 358        # Adding flag to stop; using for rate limits
 359        no_additional_queries = False
 360
 361        # This is used for the 'crawl' feature so we know at which depth a
 362        # given entity was discovered
 363        depth_map = {
 364            entity: 0 for entity in queries
 365        }
 366
 367        crawl_max_depth = self.parameters.get("crawl-depth", 0)
 368        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
 369        crawl_via_links = self.parameters.get("crawl-via-links", False)
 370
 371        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
 372        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
 373
 374        # this keeps track of how often an entity not in the original query
 375        # has been mentioned. When crawling is enabled and this exceeds the
 376        # given threshold, the entity is added to the query
 377        crawl_references = {}
 378        full_query = set(queries)
 379        num_queries = len(queries)
 380
 381        # we may not always know the 'entity username' for an entity ID, so
 382        # keep a reference map as we go
 383        entity_id_map = {}
 384
 385        # Collect queries
 386        # Use while instead of for so we can change queries during iteration
 387        # this is needed for the 'crawl' feature which can discover new
 388        # entities during crawl
 389        processed = 0
 390        total_messages = 0
 391        while queries:
 392            query = queries.pop(0)
 393
 394            delay = 10
 395            retries = 0
 396            processed += 1
 397            self.dataset.update_progress(processed / num_queries)
 398
 399            if no_additional_queries:
 400                # Note that we are not completing this query
 401                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
 402                continue
 403
 404            while True:
 405                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
 406                entity_posts = 0
 407                discovered = 0
 408                try:
 409                    async for message in client.iter_messages(entity=query, offset_date=max_date):
 410                        entity_posts += 1
 411                        total_messages += 1
 412                        if self.interrupted:
 413                            raise ProcessorInterruptedException(
 414                                "Interrupted while fetching message data from the Telegram API")
 415
 416                        if entity_posts % 100 == 0:
 417                            self.dataset.update_status(
 418                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
 419
 420                        if message.action is not None:
 421                            # e.g. someone joins the channel - not an actual message
 422                            continue
 423
 424                        # todo: possibly enrich object with e.g. the name of
 425                        # the channel a message was forwarded from (but that
 426                        # needs extra API requests...)
 427                        serialized_message = SearchTelegram.serialize_obj(message)
 428                        if "_chat" in serialized_message:
 429                            # Add query ID to check if queries have been crawled previously
 430                            full_query.add(serialized_message["_chat"]["id"])
 431                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
 432                                # once we know what a channel ID resolves to, use the username instead so it is easier to
 433                                # understand for the user
 434                                entity_id_map[query] = serialized_message["_chat"]["username"]
 435                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
 436
 437                        if resolve_refs:
 438                            serialized_message = await self.resolve_groups(client, serialized_message)
 439
 440                        # Stop if we're below the min date
 441                        if min_date and serialized_message.get("date") < min_date:
 442                            break
 443
 444                        # if crawling is enabled, see if we found something to add to the query
 445                        linked_entities = set()
 446                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
 447                            message_fwd = serialized_message.get("fwd_from")
 448                            fwd_from = None
 449                            fwd_source_type = None
 450                            if message_fwd and message_fwd.get("from_id"):
 451                                if message_fwd["from_id"].get("_type") == "PeerChannel":
 452                                    # Legacy(?) data structure (pre 2024/7/22)
 453                                    # even if we haven't resolved the ID, we can feed the numeric ID
 454                                    # to Telethon! this is nice because it means we don't have to
 455                                    # resolve entities to crawl iteratively
 456                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
 457                                    fwd_source_type = "channel"
 458                                elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}):
 459                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
 460                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
 461                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
 462                                    fwd_source_type = "channel"
 463                                elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
 464                                    # forwards can also come from users
 465                                    # these can never be followed, so don't add these to the crawl, but do document them
 466                                    fwd_source_type = "user"
 467                                else:
 468                                    print(json.dumps(message_fwd))
 469                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
 470                                    fwd_source_type = "unknown"
 471
 472                                if fwd_from:
 473                                    linked_entities.add(fwd_from)
 474
 475
 476                            if crawl_via_links:
 477                                # t.me links
 478                                all_links = ural.urls_from_text(serialized_message["message"])
 479                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 480                                for link in all_links:
 481                                    if link.startswith("+"):
 482                                        # invite links
 483                                        continue
 484
 485                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 486                                    linked_entities.add(entity_name)
 487
 488                                # @references
 489                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
 490                                for reference in references:
 491                                    if reference.startswith("@"):
 492                                        reference = reference[1:]
 493
 494                                    reference = reference.split("/")[0]
 495
 496                                    linked_entities.add(reference)
 497
 498                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
 499                            for link in linked_entities:
 500                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
 501                                    # new entity discovered!
 502                                    # might be discovered (before collection) multiple times, so retain lowest depth
 503                                    # print(f"Potentially crawling {link}")
 504                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
 505                                    if link not in crawl_references:
 506                                        crawl_references[link] = 0
 507                                    crawl_references[link] += 1
 508
 509                                    # Add to queries if it has been referenced enough times
 510                                    if crawl_references[link] >= crawl_msg_threshold:
 511                                        queries.append(link)
 512                                        full_query.add(link)
 513                                        num_queries += 1
 514                                        discovered += 1
 515                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
 516
 517
 518
 519                        serialized_message["4CAT_metadata"] = {
 520                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
 521                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
 522                            "query_depth": depth_map.get(query, 0)
 523                        }
 524                        yield serialized_message
 525
 526                        if entity_posts >= max_items:
 527                            break
 528
 529                except ChannelPrivateError:
 530                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
 531                    self.flawless += 1
 532
 533                except (UsernameInvalidError,):
 534                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
 535                    self.flawless += 1
 536
 537                except FloodWaitError as e:
 538                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
 539                    if e.seconds < self.end_if_rate_limited:
 540                        time.sleep(e.seconds)
 541                        continue
 542                    else:
 543                        self.flawless += 1
 544                        no_additional_queries = True
 545                        self.dataset.update_status(
 546                            f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending")
 547                        break
 548
 549                except BadRequestError as e:
 550                    self.dataset.update_status(
 551                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 552                    self.flawless += 1
 553
 554                except ValueError as e:
 555                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 556                    self.flawless += 1
 557
 558                except ChannelPrivateError as e:
 559                    self.dataset.update_status(
 560                        f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.")
 561                    break
 562
 563                except TimeoutError:
 564                    if retries < 3:
 565                        self.dataset.update_status(
 566                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
 567                        self.flawless += 1
 568                        break
 569
 570                    self.dataset.update_status(
 571                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
 572                    time.sleep(delay)
 573                    delay *= 2
 574                    continue
 575
 576                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
 577                break
 578
 579    async def resolve_groups(self, client, message):
 580        """
 581        Recursively resolve references to groups and users
 582
 583        :param client:  Telethon client instance
 584        :param dict message:  Message, as already mapped by serialize_obj
 585        :return:  Resolved dictionary
 586        """
 587        resolved_message = message.copy()
 588        for key, value in message.items():
 589            try:
 590                if type(value) is not dict:
 591                    # if it's not a dict, we never have to resolve it, as it
 592                    # does not represent an entity
 593                    continue
 594
 595                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
 596                    # forwarded from a channel!
 597                    if value["channel_id"] in self.failures_cache:
 598                        continue
 599
 600                    if value["channel_id"] not in self.details_cache:
 601                        channel = await client(GetFullChannelRequest(value["channel_id"]))
 602                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
 603
 604                    resolved_message[key] = self.details_cache[value["channel_id"]]
 605                    resolved_message[key]["channel_id"] = value["channel_id"]
 606
 607                elif "_type" in value and value["_type"] == "PeerUser":
 608                    # a user!
 609                    if value["user_id"] in self.failures_cache:
 610                        continue
 611
 612                    if value["user_id"] not in self.details_cache:
 613                        user = await client(GetFullUserRequest(value["user_id"]))
 614                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
 615
 616                    resolved_message[key] = self.details_cache[value["user_id"]]
 617                    resolved_message[key]["user_id"] = value["user_id"]
 618                else:
 619                    resolved_message[key] = await self.resolve_groups(client, value)
 620
 621            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
 622                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
 623                if type(e) in (ChannelPrivateError, UsernameInvalidError):
 624                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
 625                else:
 626                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
 627
 628        return resolved_message
 629
 630    @staticmethod
 631    def cancel_start():
 632        """
 633        Replace interactive phone number input in Telethon
 634
 635        By default, if Telethon cannot use the given session file to
 636        authenticate, it will interactively prompt the user for a phone
 637        number on the command line. That is not useful here, so instead
 638        raise a RuntimeError. This will be caught below and the user will
 639        be told they need to re-authenticate via 4CAT.
 640        """
 641        raise RuntimeError("Connection cancelled")
 642
 643    @staticmethod
 644    def map_item(message):
 645        """
 646        Convert Message object to 4CAT-ready data object
 647
 648        :param Message message:  Message to parse
 649        :return dict:  4CAT-compatible item object
 650        """
 651        if message["_chat"]["username"]:
 652            # chats can apparently not have usernames???
 653            # truly telegram objects are way too lenient for their own good
 654            thread = message["_chat"]["username"]
 655        elif message["_chat"]["title"]:
 656            thread = re.sub(r"\s", "", message["_chat"]["title"])
 657        else:
 658            # just give up
 659            thread = "unknown"
 660
 661        # determine username
 662        # API responses only include the user *ID*, not the username, and to
 663        # complicate things further not everyone is a user and not everyone
 664        # has a username. If no username is available, try the first and
 665        # last name someone has supplied
 666        fullname = ""
 667        username = ""
 668        user_id = message["_sender"]["id"] if message.get("_sender") else ""
 669        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
 670
 671        if message.get("_sender") and message["_sender"].get("username"):
 672            username = message["_sender"]["username"]
 673
 674        if message.get("_sender") and message["_sender"].get("first_name"):
 675            fullname += message["_sender"]["first_name"]
 676
 677        if message.get("_sender") and message["_sender"].get("last_name"):
 678            fullname += " " + message["_sender"]["last_name"]
 679
 680        fullname = fullname.strip()
 681
 682        # determine media type
 683        # these store some extra information of the attachment in
 684        # attachment_data. Since the final result will be serialised as a csv
 685        # file, we can only store text content. As such some media data is
 686        # serialised as JSON.
 687        attachment_type = SearchTelegram.get_media_type(message["media"])
 688        attachment_filename = ""
 689
 690        if attachment_type == "contact":
 691            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
 692            if message["media"].get('contact', False):
 693                # Old datastructure
 694                attachment = message["media"]["contact"]
 695            elif all([property in message["media"].keys() for property in contact_data]):
 696                # New datastructure 2022/7/25
 697                attachment = message["media"]
 698            else:
 699                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
 700            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})
 701
 702        elif attachment_type == "document":
 703            # videos, etc
 704            # This could add a separate routine for videos to make them a
 705            # separate type, which could then be scraped later, etc
 706            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
 707            if attachment_type == "video":
 708                attachment = message["media"]["document"]
 709                attachment_data = json.dumps({
 710                    "id": attachment["id"],
 711                    "dc_id": attachment["dc_id"],
 712                    "file_reference": attachment["file_reference"],
 713                })
 714            else:
 715                attachment_data = ""
 716
 717        # elif attachment_type in ("geo", "geo_live"):
 718        # untested whether geo_live is significantly different from geo
 719        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
 720
 721        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
 722            # we don't actually store any metadata about the photo, since very
 723            # little of the metadata attached is of interest. Instead, the
 724            # actual photos may be downloaded via a processor that is run on the
 725            # search results
 726            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
 727            attachment_data = json.dumps({
 728                "id": attachment["id"],
 729                "dc_id": attachment["dc_id"],
 730                "file_reference": attachment["file_reference"],
 731            })
 732            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
 733
 734        elif attachment_type == "poll":
 735            # unfortunately poll results are only available when someone has
 736            # actually voted on the poll - that will usually not be the case,
 737            # so we store -1 as the vote count
 738            attachment = message["media"]
 739            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
 740            attachment_data = json.dumps({
 741                "question": attachment["poll"]["question"],
 742                "voters": attachment["results"]["total_voters"],
 743                "answers": [{
 744                    "answer": options[answer["option"]],
 745                    "votes": answer["voters"]
 746                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
 747                    "answer": options[option],
 748                    "votes": -1
 749                } for option in options]
 750            })
 751
 752        else:
 753            attachment_data = ""
 754
 755        # was the message forwarded from somewhere and if so when?
 756        forwarded_timestamp = ""
 757        forwarded_name = ""
 758        forwarded_id = ""
 759        forwarded_username = ""
 760        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
 761            # forward information is spread out over a lot of places
 762            # we can identify, in order of usefulness: username, full name,
 763            # and ID. But not all of these are always available, and not
 764            # always in the same place either
 765            forwarded_timestamp = int(message["fwd_from"]["date"])
 766            from_data = message["fwd_from"]["from_id"]
 767
 768            if from_data:
 769                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
 770
 771            if message["fwd_from"].get("from_name"):
 772                forwarded_name = message["fwd_from"].get("from_name")
 773
 774            if from_data and from_data.get("from_name"):
 775                forwarded_name = message["fwd_from"]["from_name"]
 776
 777            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
 778                from_data["user"] = from_data["users"][0]
 779
 780            if from_data and ("user" in from_data or "chats" in from_data):
 781                # 'resolve entities' was enabled for this dataset
 782                if "user" in from_data:
 783                    if from_data["user"].get("username"):
 784                        forwarded_username = from_data["user"]["username"]
 785
 786                    if from_data["user"].get("first_name"):
 787                        forwarded_name = from_data["user"]["first_name"]
 788                    if message["fwd_from"].get("last_name"):
 789                        forwarded_name += "  " + from_data["user"]["last_name"]
 790
 791                    forwarded_name = forwarded_name.strip()
 792
 793                elif "chats" in from_data:
 794                    channel_id = from_data.get("channel_id")
 795                    for chat in from_data["chats"]:
 796                        if chat["id"] == channel_id or channel_id is None:
 797                            forwarded_username = chat["username"]
 798
 799            elif message.get("_forward") and message["_forward"].get("_chat"):
 800                if message["_forward"]["_chat"].get("username"):
 801                    forwarded_username = message["_forward"]["_chat"]["username"]
 802
 803                if message["_forward"]["_chat"].get("title"):
 804                    forwarded_name = message["_forward"]["_chat"]["title"]
 805
 806        link_title = ""
 807        link_attached = ""
 808        link_description = ""
 809        reactions = ""
 810
 811        if message.get("media") and message["media"].get("webpage"):
 812            link_title = message["media"]["webpage"].get("title")
 813            link_attached = message["media"]["webpage"].get("url")
 814            link_description = message["media"]["webpage"].get("description")
 815
 816        if message.get("reactions") and message["reactions"].get("results"):
 817            for reaction in message["reactions"]["results"]:
 818                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
 819                    # Updated to support new reaction datastructure
 820                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
 821                elif type(reaction["reaction"]) is str and "count" in reaction:
 822                    reactions += reaction["reaction"] * reaction["count"]
 823                else:
 824                    # Failsafe; can be updated to support formatting of new datastructures in the future
 825                    reactions += f"{reaction}, "
 826
 827        is_reply = False
 828        reply_to = ""
 829        if message.get("reply_to"):
 830            is_reply = True
 831            reply_to = message["reply_to"].get("reply_to_msg_id", "")
 832
 833        # t.me links
 834        linked_entities = set()
 835        all_links = ural.urls_from_text(message["message"])
 836        all_links = [link.split("t.me/")[1] for link in all_links if
 837                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 838
 839        for link in all_links:
 840            if link.startswith("+"):
 841                # invite links
 842                continue
 843
 844            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 845            linked_entities.add(entity_name)
 846
 847        # @references
 848        # in execute_queries we use MessageEntityMention to get these
 849        # however, after serializing these objects we only have the offsets of
 850        # the mentioned username, and telegram does weird unicode things to its
 851        # offsets meaning we can't just substring the message. So use a regex
 852        # as a 'good enough' solution
 853        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
 854
 855        # make this case-insensitive since people may use different casing in
 856        # messages than the 'official' username for example
 857        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
 858        all_ci_connections = set()
 859        seen = set()
 860        for connection in all_connections:
 861            if connection.lower() not in seen:
 862                all_ci_connections.add(connection)
 863                seen.add(connection.lower())
 864
 865        return MappedItem({
 866            "id": f"{message['_chat']['username']}-{message['id']}",
 867            "thread_id": thread,
 868            "chat": message["_chat"]["username"],
 869            "author": user_id,
 870            "author_username": username,
 871            "author_name": fullname,
 872            "author_is_bot": "yes" if user_is_bot else "no",
 873            "body": message["message"],
 874            "body_markdown": message.get("message_markdown", MissingMappedField("")),
 875            "is_reply": is_reply,
 876            "reply_to": reply_to,
 877            "views": message["views"] if message["views"] else "",
 878            # "forwards": message.get("forwards", MissingMappedField(0)),
 879            "reactions": reactions,
 880            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
 881            "unix_timestamp": int(message["date"]),
 882            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
 883                "edit_date"] else "",
 884            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
 885            "author_forwarded_from_name": forwarded_name,
 886            "author_forwarded_from_username": forwarded_username,
 887            "author_forwarded_from_id": forwarded_id,
 888            "entities_linked": ",".join(linked_entities),
 889            "entities_mentioned": ",".join(all_mentions),
 890            "all_connections": ",".join(all_ci_connections),
 891            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
 892                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
 893            "unix_timestamp_forwarded_from": forwarded_timestamp,
 894            "link_title": link_title,
 895            "link_description": link_description,
 896            "link_attached": link_attached,
 897            "attachment_type": attachment_type,
 898            "attachment_data": attachment_data,
 899            "attachment_filename": attachment_filename
 900        })
 901
 902    @staticmethod
 903    def get_media_type(media):
 904        """
 905        Get media type for a Telegram attachment
 906
 907        :param media:  Media object
 908        :return str:  Textual identifier of the media type
 909        """
 910        try:
 911            return {
 912                "NoneType": "",
 913                "MessageMediaContact": "contact",
 914                "MessageMediaDocument": "document",
 915                "MessageMediaEmpty": "",
 916                "MessageMediaGame": "game",
 917                "MessageMediaGeo": "geo",
 918                "MessageMediaGeoLive": "geo_live",
 919                "MessageMediaInvoice": "invoice",
 920                "MessageMediaPhoto": "photo",
 921                "MessageMediaPoll": "poll",
 922                "MessageMediaUnsupported": "unsupported",
 923                "MessageMediaVenue": "venue",
 924                "MessageMediaWebPage": "url"
 925            }[media.get("_type", None)]
 926        except (AttributeError, KeyError):
 927            return ""
 928
 929    @staticmethod
 930    def serialize_obj(input_obj):
 931        """
 932        Serialize an object as a dictionary
 933
 934        Telethon message objects are not serializable by themselves, but most
 935        relevant attributes are simply struct classes. This function replaces
 936        those that are not with placeholders and then returns a dictionary that
 937        can be serialized as JSON.
 938
 939        :param obj:  Object to serialize
 940        :return:  Serialized object
 941        """
 942        scalars = (int, str, float, list, tuple, set, bool)
 943
 944        if type(input_obj) in scalars or input_obj is None:
 945            return input_obj
 946
 947        if type(input_obj) is not dict:
 948            obj = input_obj.__dict__
 949        else:
 950            obj = input_obj.copy()
 951
 952        mapped_obj = {}
 953        for item, value in obj.items():
 954            if type(value) is datetime:
 955                mapped_obj[item] = value.timestamp()
 956            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 957                mapped_obj[item] = SearchTelegram.serialize_obj(value)
 958            elif type(value) is list:
 959                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
 960            elif type(value) is bytes:
 961                mapped_obj[item] = value.hex()
 962            elif type(value) not in scalars and value is not None:
 963                # type we can't make sense of here
 964                continue
 965            else:
 966                mapped_obj[item] = value
 967
 968        # Add the _type if the original object was a telethon type
 969        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 970            mapped_obj["_type"] = type(input_obj).__name__
 971
 972        # Store the markdown-formatted text
 973        if type(input_obj).__name__ == "Message":
 974            mapped_obj["message_markdown"] = input_obj.text
 975
 976        return mapped_obj
 977
    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Telegram query

        Checks the query parameters, sanitises the list of entities to
        scrape, and verifies the user's Telegram API credentials by opening
        a (temporary) Telethon session. If Telegram sends a login code to
        the user's device instead, a QueryNeedsFurtherInputException is
        raised so the front-end can ask for that code.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If parameters are invalid or
        authentication fails.
        :raises QueryNeedsFurtherInputException:  If a security code must be
        entered to complete authentication.
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False)
        max_entities = config.get("telegram-search.max_entities", 25)

        # cap the number of messages at the configured maximum, unless this
        # 4CAT instance allows unlimited queries
        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
            config=config)["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            # strip scheme/host, '(s/)' path prefix and trailing slashes so
            # only the bare entity name remains
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        # the session file name is derived by hashing phone number and
        # credentials, so it can be located later without storing the phone
        # number in plain text
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        config.user.set_value("telegram.session", session_id)
        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")

        client = None

        # API ID is always a number, if it's not, we can immediately fail
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
            max_attempts = 1
        else:
            code_callback = lambda: -1  # noqa: E731
            # max_attempts = 0 because authing will always fail: we can't wait for
            # the code to be entered interactively, we'll need to do a new request
            # but we can't just immediately return, we still need to call start()
            # to get telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            # a dedicated event loop for this (synchronous) validation call
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError:
                # A code was sent to the given phone number
                needs_code = True
        except FloodWaitError as e:
            # uh oh, we got rate-limited
            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                           str(e).split("(")[0] + ".")
        except ApiIdInvalidError:
            # wrong credentials
            raise QueryParametersException("Your API credentials are invalid.")
        except PhoneNumberInvalidError:
            # wrong phone number
            raise QueryParametersException(
                "The phone number provided is not a valid phone number for these credentials.")
        except RPCError as e:
            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                           f"Telegram app(s) to the latest version to proceed ({e}).")
        except Exception as e:
            # ?
            raise QueryParametersException(
                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            # always close the connection, whether authentication worked or not
            if client:
                client.disconnect()

        if needs_code:
            # Telegram sent a login code; ask the user for it via the form
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }
1121
1122    @staticmethod
1123    def create_session_id(api_phone, api_id, api_hash):
1124        """
1125        Generate a filename for the session file
1126
1127        This is a combination of phone number and API credentials, but hashed
1128        so that one cannot actually derive someone's phone number from it.
1129
1130        :param str api_phone:  Phone number for API ID
1131        :param int api_id:  Telegram API ID
1132        :param str api_hash:  Telegram API Hash
1133        :return str: A hash value derived from the input
1134        """
1135        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1136        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
class SearchTelegram(Search):
    """
    Search Telegram via API
    """
    # NOTE(review): a stray duplicate header line referencing
    # `backend.lib.search.Search` (a module path that is never imported) was
    # removed here; `Search` is imported at the top of the file.
    type = "telegram-search"  # job ID
    category = "Search"  # category
    title = "Telegram API search"  # title displayed in UI
    description = "Scrapes messages from open Telegram groups via its API."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    # caches, (re-)initialised per query in get_items()
    details_cache = None  # resolved entity metadata, keyed by entity ID
    failures_cache = None  # entity IDs that could not be resolved
    eventloop = None  # asyncio event loop handed to Telethon
    import_issues = 0
    end_if_rate_limited = 600  # break if Telegram requires wait time above number of seconds

    max_workers = 1
    max_retries = 3  # timeouts tolerated per entity before skipping it
    flawless = 0  # counts entities that could not be collected

    # 4CAT admin-configurable settings for this datasource
    config = {
        "telegram-search.can_query_all_messages": {
            "type": UserInput.OPTION_TOGGLE,
            "help": "Remove message amount limit",
            "default": False,
            "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!"
        },
        "telegram-search.max_entities": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max entities to query",
            "coerce_type": int,
            "min": 0,
            "default": 25,
            "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to "
                       "disable limit."
        },
        "telegram-search.max_crawl_depth": {
            "type": UserInput.OPTION_TEXT,
            "help": "Max crawl depth",
            "coerce_type": int,
            "min": 0,
            "default": 0,
            "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded "
                       "messages. Recommended to leave at 0 for most users since this can exponentially increase "
                       "dataset sizes."
        }
    }
  81
  82    @classmethod
  83    def get_options(cls, parent_dataset=None, config=None):
  84        """
  85        Get processor options
  86
  87        Just updates the description of the entities field based on the
  88        configured max entities.
  89
  90        :param DataSet parent_dataset:  An object representing the dataset that
  91          the processor would be run on
  92        :param ConfigManager|None config:  Configuration reader (context-aware)
  93        """
  94
  95        max_entities = config.get("telegram-search.max_entities", 25)
  96        options = {
  97            "intro": {
  98                "type": UserInput.OPTION_INFO,
  99                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
 100                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
 101                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
 102                        "authentication for Telegram."
 103            },
 104            "api_id": {
 105                "type": UserInput.OPTION_TEXT,
 106                "help": "API ID",
 107                "cache": True,
 108            },
 109            "api_hash": {
 110                "type": UserInput.OPTION_TEXT,
 111                "help": "API Hash",
 112                "cache": True,
 113            },
 114            "api_phone": {
 115                "type": UserInput.OPTION_TEXT,
 116                "help": "Phone number",
 117                "cache": True,
 118                "default": "+xxxxxxxxxx"
 119            },
 120            "divider": {
 121                "type": UserInput.OPTION_DIVIDER
 122            },
 123            "query-intro": {
 124                "type": UserInput.OPTION_INFO,
 125                "help": "Separate with commas or line breaks."
 126            },
 127            "query": {
 128                "type": UserInput.OPTION_TEXT_LARGE,
 129                "help": "Entities to scrape",
 130                "tooltip": "Separate with commas or line breaks."
 131            },
 132            "max_posts": {
 133                "type": UserInput.OPTION_TEXT,
 134                "help": "Messages per group",
 135                "min": 1,
 136                "max": 50000,
 137                "default": 10
 138            },
 139            "daterange": {
 140                "type": UserInput.OPTION_DATERANGE,
 141                "help": "Date range"
 142            },
 143            "divider-2": {
 144                "type": UserInput.OPTION_DIVIDER
 145            },
 146            "info-sensitive": {
 147                "type": UserInput.OPTION_INFO,
 148                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
 149                        "there while data is fetched. After the dataset has been created your credentials will be "
 150                        "deleted from the server, unless you enable the option below. If you want to download images "
 151                        "attached to the messages in your collected data, you need to enable this option. Your "
 152                        "credentials will never be visible to other users and can be erased later via the result page."
 153            },
 154            "save-session": {
 155                "type": UserInput.OPTION_TOGGLE,
 156                "help": "Save session:",
 157                "default": False
 158            },
 159            "resolve-entities-intro": {
 160                "type": UserInput.OPTION_INFO,
 161                "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full "
 162                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
 163                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and "
 164                        "for large datasets, increases the chance you will be rate-limited and your dataset isn't able "
 165                        "to finish capturing. It will also dramatically increase the disk space needed to store the "
 166                        "data, so only enable this if you really need it!"
 167            },
 168            "resolve-entities": {
 169                "type": UserInput.OPTION_TOGGLE,
 170                "help": "Resolve references",
 171                "default": False,
 172            }
 173        }
 174
 175        if max_entities:
 176            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
 177                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
 178
 179        all_messages = config.get("telegram-search.can_query_all_messages", False)
 180        if all_messages:
 181            if "max" in options["max_posts"]:
 182                del options["max_posts"]["max"]
 183        else:
 184            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
 185                                             f"{options['max_posts']['max']:,} messages per entity.")
 186
 187        if config.get("telegram-search.max_crawl_depth", 0) > 0:
 188            options["crawl_intro"] = {
 189                "type": UserInput.OPTION_INFO,
 190                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
 191                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
 192                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
 193                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
 194                        "progress cannot be accurately tracked when you use this feature."
 195            }
 196            options["crawl-depth"] = {
 197                "type": UserInput.OPTION_TEXT,
 198                "coerce_type": int,
 199                "min": 0,
 200                "max": config.get("telegram-search.max_crawl_depth"),
 201                "default": 0,
 202                "help": "Crawl depth",
 203                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
 204                           "query, i.e. at most this many hops can be needed to reach the entity from one of the "
 205                           "starting entities."
 206            }
 207            options["crawl-threshold"] = {
 208                "type": UserInput.OPTION_TEXT,
 209                "coerce_type": int,
 210                "min": 0,
 211                "default": 5,
 212                "help": "Crawl threshold",
 213                "tooltip": "Entities need to be references at least this many times to be added to the query. Only "
 214                           "references discovered below the max crawl depth are taken into account."
 215            }
 216            options["crawl-via-links"] = {
 217                "type": UserInput.OPTION_TOGGLE,
 218                "default": False,
 219                "help": "Extract new groups from links",
 220                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
 221                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
 222                           "links that would otherwise remain undetected."
 223            }
 224
 225        return options
 226
 227
 228    def get_items(self, query):
 229        """
 230        Execute a query; get messages for given parameters
 231
 232        Basically a wrapper around execute_queries() to call it with asyncio.
 233
 234        :param dict query:  Query parameters, as part of the DataSet object
 235        :return list:  Posts, sorted by thread and post ID, in ascending order
 236        """
 237        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
 238            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
 239                                       "creating it again from scratch.", is_final=True)
 240            return None
 241
 242        self.details_cache = {}
 243        self.failures_cache = set()
 244        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
 245        results = asyncio.run(self.execute_queries())
 246
 247        if not query.get("save-session"):
 248            self.dataset.delete_parameter("api_hash", instant=True)
 249            self.dataset.delete_parameter("api_phone", instant=True)
 250            self.dataset.delete_parameter("api_id", instant=True)
 251
 252        if self.flawless:
 253            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
 254                                       "been private). View the log file for details.", is_final=True)
 255
 256        return results
 257
 258    async def execute_queries(self):
 259        """
 260        Get messages for queries
 261
 262        This is basically what would be done in get_items(), except due to
 263        Telethon's architecture this needs to be called in an async method,
 264        which is this one.
 265
 266        :return list:  Collected messages
 267        """
 268        # session file has been created earlier, and we can re-use it here in
 269        # order to avoid having to re-enter the security code
 270        query = self.parameters
 271
 272        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
 273                                                      query["api_id"].strip(),
 274                                                      query["api_hash"].strip())
 275        self.dataset.log(f'Telegram session id: {session_id}')
 276        session_path = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), session_id + ".session")
 277
 278        client = None
 279
 280        try:
 281            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
 282                                    loop=self.eventloop)
 283            await client.start(phone=SearchTelegram.cancel_start)
 284        except RuntimeError:
 285            # session is no longer useable, delete file so user will be asked
 286            # for security code again. The RuntimeError is raised by
 287            # `cancel_start()`
 288            self.dataset.update_status(
 289                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
 290                is_final=True)
 291
 292            if client and hasattr(client, "disconnect"):
 293                await client.disconnect()
 294
 295            if session_path.exists():
 296                session_path.unlink()
 297
 298            return []
 299        except Exception as e:
 300            # not sure what exception specifically is triggered here, but it
 301            # always means the connection failed
 302            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
 303            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
 304            if client and hasattr(client, "disconnect"):
 305                await client.disconnect()
 306            return []
 307
 308        # ready our parameters
 309        parameters = self.dataset.get_parameters()
 310        queries = [query.strip() for query in parameters.get("query", "").split(",")]
 311        max_items = convert_to_int(parameters.get("items", 10), 10)
 312
 313        # Telethon requires the offset date to be a datetime date
 314        max_date = parameters.get("max_date")
 315        if max_date:
 316            try:
 317                max_date = datetime.fromtimestamp(int(max_date))
 318            except ValueError:
 319                max_date = None
 320
 321        # min_date can remain an integer
 322        min_date = parameters.get("min_date")
 323        if min_date:
 324            try:
 325                min_date = int(min_date)
 326            except ValueError:
 327                min_date = None
 328
 329        posts = []
 330        try:
 331            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
 332                posts.append(post)
 333            return posts
 334        except ProcessorInterruptedException as e:
 335            raise e
 336        except Exception:
 337            # catch-all so we can disconnect properly
 338            # ...should we?
 339            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
 340            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
 341            # May as well return what was captured, yes?
 342            return posts
 343        finally:
 344            await client.disconnect()
 345
 346    async def gather_posts(self, client, queries, max_items, min_date, max_date):
 347        """
 348        Gather messages for each entity for which messages are requested
 349
 350        :param TelegramClient client:  Telegram Client
 351        :param list queries:  List of entities to query (as string)
 352        :param int max_items:  Messages to scrape per entity
 353        :param int min_date:  Datetime date to get posts after
 354        :param int max_date:  Datetime date to get posts before
 355        :return list:  List of messages, each message a dictionary.
 356        """
 357        resolve_refs = self.parameters.get("resolve-entities")
 358
 359        # Adding flag to stop; using for rate limits
 360        no_additional_queries = False
 361
 362        # This is used for the 'crawl' feature so we know at which depth a
 363        # given entity was discovered
 364        depth_map = {
 365            entity: 0 for entity in queries
 366        }
 367
 368        crawl_max_depth = self.parameters.get("crawl-depth", 0)
 369        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
 370        crawl_via_links = self.parameters.get("crawl-via-links", False)
 371
 372        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
 373        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
 374
 375        # this keeps track of how often an entity not in the original query
 376        # has been mentioned. When crawling is enabled and this exceeds the
 377        # given threshold, the entity is added to the query
 378        crawl_references = {}
 379        full_query = set(queries)
 380        num_queries = len(queries)
 381
 382        # we may not always know the 'entity username' for an entity ID, so
 383        # keep a reference map as we go
 384        entity_id_map = {}
 385
 386        # Collect queries
 387        # Use while instead of for so we can change queries during iteration
 388        # this is needed for the 'crawl' feature which can discover new
 389        # entities during crawl
 390        processed = 0
 391        total_messages = 0
 392        while queries:
 393            query = queries.pop(0)
 394
 395            delay = 10
 396            retries = 0
 397            processed += 1
 398            self.dataset.update_progress(processed / num_queries)
 399
 400            if no_additional_queries:
 401                # Note that we are not completing this query
 402                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
 403                continue
 404
 405            while True:
 406                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
 407                entity_posts = 0
 408                discovered = 0
 409                try:
 410                    async for message in client.iter_messages(entity=query, offset_date=max_date):
 411                        entity_posts += 1
 412                        total_messages += 1
 413                        if self.interrupted:
 414                            raise ProcessorInterruptedException(
 415                                "Interrupted while fetching message data from the Telegram API")
 416
 417                        if entity_posts % 100 == 0:
 418                            self.dataset.update_status(
 419                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
 420
 421                        if message.action is not None:
 422                            # e.g. someone joins the channel - not an actual message
 423                            continue
 424
 425                        # todo: possibly enrich object with e.g. the name of
 426                        # the channel a message was forwarded from (but that
 427                        # needs extra API requests...)
 428                        serialized_message = SearchTelegram.serialize_obj(message)
 429                        if "_chat" in serialized_message:
 430                            # Add query ID to check if queries have been crawled previously
 431                            full_query.add(serialized_message["_chat"]["id"])
 432                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
 433                                # once we know what a channel ID resolves to, use the username instead so it is easier to
 434                                # understand for the user
 435                                entity_id_map[query] = serialized_message["_chat"]["username"]
 436                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
 437
 438                        if resolve_refs:
 439                            serialized_message = await self.resolve_groups(client, serialized_message)
 440
 441                        # Stop if we're below the min date
 442                        if min_date and serialized_message.get("date") < min_date:
 443                            break
 444
 445                        # if crawling is enabled, see if we found something to add to the query
 446                        linked_entities = set()
 447                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
 448                            message_fwd = serialized_message.get("fwd_from")
 449                            fwd_from = None
 450                            fwd_source_type = None
 451                            if message_fwd and message_fwd.get("from_id"):
 452                                if message_fwd["from_id"].get("_type") == "PeerChannel":
 453                                    # Legacy(?) data structure (pre 2024/7/22)
 454                                    # even if we haven't resolved the ID, we can feed the numeric ID
 455                                    # to Telethon! this is nice because it means we don't have to
 456                                    # resolve entities to crawl iteratively
 457                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
 458                                    fwd_source_type = "channel"
 459                                elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}):
 460                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
 461                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
 462                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
 463                                    fwd_source_type = "channel"
 464                                elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
 465                                    # forwards can also come from users
 466                                    # these can never be followed, so don't add these to the crawl, but do document them
 467                                    fwd_source_type = "user"
 468                                else:
 469                                    print(json.dumps(message_fwd))
 470                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
 471                                    fwd_source_type = "unknown"
 472
 473                                if fwd_from:
 474                                    linked_entities.add(fwd_from)
 475
 476
 477                            if crawl_via_links:
 478                                # t.me links
 479                                all_links = ural.urls_from_text(serialized_message["message"])
 480                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 481                                for link in all_links:
 482                                    if link.startswith("+"):
 483                                        # invite links
 484                                        continue
 485
 486                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 487                                    linked_entities.add(entity_name)
 488
 489                                # @references
 490                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
 491                                for reference in references:
 492                                    if reference.startswith("@"):
 493                                        reference = reference[1:]
 494
 495                                    reference = reference.split("/")[0]
 496
 497                                    linked_entities.add(reference)
 498
 499                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
 500                            for link in linked_entities:
 501                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
 502                                    # new entity discovered!
 503                                    # might be discovered (before collection) multiple times, so retain lowest depth
 504                                    # print(f"Potentially crawling {link}")
 505                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
 506                                    if link not in crawl_references:
 507                                        crawl_references[link] = 0
 508                                    crawl_references[link] += 1
 509
 510                                    # Add to queries if it has been referenced enough times
 511                                    if crawl_references[link] >= crawl_msg_threshold:
 512                                        queries.append(link)
 513                                        full_query.add(link)
 514                                        num_queries += 1
 515                                        discovered += 1
 516                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
 517
 518
 519
 520                        serialized_message["4CAT_metadata"] = {
 521                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
 522                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
 523                            "query_depth": depth_map.get(query, 0)
 524                        }
 525                        yield serialized_message
 526
 527                        if entity_posts >= max_items:
 528                            break
 529
 530                except ChannelPrivateError:
 531                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
 532                    self.flawless += 1
 533
 534                except (UsernameInvalidError,):
 535                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
 536                    self.flawless += 1
 537
 538                except FloodWaitError as e:
 539                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
 540                    if e.seconds < self.end_if_rate_limited:
 541                        time.sleep(e.seconds)
 542                        continue
 543                    else:
 544                        self.flawless += 1
 545                        no_additional_queries = True
 546                        self.dataset.update_status(
 547                            f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending")
 548                        break
 549
 550                except BadRequestError as e:
 551                    self.dataset.update_status(
 552                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 553                    self.flawless += 1
 554
 555                except ValueError as e:
 556                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
 557                    self.flawless += 1
 558
 559                except ChannelPrivateError as e:
 560                    self.dataset.update_status(
 561                        f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.")
 562                    break
 563
 564                except TimeoutError:
 565                    if retries < 3:
 566                        self.dataset.update_status(
 567                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
 568                        self.flawless += 1
 569                        break
 570
 571                    self.dataset.update_status(
 572                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
 573                    time.sleep(delay)
 574                    delay *= 2
 575                    continue
 576
 577                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
 578                break
 579
 580    async def resolve_groups(self, client, message):
 581        """
 582        Recursively resolve references to groups and users
 583
 584        :param client:  Telethon client instance
 585        :param dict message:  Message, as already mapped by serialize_obj
 586        :return:  Resolved dictionary
 587        """
 588        resolved_message = message.copy()
 589        for key, value in message.items():
 590            try:
 591                if type(value) is not dict:
 592                    # if it's not a dict, we never have to resolve it, as it
 593                    # does not represent an entity
 594                    continue
 595
 596                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
 597                    # forwarded from a channel!
 598                    if value["channel_id"] in self.failures_cache:
 599                        continue
 600
 601                    if value["channel_id"] not in self.details_cache:
 602                        channel = await client(GetFullChannelRequest(value["channel_id"]))
 603                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
 604
 605                    resolved_message[key] = self.details_cache[value["channel_id"]]
 606                    resolved_message[key]["channel_id"] = value["channel_id"]
 607
 608                elif "_type" in value and value["_type"] == "PeerUser":
 609                    # a user!
 610                    if value["user_id"] in self.failures_cache:
 611                        continue
 612
 613                    if value["user_id"] not in self.details_cache:
 614                        user = await client(GetFullUserRequest(value["user_id"]))
 615                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
 616
 617                    resolved_message[key] = self.details_cache[value["user_id"]]
 618                    resolved_message[key]["user_id"] = value["user_id"]
 619                else:
 620                    resolved_message[key] = await self.resolve_groups(client, value)
 621
 622            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
 623                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
 624                if type(e) in (ChannelPrivateError, UsernameInvalidError):
 625                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
 626                else:
 627                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
 628
 629        return resolved_message
 630
 631    @staticmethod
 632    def cancel_start():
 633        """
 634        Replace interactive phone number input in Telethon
 635
 636        By default, if Telethon cannot use the given session file to
 637        authenticate, it will interactively prompt the user for a phone
 638        number on the command line. That is not useful here, so instead
 639        raise a RuntimeError. This will be caught below and the user will
 640        be told they need to re-authenticate via 4CAT.
 641        """
 642        raise RuntimeError("Connection cancelled")
 643
 644    @staticmethod
 645    def map_item(message):
 646        """
 647        Convert Message object to 4CAT-ready data object
 648
 649        :param Message message:  Message to parse
 650        :return dict:  4CAT-compatible item object
 651        """
 652        if message["_chat"]["username"]:
 653            # chats can apparently not have usernames???
 654            # truly telegram objects are way too lenient for their own good
 655            thread = message["_chat"]["username"]
 656        elif message["_chat"]["title"]:
 657            thread = re.sub(r"\s", "", message["_chat"]["title"])
 658        else:
 659            # just give up
 660            thread = "unknown"
 661
 662        # determine username
 663        # API responses only include the user *ID*, not the username, and to
 664        # complicate things further not everyone is a user and not everyone
 665        # has a username. If no username is available, try the first and
 666        # last name someone has supplied
 667        fullname = ""
 668        username = ""
 669        user_id = message["_sender"]["id"] if message.get("_sender") else ""
 670        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
 671
 672        if message.get("_sender") and message["_sender"].get("username"):
 673            username = message["_sender"]["username"]
 674
 675        if message.get("_sender") and message["_sender"].get("first_name"):
 676            fullname += message["_sender"]["first_name"]
 677
 678        if message.get("_sender") and message["_sender"].get("last_name"):
 679            fullname += " " + message["_sender"]["last_name"]
 680
 681        fullname = fullname.strip()
 682
 683        # determine media type
 684        # these store some extra information of the attachment in
 685        # attachment_data. Since the final result will be serialised as a csv
 686        # file, we can only store text content. As such some media data is
 687        # serialised as JSON.
 688        attachment_type = SearchTelegram.get_media_type(message["media"])
 689        attachment_filename = ""
 690
 691        if attachment_type == "contact":
 692            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
 693            if message["media"].get('contact', False):
 694                # Old datastructure
 695                attachment = message["media"]["contact"]
 696            elif all([property in message["media"].keys() for property in contact_data]):
 697                # New datastructure 2022/7/25
 698                attachment = message["media"]
 699            else:
 700                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
 701            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})
 702
 703        elif attachment_type == "document":
 704            # videos, etc
 705            # This could add a separate routine for videos to make them a
 706            # separate type, which could then be scraped later, etc
 707            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
 708            if attachment_type == "video":
 709                attachment = message["media"]["document"]
 710                attachment_data = json.dumps({
 711                    "id": attachment["id"],
 712                    "dc_id": attachment["dc_id"],
 713                    "file_reference": attachment["file_reference"],
 714                })
 715            else:
 716                attachment_data = ""
 717
 718        # elif attachment_type in ("geo", "geo_live"):
 719        # untested whether geo_live is significantly different from geo
 720        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
 721
 722        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
 723            # we don't actually store any metadata about the photo, since very
 724            # little of the metadata attached is of interest. Instead, the
 725            # actual photos may be downloaded via a processor that is run on the
 726            # search results
 727            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
 728            attachment_data = json.dumps({
 729                "id": attachment["id"],
 730                "dc_id": attachment["dc_id"],
 731                "file_reference": attachment["file_reference"],
 732            })
 733            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
 734
 735        elif attachment_type == "poll":
 736            # unfortunately poll results are only available when someone has
 737            # actually voted on the poll - that will usually not be the case,
 738            # so we store -1 as the vote count
 739            attachment = message["media"]
 740            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
 741            attachment_data = json.dumps({
 742                "question": attachment["poll"]["question"],
 743                "voters": attachment["results"]["total_voters"],
 744                "answers": [{
 745                    "answer": options[answer["option"]],
 746                    "votes": answer["voters"]
 747                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
 748                    "answer": options[option],
 749                    "votes": -1
 750                } for option in options]
 751            })
 752
 753        else:
 754            attachment_data = ""
 755
 756        # was the message forwarded from somewhere and if so when?
 757        forwarded_timestamp = ""
 758        forwarded_name = ""
 759        forwarded_id = ""
 760        forwarded_username = ""
 761        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
 762            # forward information is spread out over a lot of places
 763            # we can identify, in order of usefulness: username, full name,
 764            # and ID. But not all of these are always available, and not
 765            # always in the same place either
 766            forwarded_timestamp = int(message["fwd_from"]["date"])
 767            from_data = message["fwd_from"]["from_id"]
 768
 769            if from_data:
 770                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
 771
 772            if message["fwd_from"].get("from_name"):
 773                forwarded_name = message["fwd_from"].get("from_name")
 774
 775            if from_data and from_data.get("from_name"):
 776                forwarded_name = message["fwd_from"]["from_name"]
 777
 778            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
 779                from_data["user"] = from_data["users"][0]
 780
 781            if from_data and ("user" in from_data or "chats" in from_data):
 782                # 'resolve entities' was enabled for this dataset
 783                if "user" in from_data:
 784                    if from_data["user"].get("username"):
 785                        forwarded_username = from_data["user"]["username"]
 786
 787                    if from_data["user"].get("first_name"):
 788                        forwarded_name = from_data["user"]["first_name"]
 789                    if message["fwd_from"].get("last_name"):
 790                        forwarded_name += "  " + from_data["user"]["last_name"]
 791
 792                    forwarded_name = forwarded_name.strip()
 793
 794                elif "chats" in from_data:
 795                    channel_id = from_data.get("channel_id")
 796                    for chat in from_data["chats"]:
 797                        if chat["id"] == channel_id or channel_id is None:
 798                            forwarded_username = chat["username"]
 799
 800            elif message.get("_forward") and message["_forward"].get("_chat"):
 801                if message["_forward"]["_chat"].get("username"):
 802                    forwarded_username = message["_forward"]["_chat"]["username"]
 803
 804                if message["_forward"]["_chat"].get("title"):
 805                    forwarded_name = message["_forward"]["_chat"]["title"]
 806
 807        link_title = ""
 808        link_attached = ""
 809        link_description = ""
 810        reactions = ""
 811
 812        if message.get("media") and message["media"].get("webpage"):
 813            link_title = message["media"]["webpage"].get("title")
 814            link_attached = message["media"]["webpage"].get("url")
 815            link_description = message["media"]["webpage"].get("description")
 816
 817        if message.get("reactions") and message["reactions"].get("results"):
 818            for reaction in message["reactions"]["results"]:
 819                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
 820                    # Updated to support new reaction datastructure
 821                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
 822                elif type(reaction["reaction"]) is str and "count" in reaction:
 823                    reactions += reaction["reaction"] * reaction["count"]
 824                else:
 825                    # Failsafe; can be updated to support formatting of new datastructures in the future
 826                    reactions += f"{reaction}, "
 827
 828        is_reply = False
 829        reply_to = ""
 830        if message.get("reply_to"):
 831            is_reply = True
 832            reply_to = message["reply_to"].get("reply_to_msg_id", "")
 833
 834        # t.me links
 835        linked_entities = set()
 836        all_links = ural.urls_from_text(message["message"])
 837        all_links = [link.split("t.me/")[1] for link in all_links if
 838                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
 839
 840        for link in all_links:
 841            if link.startswith("+"):
 842                # invite links
 843                continue
 844
 845            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
 846            linked_entities.add(entity_name)
 847
 848        # @references
 849        # in execute_queries we use MessageEntityMention to get these
 850        # however, after serializing these objects we only have the offsets of
 851        # the mentioned username, and telegram does weird unicode things to its
 852        # offsets meaning we can't just substring the message. So use a regex
 853        # as a 'good enough' solution
 854        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
 855
 856        # make this case-insensitive since people may use different casing in
 857        # messages than the 'official' username for example
 858        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
 859        all_ci_connections = set()
 860        seen = set()
 861        for connection in all_connections:
 862            if connection.lower() not in seen:
 863                all_ci_connections.add(connection)
 864                seen.add(connection.lower())
 865
 866        return MappedItem({
 867            "id": f"{message['_chat']['username']}-{message['id']}",
 868            "thread_id": thread,
 869            "chat": message["_chat"]["username"],
 870            "author": user_id,
 871            "author_username": username,
 872            "author_name": fullname,
 873            "author_is_bot": "yes" if user_is_bot else "no",
 874            "body": message["message"],
 875            "body_markdown": message.get("message_markdown", MissingMappedField("")),
 876            "is_reply": is_reply,
 877            "reply_to": reply_to,
 878            "views": message["views"] if message["views"] else "",
 879            # "forwards": message.get("forwards", MissingMappedField(0)),
 880            "reactions": reactions,
 881            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
 882            "unix_timestamp": int(message["date"]),
 883            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
 884                "edit_date"] else "",
 885            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
 886            "author_forwarded_from_name": forwarded_name,
 887            "author_forwarded_from_username": forwarded_username,
 888            "author_forwarded_from_id": forwarded_id,
 889            "entities_linked": ",".join(linked_entities),
 890            "entities_mentioned": ",".join(all_mentions),
 891            "all_connections": ",".join(all_ci_connections),
 892            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
 893                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
 894            "unix_timestamp_forwarded_from": forwarded_timestamp,
 895            "link_title": link_title,
 896            "link_description": link_description,
 897            "link_attached": link_attached,
 898            "attachment_type": attachment_type,
 899            "attachment_data": attachment_data,
 900            "attachment_filename": attachment_filename
 901        })
 902
 903    @staticmethod
 904    def get_media_type(media):
 905        """
 906        Get media type for a Telegram attachment
 907
 908        :param media:  Media object
 909        :return str:  Textual identifier of the media type
 910        """
 911        try:
 912            return {
 913                "NoneType": "",
 914                "MessageMediaContact": "contact",
 915                "MessageMediaDocument": "document",
 916                "MessageMediaEmpty": "",
 917                "MessageMediaGame": "game",
 918                "MessageMediaGeo": "geo",
 919                "MessageMediaGeoLive": "geo_live",
 920                "MessageMediaInvoice": "invoice",
 921                "MessageMediaPhoto": "photo",
 922                "MessageMediaPoll": "poll",
 923                "MessageMediaUnsupported": "unsupported",
 924                "MessageMediaVenue": "venue",
 925                "MessageMediaWebPage": "url"
 926            }[media.get("_type", None)]
 927        except (AttributeError, KeyError):
 928            return ""
 929
 930    @staticmethod
 931    def serialize_obj(input_obj):
 932        """
 933        Serialize an object as a dictionary
 934
 935        Telethon message objects are not serializable by themselves, but most
 936        relevant attributes are simply struct classes. This function replaces
 937        those that are not with placeholders and then returns a dictionary that
 938        can be serialized as JSON.
 939
 940        :param obj:  Object to serialize
 941        :return:  Serialized object
 942        """
 943        scalars = (int, str, float, list, tuple, set, bool)
 944
 945        if type(input_obj) in scalars or input_obj is None:
 946            return input_obj
 947
 948        if type(input_obj) is not dict:
 949            obj = input_obj.__dict__
 950        else:
 951            obj = input_obj.copy()
 952
 953        mapped_obj = {}
 954        for item, value in obj.items():
 955            if type(value) is datetime:
 956                mapped_obj[item] = value.timestamp()
 957            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 958                mapped_obj[item] = SearchTelegram.serialize_obj(value)
 959            elif type(value) is list:
 960                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
 961            elif type(value) is bytes:
 962                mapped_obj[item] = value.hex()
 963            elif type(value) not in scalars and value is not None:
 964                # type we can't make sense of here
 965                continue
 966            else:
 967                mapped_obj[item] = value
 968
 969        # Add the _type if the original object was a telethon type
 970        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
 971            mapped_obj["_type"] = type(input_obj).__name__
 972
 973        # Store the markdown-formatted text
 974        if type(input_obj).__name__ == "Message":
 975            mapped_obj["message_markdown"] = input_obj.text
 976
 977        return mapped_obj
 978
    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Telegram query

        Checks the search query, API credentials and entity list size, then
        verifies (or initiates) Telethon authentication for the given
        credentials. If a login code has been sent to the user's Telegram app,
        further input is requested instead of returning.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        :raises QueryParametersException:  If the query is invalid or
          authentication cannot be verified
        :raises QueryNeedsFurtherInputException:  If a security code was sent
          and must be entered by the user to continue
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False)
        max_entities = config.get("telegram-search.max_entities", 25)

        # cap the requested amount of messages at the configured maximum,
        # unless unlimited querying is enabled for this 4CAT instance
        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
            config=config)["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            # strip the t.me URL prefix, any '/s/' preview path component and
            # trailing slashes, leaving just the entity name
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        config.user.set_value("telegram.session", session_id)
        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")

        client = None

        # API ID is always a number, if it's not, we can immediately fail
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
            max_attempts = 1
        else:
            code_callback = lambda: -1  # noqa: E731
            # max_attempts = 0 because authing will always fail: we can't wait for
            # the code to be entered interactively, we'll need to do a new request
            # but we can't just immediately return, we still need to call start()
            # to get telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError:
                # A code was sent to the given phone number
                needs_code = True
        except FloodWaitError as e:
            # uh oh, we got rate-limited
            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                           str(e).split("(")[0] + ".")
        except ApiIdInvalidError:
            # wrong credentials
            raise QueryParametersException("Your API credentials are invalid.")
        except PhoneNumberInvalidError:
            # wrong phone number
            raise QueryParametersException(
                "The phone number provided is not a valid phone number for these credentials.")
        except RPCError as e:
            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                           f"Telegram app(s) to the latest version to proceed ({e}).")
        except Exception as e:
            # ?
            raise QueryParametersException(
                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            # always disconnect, whether authentication succeeded or not
            if client:
                client.disconnect()

        if needs_code:
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }
1122
1123    @staticmethod
1124    def create_session_id(api_phone, api_id, api_hash):
1125        """
1126        Generate a filename for the session file
1127
1128        This is a combination of phone number and API credentials, but hashed
1129        so that one cannot actually derive someone's phone number from it.
1130
1131        :param str api_phone:  Phone number for API ID
1132        :param int api_id:  Telegram API ID
1133        :param str api_hash:  Telegram API Hash
1134        :return str: A hash value derived from the input
1135        """
1136        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1137        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Search Telegram via API

type = 'telegram-search'
category = 'Search'
title = 'Telegram API search'
description = 'Scrapes messages from open Telegram groups via its API.'
extension = 'ndjson'
is_local = False
is_static = False
details_cache = None
failures_cache = None
eventloop = None
import_issues = 0
end_if_rate_limited = 600
max_workers = 1
max_retries = 3
flawless = 0
config = {'telegram-search.can_query_all_messages': {'type': 'toggle', 'help': 'Remove message amount limit', 'default': False, 'tooltip': 'Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!'}, 'telegram-search.max_entities': {'type': 'string', 'help': 'Max entities to query', 'coerce_type': <class 'int'>, 'min': 0, 'default': 25, 'tooltip': 'Amount of entities that can be queried at a time. Entities are groups or channels. 0 to disable limit.'}, 'telegram-search.max_crawl_depth': {'type': 'string', 'help': 'Max crawl depth', 'coerce_type': <class 'int'>, 'min': 0, 'default': 0, 'tooltip': 'If higher than 0, 4CAT can automatically add new entities to the query based on forwarded messages. Recommended to leave at 0 for most users since this can exponentially increase dataset sizes.'}}
@classmethod
def get_options(cls, parent_dataset=None, config=None):
 82    @classmethod
 83    def get_options(cls, parent_dataset=None, config=None):
 84        """
 85        Get processor options
 86
 87        Just updates the description of the entities field based on the
 88        configured max entities.
 89
 90        :param DataSet parent_dataset:  An object representing the dataset that
 91          the processor would be run on
 92        :param ConfigManager|None config:  Configuration reader (context-aware)
 93        """
 94
 95        max_entities = config.get("telegram-search.max_entities", 25)
 96        options = {
 97            "intro": {
 98                "type": UserInput.OPTION_INFO,
 99                "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity "
100                        "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API "
101                        "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor "
102                        "authentication for Telegram."
103            },
104            "api_id": {
105                "type": UserInput.OPTION_TEXT,
106                "help": "API ID",
107                "cache": True,
108            },
109            "api_hash": {
110                "type": UserInput.OPTION_TEXT,
111                "help": "API Hash",
112                "cache": True,
113            },
114            "api_phone": {
115                "type": UserInput.OPTION_TEXT,
116                "help": "Phone number",
117                "cache": True,
118                "default": "+xxxxxxxxxx"
119            },
120            "divider": {
121                "type": UserInput.OPTION_DIVIDER
122            },
123            "query-intro": {
124                "type": UserInput.OPTION_INFO,
125                "help": "Separate with commas or line breaks."
126            },
127            "query": {
128                "type": UserInput.OPTION_TEXT_LARGE,
129                "help": "Entities to scrape",
130                "tooltip": "Separate with commas or line breaks."
131            },
132            "max_posts": {
133                "type": UserInput.OPTION_TEXT,
134                "help": "Messages per group",
135                "min": 1,
136                "max": 50000,
137                "default": 10
138            },
139            "daterange": {
140                "type": UserInput.OPTION_DATERANGE,
141                "help": "Date range"
142            },
143            "divider-2": {
144                "type": UserInput.OPTION_DIVIDER
145            },
146            "info-sensitive": {
147                "type": UserInput.OPTION_INFO,
148                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
149                        "there while data is fetched. After the dataset has been created your credentials will be "
150                        "deleted from the server, unless you enable the option below. If you want to download images "
151                        "attached to the messages in your collected data, you need to enable this option. Your "
152                        "credentials will never be visible to other users and can be erased later via the result page."
153            },
154            "save-session": {
155                "type": UserInput.OPTION_TOGGLE,
156                "help": "Save session:",
157                "default": False
158            },
159            "resolve-entities-intro": {
160                "type": UserInput.OPTION_INFO,
161                "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full "
162                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and "
163                        "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and "
164                        "for large datasets, increases the chance you will be rate-limited and your dataset isn't able "
165                        "to finish capturing. It will also dramatically increase the disk space needed to store the "
166                        "data, so only enable this if you really need it!"
167            },
168            "resolve-entities": {
169                "type": UserInput.OPTION_TOGGLE,
170                "help": "Resolve references",
171                "default": False,
172            }
173        }
174
175        if max_entities:
176            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
177                                              f"(channels or groups) at a time. Separate with line breaks or commas.")
178
179        all_messages = config.get("telegram-search.can_query_all_messages", False)
180        if all_messages:
181            if "max" in options["max_posts"]:
182                del options["max_posts"]["max"]
183        else:
184            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
185                                             f"{options['max_posts']['max']:,} messages per entity.")
186
187        if config.get("telegram-search.max_crawl_depth", 0) > 0:
188            options["crawl_intro"] = {
189                "type": UserInput.OPTION_INFO,
190                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
191                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
192                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can "
193                        "rapidly grow when adding newly discovered entities to the query this way. Note that dataset "
194                        "progress cannot be accurately tracked when you use this feature."
195            }
196            options["crawl-depth"] = {
197                "type": UserInput.OPTION_TEXT,
198                "coerce_type": int,
199                "min": 0,
200                "max": config.get("telegram-search.max_crawl_depth"),
201                "default": 0,
202                "help": "Crawl depth",
203                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
204                           "query, i.e. at most this many hops can be needed to reach the entity from one of the "
205                           "starting entities."
206            }
207            options["crawl-threshold"] = {
208                "type": UserInput.OPTION_TEXT,
209                "coerce_type": int,
210                "min": 0,
211                "default": 5,
212                "help": "Crawl threshold",
213                "tooltip": "Entities need to be references at least this many times to be added to the query. Only "
214                           "references discovered below the max crawl depth are taken into account."
215            }
216            options["crawl-via-links"] = {
217                "type": UserInput.OPTION_TOGGLE,
218                "default": False,
219                "help": "Extract new groups from links",
220                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
221                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
222                           "links that would otherwise remain undetected."
223            }
224
225        return options

Get processor options

Just updates the description of the entities field based on the configured max entities.

Parameters
  • DataSet parent_dataset: An object representing the dataset that the processor would be run on
  • ConfigManager|None config: Configuration reader (context-aware)
def get_items(self, query):
228    def get_items(self, query):
229        """
230        Execute a query; get messages for given parameters
231
232        Basically a wrapper around execute_queries() to call it with asyncio.
233
234        :param dict query:  Query parameters, as part of the DataSet object
235        :return list:  Posts, sorted by thread and post ID, in ascending order
236        """
237        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
238            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
239                                       "creating it again from scratch.", is_final=True)
240            return None
241
242        self.details_cache = {}
243        self.failures_cache = set()
244        #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
245        results = asyncio.run(self.execute_queries())
246
247        if not query.get("save-session"):
248            self.dataset.delete_parameter("api_hash", instant=True)
249            self.dataset.delete_parameter("api_phone", instant=True)
250            self.dataset.delete_parameter("api_id", instant=True)
251
252        if self.flawless:
253            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have "
254                                       "been private). View the log file for details.", is_final=True)
255
256        return results

Execute a query; get messages for given parameters

Basically a wrapper around execute_queries() to call it with asyncio.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Posts, sorted by thread and post ID, in ascending order

async def execute_queries(self):
    async def execute_queries(self):
        """
        Get messages for queries

        This is basically what would be done in get_items(), except due to
        Telethon's architecture this needs to be called in an async method,
        which is this one.

        :return list:  Collected messages
        """
        # session file has been created earlier, and we can re-use it here in
        # order to avoid having to re-enter the security code
        query = self.parameters

        # session ID is built from the supplied phone/ID/hash, so the same
        # credentials map to the same session file across datasets
        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
                                                      query["api_id"].strip(),
                                                      query["api_hash"].strip())
        self.dataset.log(f'Telegram session id: {session_id}')
        session_path = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), session_id + ".session")

        client = None

        try:
            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
                                    loop=self.eventloop)
            # cancel_start() raises RuntimeError instead of prompting on the
            # command line if the session cannot authenticate by itself
            await client.start(phone=SearchTelegram.cancel_start)
        except RuntimeError:
            # session is no longer useable, delete file so user will be asked
            # for security code again. The RuntimeError is raised by
            # `cancel_start()`
            self.dataset.update_status(
                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
                is_final=True)

            if client and hasattr(client, "disconnect"):
                await client.disconnect()

            if session_path.exists():
                session_path.unlink()

            return []
        except Exception as e:
            # not sure what exception specifically is triggered here, but it
            # always means the connection failed
            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
            if client and hasattr(client, "disconnect"):
                await client.disconnect()
            return []

        # ready our parameters
        parameters = self.dataset.get_parameters()
        queries = [query.strip() for query in parameters.get("query", "").split(",")]
        max_items = convert_to_int(parameters.get("items", 10), 10)

        # Telethon requires the offset date to be a datetime date
        max_date = parameters.get("max_date")
        if max_date:
            try:
                max_date = datetime.fromtimestamp(int(max_date))
            except ValueError:
                max_date = None

        # min_date can remain an integer
        min_date = parameters.get("min_date")
        if min_date:
            try:
                min_date = int(min_date)
            except ValueError:
                min_date = None

        posts = []
        try:
            # gather_posts() is an async generator; everything is collected
            # in memory here (see the TODO in get_items())
            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
                posts.append(post)
            return posts
        except ProcessorInterruptedException as e:
            raise e
        except Exception:
            # catch-all so we can disconnect properly
            # ...should we?
            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
            # May as well return what was captured, yes?
            return posts
        finally:
            # always close the connection, whether collection succeeded or not
            await client.disconnect()

Get messages for queries

This is basically what would be done in get_items(), except due to Telethon's architecture this needs to be called in an async method, which is this one.

Returns

Collected messages

async def gather_posts(self, client, queries, max_items, min_date, max_date):
346    async def gather_posts(self, client, queries, max_items, min_date, max_date):
347        """
348        Gather messages for each entity for which messages are requested
349
350        :param TelegramClient client:  Telegram Client
351        :param list queries:  List of entities to query (as string)
352        :param int max_items:  Messages to scrape per entity
353        :param int min_date:  Datetime date to get posts after
354        :param int max_date:  Datetime date to get posts before
355        :return list:  List of messages, each message a dictionary.
356        """
357        resolve_refs = self.parameters.get("resolve-entities")
358
359        # Adding flag to stop; using for rate limits
360        no_additional_queries = False
361
362        # This is used for the 'crawl' feature so we know at which depth a
363        # given entity was discovered
364        depth_map = {
365            entity: 0 for entity in queries
366        }
367
368        crawl_max_depth = self.parameters.get("crawl-depth", 0)
369        crawl_msg_threshold = self.parameters.get("crawl-threshold", 10)
370        crawl_via_links = self.parameters.get("crawl-via-links", False)
371
372        self.dataset.log(f"Max crawl depth: {crawl_max_depth}")
373        self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}")
374
375        # this keeps track of how often an entity not in the original query
376        # has been mentioned. When crawling is enabled and this exceeds the
377        # given threshold, the entity is added to the query
378        crawl_references = {}
379        full_query = set(queries)
380        num_queries = len(queries)
381
382        # we may not always know the 'entity username' for an entity ID, so
383        # keep a reference map as we go
384        entity_id_map = {}
385
386        # Collect queries
387        # Use while instead of for so we can change queries during iteration
388        # this is needed for the 'crawl' feature which can discover new
389        # entities during crawl
390        processed = 0
391        total_messages = 0
392        while queries:
393            query = queries.pop(0)
394
395            delay = 10
396            retries = 0
397            processed += 1
398            self.dataset.update_progress(processed / num_queries)
399
400            if no_additional_queries:
401                # Note that we are not completing this query
402                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
403                continue
404
405            while True:
406                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
407                entity_posts = 0
408                discovered = 0
409                try:
410                    async for message in client.iter_messages(entity=query, offset_date=max_date):
411                        entity_posts += 1
412                        total_messages += 1
413                        if self.interrupted:
414                            raise ProcessorInterruptedException(
415                                "Interrupted while fetching message data from the Telegram API")
416
417                        if entity_posts % 100 == 0:
418                            self.dataset.update_status(
419                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")
420
421                        if message.action is not None:
422                            # e.g. someone joins the channel - not an actual message
423                            continue
424
425                        # todo: possibly enrich object with e.g. the name of
426                        # the channel a message was forwarded from (but that
427                        # needs extra API requests...)
428                        serialized_message = SearchTelegram.serialize_obj(message)
429                        if "_chat" in serialized_message:
430                            # Add query ID to check if queries have been crawled previously
431                            full_query.add(serialized_message["_chat"]["id"])
432                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
433                                # once we know what a channel ID resolves to, use the username instead so it is easier to
434                                # understand for the user
435                                entity_id_map[query] = serialized_message["_chat"]["username"]
436                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")
437
438                        if resolve_refs:
439                            serialized_message = await self.resolve_groups(client, serialized_message)
440
441                        # Stop if we're below the min date
442                        if min_date and serialized_message.get("date") < min_date:
443                            break
444
445                        # if crawling is enabled, see if we found something to add to the query
446                        linked_entities = set()
447                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
448                            message_fwd = serialized_message.get("fwd_from")
449                            fwd_from = None
450                            fwd_source_type = None
451                            if message_fwd and message_fwd.get("from_id"):
452                                if message_fwd["from_id"].get("_type") == "PeerChannel":
453                                    # Legacy(?) data structure (pre 2024/7/22)
454                                    # even if we haven't resolved the ID, we can feed the numeric ID
455                                    # to Telethon! this is nice because it means we don't have to
456                                    # resolve entities to crawl iteratively
457                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
458                                    fwd_source_type = "channel"
459                                elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}):
460                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
461                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
462                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
463                                    fwd_source_type = "channel"
464                                elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
465                                    # forwards can also come from users
466                                    # these can never be followed, so don't add these to the crawl, but do document them
467                                    fwd_source_type = "user"
468                                else:
469                                    print(json.dumps(message_fwd))
470                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
471                                    fwd_source_type = "unknown"
472
473                                if fwd_from:
474                                    linked_entities.add(fwd_from)
475
476
477                            if crawl_via_links:
478                                # t.me links
479                                all_links = ural.urls_from_text(serialized_message["message"])
480                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
481                                for link in all_links:
482                                    if link.startswith("+"):
483                                        # invite links
484                                        continue
485
486                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
487                                    linked_entities.add(entity_name)
488
489                                # @references
490                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
491                                for reference in references:
492                                    if reference.startswith("@"):
493                                        reference = reference[1:]
494
495                                    reference = reference.split("/")[0]
496
497                                    linked_entities.add(reference)
498
499                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
500                            for link in linked_entities:
501                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
502                                    # new entity discovered!
503                                    # might be discovered (before collection) multiple times, so retain lowest depth
504                                    # print(f"Potentially crawling {link}")
505                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
506                                    if link not in crawl_references:
507                                        crawl_references[link] = 0
508                                    crawl_references[link] += 1
509
510                                    # Add to queries if it has been referenced enough times
511                                    if crawl_references[link] >= crawl_msg_threshold:
512                                        queries.append(link)
513                                        full_query.add(link)
514                                        num_queries += 1
515                                        discovered += 1
516                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
517
518
519
520                        serialized_message["4CAT_metadata"] = {
521                            "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls
522                            "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity
523                            "query_depth": depth_map.get(query, 0)
524                        }
525                        yield serialized_message
526
527                        if entity_posts >= max_items:
528                            break
529
530                except ChannelPrivateError:
531                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
532                    self.flawless += 1
533
534                except (UsernameInvalidError,):
535                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
536                    self.flawless += 1
537
538                except FloodWaitError as e:
539                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
540                    if e.seconds < self.end_if_rate_limited:
541                        time.sleep(e.seconds)
542                        continue
543                    else:
544                        self.flawless += 1
545                        no_additional_queries = True
546                        self.dataset.update_status(
547                            f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending")
548                        break
549
550                except BadRequestError as e:
551                    self.dataset.update_status(
552                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
553                    self.flawless += 1
554
555                except ValueError as e:
556                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
557                    self.flawless += 1
558
559                except ChannelPrivateError as e:
560                    self.dataset.update_status(
561                        f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.")
562                    break
563
564                except TimeoutError:
565                    if retries < 3:
566                        self.dataset.update_status(
567                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
568                        self.flawless += 1
569                        break
570
571                    self.dataset.update_status(
572                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
573                    time.sleep(delay)
574                    delay *= 2
575                    continue
576
577                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
578                break

Gather messages for each entity for which messages are requested

Parameters
  • TelegramClient client: Telegram Client
  • list queries: List of entities to query (as string)
  • int max_items: Messages to scrape per entity
  • int min_date: Datetime date to get posts after
  • int max_date: Datetime date to get posts before
Returns

List of messages, each message a dictionary.

async def resolve_groups(self, client, message):
580    async def resolve_groups(self, client, message):
581        """
582        Recursively resolve references to groups and users
583
584        :param client:  Telethon client instance
585        :param dict message:  Message, as already mapped by serialize_obj
586        :return:  Resolved dictionary
587        """
588        resolved_message = message.copy()
589        for key, value in message.items():
590            try:
591                if type(value) is not dict:
592                    # if it's not a dict, we never have to resolve it, as it
593                    # does not represent an entity
594                    continue
595
596                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
597                    # forwarded from a channel!
598                    if value["channel_id"] in self.failures_cache:
599                        continue
600
601                    if value["channel_id"] not in self.details_cache:
602                        channel = await client(GetFullChannelRequest(value["channel_id"]))
603                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
604
605                    resolved_message[key] = self.details_cache[value["channel_id"]]
606                    resolved_message[key]["channel_id"] = value["channel_id"]
607
608                elif "_type" in value and value["_type"] == "PeerUser":
609                    # a user!
610                    if value["user_id"] in self.failures_cache:
611                        continue
612
613                    if value["user_id"] not in self.details_cache:
614                        user = await client(GetFullUserRequest(value["user_id"]))
615                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
616
617                    resolved_message[key] = self.details_cache[value["user_id"]]
618                    resolved_message[key]["user_id"] = value["user_id"]
619                else:
620                    resolved_message[key] = await self.resolve_groups(client, value)
621
622            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
623                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
624                if type(e) in (ChannelPrivateError, UsernameInvalidError):
625                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
626                else:
627                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
628
629        return resolved_message

Recursively resolve references to groups and users

Parameters
  • client: Telethon client instance
  • dict message: Message, as already mapped by serialize_obj
Returns

Resolved dictionary

@staticmethod
def cancel_start():
631    @staticmethod
632    def cancel_start():
633        """
634        Replace interactive phone number input in Telethon
635
636        By default, if Telethon cannot use the given session file to
637        authenticate, it will interactively prompt the user for a phone
638        number on the command line. That is not useful here, so instead
639        raise a RuntimeError. This will be caught below and the user will
640        be told they need to re-authenticate via 4CAT.
641        """
642        raise RuntimeError("Connection cancelled")

Replace interactive phone number input in Telethon

By default, if Telethon cannot use the given session file to authenticate, it will interactively prompt the user for a phone number on the command line. That is not useful here, so instead raise a RuntimeError. This will be caught below and the user will be told they need to re-authenticate via 4CAT.

@staticmethod
def map_item(message):
644    @staticmethod
645    def map_item(message):
646        """
647        Convert Message object to 4CAT-ready data object
648
649        :param Message message:  Message to parse
650        :return dict:  4CAT-compatible item object
651        """
652        if message["_chat"]["username"]:
653            # chats can apparently not have usernames???
654            # truly telegram objects are way too lenient for their own good
655            thread = message["_chat"]["username"]
656        elif message["_chat"]["title"]:
657            thread = re.sub(r"\s", "", message["_chat"]["title"])
658        else:
659            # just give up
660            thread = "unknown"
661
662        # determine username
663        # API responses only include the user *ID*, not the username, and to
664        # complicate things further not everyone is a user and not everyone
665        # has a username. If no username is available, try the first and
666        # last name someone has supplied
667        fullname = ""
668        username = ""
669        user_id = message["_sender"]["id"] if message.get("_sender") else ""
670        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""
671
672        if message.get("_sender") and message["_sender"].get("username"):
673            username = message["_sender"]["username"]
674
675        if message.get("_sender") and message["_sender"].get("first_name"):
676            fullname += message["_sender"]["first_name"]
677
678        if message.get("_sender") and message["_sender"].get("last_name"):
679            fullname += " " + message["_sender"]["last_name"]
680
681        fullname = fullname.strip()
682
683        # determine media type
684        # these store some extra information of the attachment in
685        # attachment_data. Since the final result will be serialised as a csv
686        # file, we can only store text content. As such some media data is
687        # serialised as JSON.
688        attachment_type = SearchTelegram.get_media_type(message["media"])
689        attachment_filename = ""
690
691        if attachment_type == "contact":
692            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
693            if message["media"].get('contact', False):
694                # Old datastructure
695                attachment = message["media"]["contact"]
696            elif all([property in message["media"].keys() for property in contact_data]):
697                # New datastructure 2022/7/25
698                attachment = message["media"]
699            else:
700                raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed')
701            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})
702
703        elif attachment_type == "document":
704            # videos, etc
705            # This could add a separate routine for videos to make them a
706            # separate type, which could then be scraped later, etc
707            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
708            if attachment_type == "video":
709                attachment = message["media"]["document"]
710                attachment_data = json.dumps({
711                    "id": attachment["id"],
712                    "dc_id": attachment["dc_id"],
713                    "file_reference": attachment["file_reference"],
714                })
715            else:
716                attachment_data = ""
717
718        # elif attachment_type in ("geo", "geo_live"):
719        # untested whether geo_live is significantly different from geo
720        #    attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])
721
722        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
723            # we don't actually store any metadata about the photo, since very
724            # little of the metadata attached is of interest. Instead, the
725            # actual photos may be downloaded via a processor that is run on the
726            # search results
727            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
728            attachment_data = json.dumps({
729                "id": attachment["id"],
730                "dc_id": attachment["dc_id"],
731                "file_reference": attachment["file_reference"],
732            })
733            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
734
735        elif attachment_type == "poll":
736            # unfortunately poll results are only available when someone has
737            # actually voted on the poll - that will usually not be the case,
738            # so we store -1 as the vote count
739            attachment = message["media"]
740            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
741            attachment_data = json.dumps({
742                "question": attachment["poll"]["question"],
743                "voters": attachment["results"]["total_voters"],
744                "answers": [{
745                    "answer": options[answer["option"]],
746                    "votes": answer["voters"]
747                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
748                    "answer": options[option],
749                    "votes": -1
750                } for option in options]
751            })
752
753        else:
754            attachment_data = ""
755
756        # was the message forwarded from somewhere and if so when?
757        forwarded_timestamp = ""
758        forwarded_name = ""
759        forwarded_id = ""
760        forwarded_username = ""
761        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
762            # forward information is spread out over a lot of places
763            # we can identify, in order of usefulness: username, full name,
764            # and ID. But not all of these are always available, and not
765            # always in the same place either
766            forwarded_timestamp = int(message["fwd_from"]["date"])
767            from_data = message["fwd_from"]["from_id"]
768
769            if from_data:
770                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
771
772            if message["fwd_from"].get("from_name"):
773                forwarded_name = message["fwd_from"].get("from_name")
774
775            if from_data and from_data.get("from_name"):
776                forwarded_name = message["fwd_from"]["from_name"]
777
778            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
779                from_data["user"] = from_data["users"][0]
780
781            if from_data and ("user" in from_data or "chats" in from_data):
782                # 'resolve entities' was enabled for this dataset
783                if "user" in from_data:
784                    if from_data["user"].get("username"):
785                        forwarded_username = from_data["user"]["username"]
786
787                    if from_data["user"].get("first_name"):
788                        forwarded_name = from_data["user"]["first_name"]
789                    if message["fwd_from"].get("last_name"):
790                        forwarded_name += "  " + from_data["user"]["last_name"]
791
792                    forwarded_name = forwarded_name.strip()
793
794                elif "chats" in from_data:
795                    channel_id = from_data.get("channel_id")
796                    for chat in from_data["chats"]:
797                        if chat["id"] == channel_id or channel_id is None:
798                            forwarded_username = chat["username"]
799
800            elif message.get("_forward") and message["_forward"].get("_chat"):
801                if message["_forward"]["_chat"].get("username"):
802                    forwarded_username = message["_forward"]["_chat"]["username"]
803
804                if message["_forward"]["_chat"].get("title"):
805                    forwarded_name = message["_forward"]["_chat"]["title"]
806
807        link_title = ""
808        link_attached = ""
809        link_description = ""
810        reactions = ""
811
812        if message.get("media") and message["media"].get("webpage"):
813            link_title = message["media"]["webpage"].get("title")
814            link_attached = message["media"]["webpage"].get("url")
815            link_description = message["media"]["webpage"].get("description")
816
817        if message.get("reactions") and message["reactions"].get("results"):
818            for reaction in message["reactions"]["results"]:
819                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
820                    # Updated to support new reaction datastructure
821                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
822                elif type(reaction["reaction"]) is str and "count" in reaction:
823                    reactions += reaction["reaction"] * reaction["count"]
824                else:
825                    # Failsafe; can be updated to support formatting of new datastructures in the future
826                    reactions += f"{reaction}, "
827
828        is_reply = False
829        reply_to = ""
830        if message.get("reply_to"):
831            is_reply = True
832            reply_to = message["reply_to"].get("reply_to_msg_id", "")
833
834        # t.me links
835        linked_entities = set()
836        all_links = ural.urls_from_text(message["message"])
837        all_links = [link.split("t.me/")[1] for link in all_links if
838                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
839
840        for link in all_links:
841            if link.startswith("+"):
842                # invite links
843                continue
844
845            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
846            linked_entities.add(entity_name)
847
848        # @references
849        # in execute_queries we use MessageEntityMention to get these
850        # however, after serializing these objects we only have the offsets of
851        # the mentioned username, and telegram does weird unicode things to its
852        # offsets meaning we can't just substring the message. So use a regex
853        # as a 'good enough' solution
854        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))
855
856        # make this case-insensitive since people may use different casing in
857        # messages than the 'official' username for example
858        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
859        all_ci_connections = set()
860        seen = set()
861        for connection in all_connections:
862            if connection.lower() not in seen:
863                all_ci_connections.add(connection)
864                seen.add(connection.lower())
865
866        return MappedItem({
867            "id": f"{message['_chat']['username']}-{message['id']}",
868            "thread_id": thread,
869            "chat": message["_chat"]["username"],
870            "author": user_id,
871            "author_username": username,
872            "author_name": fullname,
873            "author_is_bot": "yes" if user_is_bot else "no",
874            "body": message["message"],
875            "body_markdown": message.get("message_markdown", MissingMappedField("")),
876            "is_reply": is_reply,
877            "reply_to": reply_to,
878            "views": message["views"] if message["views"] else "",
879            # "forwards": message.get("forwards", MissingMappedField(0)),
880            "reactions": reactions,
881            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
882            "unix_timestamp": int(message["date"]),
883            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[
884                "edit_date"] else "",
885            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
886            "author_forwarded_from_name": forwarded_name,
887            "author_forwarded_from_username": forwarded_username,
888            "author_forwarded_from_id": forwarded_id,
889            "entities_linked": ",".join(linked_entities),
890            "entities_mentioned": ",".join(all_mentions),
891            "all_connections": ",".join(all_ci_connections),
892            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime(
893                "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
894            "unix_timestamp_forwarded_from": forwarded_timestamp,
895            "link_title": link_title,
896            "link_description": link_description,
897            "link_attached": link_attached,
898            "attachment_type": attachment_type,
899            "attachment_data": attachment_data,
900            "attachment_filename": attachment_filename
901        })

Convert Message object to 4CAT-ready data object

Parameters
  • Message message: Message to parse
Returns

4CAT-compatible item object

@staticmethod
def get_media_type(media):
903    @staticmethod
904    def get_media_type(media):
905        """
906        Get media type for a Telegram attachment
907
908        :param media:  Media object
909        :return str:  Textual identifier of the media type
910        """
911        try:
912            return {
913                "NoneType": "",
914                "MessageMediaContact": "contact",
915                "MessageMediaDocument": "document",
916                "MessageMediaEmpty": "",
917                "MessageMediaGame": "game",
918                "MessageMediaGeo": "geo",
919                "MessageMediaGeoLive": "geo_live",
920                "MessageMediaInvoice": "invoice",
921                "MessageMediaPhoto": "photo",
922                "MessageMediaPoll": "poll",
923                "MessageMediaUnsupported": "unsupported",
924                "MessageMediaVenue": "venue",
925                "MessageMediaWebPage": "url"
926            }[media.get("_type", None)]
927        except (AttributeError, KeyError):
928            return ""

Get media type for a Telegram attachment

Parameters
  • media: Media object
Returns

Textual identifier of the media type

@staticmethod
def serialize_obj(input_obj):
930    @staticmethod
931    def serialize_obj(input_obj):
932        """
933        Serialize an object as a dictionary
934
935        Telethon message objects are not serializable by themselves, but most
936        relevant attributes are simply struct classes. This function replaces
937        those that are not with placeholders and then returns a dictionary that
938        can be serialized as JSON.
939
940        :param obj:  Object to serialize
941        :return:  Serialized object
942        """
943        scalars = (int, str, float, list, tuple, set, bool)
944
945        if type(input_obj) in scalars or input_obj is None:
946            return input_obj
947
948        if type(input_obj) is not dict:
949            obj = input_obj.__dict__
950        else:
951            obj = input_obj.copy()
952
953        mapped_obj = {}
954        for item, value in obj.items():
955            if type(value) is datetime:
956                mapped_obj[item] = value.timestamp()
957            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
958                mapped_obj[item] = SearchTelegram.serialize_obj(value)
959            elif type(value) is list:
960                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
961            elif type(value) is bytes:
962                mapped_obj[item] = value.hex()
963            elif type(value) not in scalars and value is not None:
964                # type we can't make sense of here
965                continue
966            else:
967                mapped_obj[item] = value
968
969        # Add the _type if the original object was a telethon type
970        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
971            mapped_obj["_type"] = type(input_obj).__name__
972
973        # Store the markdown-formatted text
974        if type(input_obj).__name__ == "Message":
975            mapped_obj["message_markdown"] = input_obj.text
976
977        return mapped_obj

Serialize an object as a dictionary

Telethon message objects are not serializable by themselves, but most relevant attributes are simply struct classes. This function replaces those that are not with placeholders and then returns a dictionary that can be serialized as JSON.

Parameters
  • input_obj: Object to serialize
Returns

Serialized object

@staticmethod
def validate_query(query, request, config):
    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Telegram query

        Besides checking the query parameters themselves, this runs (part of)
        the Telegram authentication flow: if no valid session exists yet,
        calling `client.start()` makes Telegram send a security code to the
        user's app, and a `QueryNeedsFurtherInputException` is raised to ask
        for that code in a follow-up request.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param ConfigManager config:  Configuration reader (context-aware)
        :return dict:  Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False)
        max_entities = config.get("telegram-search.max_entities", 25)

        # cap the requested amount unless this deployment allows unlimited queries
        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
            config=config)["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            # strip URL scheme/host, '/s/' preview prefix and trailing slashes
            # so only the bare entity name remains
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        # session ID is a hash of the credentials, so no phone number is stored on disk
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        config.user.set_value("telegram.session", session_id)
        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")

        client = None

        # API ID is always a number, if it's not, we can immediately fail
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
            max_attempts = 1
        else:
            code_callback = lambda: -1  # noqa: E731
            # max_attempts = 0 because authing will always fail: we can't wait for
            # the code to be entered interactively, we'll need to do a new request
            # but we can't just immediately return, we still need to call start()
            # to get telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError:
                # A code was sent to the given phone number
                needs_code = True
        # NOTE: specific exceptions must be caught before the generic RPCError,
        # since telethon's specific errors subclass it
        except FloodWaitError as e:
            # uh oh, we got rate-limited
            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                           str(e).split("(")[0] + ".")
        except ApiIdInvalidError:
            # wrong credentials
            raise QueryParametersException("Your API credentials are invalid.")
        except PhoneNumberInvalidError:
            # wrong phone number
            raise QueryParametersException(
                "The phone number provided is not a valid phone number for these credentials.")
        except RPCError as e:
            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                           f"Telegram app(s) to the latest version to proceed ({e}).")
        except Exception as e:
            # ?
            raise QueryParametersException(
                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            if client:
                client.disconnect()

        if needs_code:
            # ask the user for the security code Telegram just sent them
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }

Validate Telegram query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager config: Configuration reader (context-aware)
Returns

Safe query parameters

@staticmethod
def create_session_id(api_phone, api_id, api_hash):
1123    @staticmethod
1124    def create_session_id(api_phone, api_id, api_hash):
1125        """
1126        Generate a filename for the session file
1127
1128        This is a combination of phone number and API credentials, but hashed
1129        so that one cannot actually derive someone's phone number from it.
1130
1131        :param str api_phone:  Phone number for API ID
1132        :param int api_id:  Telegram API ID
1133        :param str api_hash:  Telegram API Hash
1134        :return str: A hash value derived from the input
1135        """
1136        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
1137        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

Generate a filename for the session file

This is a combination of phone number and API credentials, but hashed so that one cannot actually derive someone's phone number from it.

Parameters
  • str api_phone: Phone number for API ID
  • int api_id: Telegram API ID
  • str api_hash: Telegram API Hash
Returns

A hash value derived from the input