datasources.telegram.search_telegram
Search Telegram via API
1""" 2Search Telegram via API 3""" 4import traceback 5import hashlib 6import asyncio 7import json 8import ural 9import time 10import re 11 12from backend.lib.search import Search 13from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException, \ 14 QueryNeedsFurtherInputException 15from common.lib.helpers import convert_to_int, UserInput 16from common.lib.item_mapping import MappedItem, MissingMappedField 17 18from datetime import datetime 19from telethon import TelegramClient 20from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \ 21 FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError 22from telethon.tl.functions.channels import GetFullChannelRequest 23from telethon.tl.functions.users import GetFullUserRequest 24from telethon.tl.types import MessageEntityMention 25 26 27 28class SearchTelegram(Search): 29 """ 30 Search Telegram via API 31 """ 32 type = "telegram-search" # job ID 33 category = "Search" # category 34 title = "Telegram API search" # title displayed in UI 35 description = "Scrapes messages from open Telegram groups via its API." # description displayed in UI 36 extension = "ndjson" # extension of result file, used internally and in UI 37 is_local = False # Whether this datasource is locally scraped 38 is_static = False # Whether this datasource is still updated 39 40 # cache 41 details_cache = None 42 failures_cache = None 43 eventloop = None 44 import_issues = 0 45 end_if_rate_limited = 600 # break if Telegram requires wait time above number of seconds 46 47 max_workers = 1 48 max_retries = 3 49 flawless = 0 50 51 config = { 52 "telegram-search.can_query_all_messages": { 53 "type": UserInput.OPTION_TOGGLE, 54 "help": "Remove message amount limit", 55 "default": False, 56 "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!" 57 }, 58 "telegram-search.max_entities": { 59 "type": UserInput.OPTION_TEXT, 60 "help": "Max entities to query", 61 "coerce_type": int, 62 "min": 0, 63 "default": 25, 64 "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to " 65 "disable limit." 66 }, 67 "telegram-search.max_crawl_depth": { 68 "type": UserInput.OPTION_TEXT, 69 "help": "Max crawl depth", 70 "coerce_type": int, 71 "min": 0, 72 "default": 0, 73 "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded " 74 "messages. Recommended to leave at 0 for most users since this can exponentially increase " 75 "dataset sizes." 76 } 77 } 78 79 @classmethod 80 def get_options(cls, parent_dataset=None, config=None): 81 """ 82 Get processor options 83 84 Just updates the description of the entities field based on the 85 configured max entities. 86 87 :param DataSet parent_dataset: An object representing the dataset that 88 the processor would be run on 89 :param ConfigManager|None config: Configuration reader (context-aware) 90 """ 91 92 max_entities = config.get("telegram-search.max_entities", 25) 93 options = { 94 "intro": { 95 "type": UserInput.OPTION_INFO, 96 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 97 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 98 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 99 "authentication for Telegram." 
100 }, 101 "api_id": { 102 "type": UserInput.OPTION_TEXT, 103 "help": "API ID", 104 "cache": True, 105 }, 106 "api_hash": { 107 "type": UserInput.OPTION_TEXT, 108 "help": "API Hash", 109 "cache": True, 110 }, 111 "api_phone": { 112 "type": UserInput.OPTION_TEXT, 113 "help": "Phone number", 114 "cache": True, 115 "default": "+xxxxxxxxxx" 116 }, 117 "divider": { 118 "type": UserInput.OPTION_DIVIDER 119 }, 120 "query-intro": { 121 "type": UserInput.OPTION_INFO, 122 "help": "Separate with commas or line breaks." 123 }, 124 "query": { 125 "type": UserInput.OPTION_TEXT_LARGE, 126 "help": "Entities to scrape", 127 "tooltip": "Separate with commas or line breaks." 128 }, 129 "max_posts": { 130 "type": UserInput.OPTION_TEXT, 131 "help": "Messages per group", 132 "min": 1, 133 "max": 50000, 134 "default": 10 135 }, 136 "daterange": { 137 "type": UserInput.OPTION_DATERANGE, 138 "help": "Date range" 139 }, 140 "divider-2": { 141 "type": UserInput.OPTION_DIVIDER 142 }, 143 "info-sensitive": { 144 "type": UserInput.OPTION_INFO, 145 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 146 "there while data is fetched. After the dataset has been created your credentials will be " 147 "deleted from the server, unless you enable the option below. If you want to download images " 148 "attached to the messages in your collected data, you need to enable this option. Your " 149 "credentials will never be visible to other users and can be erased later via the result page." 150 }, 151 "save-session": { 152 "type": UserInput.OPTION_TOGGLE, 153 "help": "Save session:", 154 "default": False 155 }, 156 "resolve-entities-intro": { 157 "type": UserInput.OPTION_INFO, 158 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 159 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 160 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 161 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 162 "to finish capturing. It will also dramatically increase the disk space needed to store the " 163 "data, so only enable this if you really need it!" 164 }, 165 "resolve-entities": { 166 "type": UserInput.OPTION_TOGGLE, 167 "help": "Resolve references", 168 "default": False, 169 } 170 } 171 172 if max_entities: 173 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 174 f"(channels or groups) at a time. Separate with line breaks or commas.") 175 176 all_messages = config.get("telegram-search.can_query_all_messages", False) 177 if all_messages: 178 if "max" in options["max_posts"]: 179 del options["max_posts"]["max"] 180 else: 181 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 182 f"{options['max_posts']['max']:,} messages per entity.") 183 184 if config.get("telegram-search.max_crawl_depth", 0) > 0: 185 options["crawl_intro"] = { 186 "type": UserInput.OPTION_INFO, 187 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 188 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 189 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 190 "rapidly grow when adding newly discovered entities to the query this way. 
Note that dataset " 191 "progress cannot be accurately tracked when you use this feature." 192 } 193 options["crawl-depth"] = { 194 "type": UserInput.OPTION_TEXT, 195 "coerce_type": int, 196 "min": 0, 197 "max": config.get("telegram-search.max_crawl_depth"), 198 "default": 0, 199 "help": "Crawl depth", 200 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 201 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 202 "starting entities." 203 } 204 options["crawl-threshold"] = { 205 "type": UserInput.OPTION_TEXT, 206 "coerce_type": int, 207 "min": 0, 208 "default": 5, 209 "help": "Crawl threshold", 210 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 211 "references discovered below the max crawl depth are taken into account." 212 } 213 options["crawl-via-links"] = { 214 "type": UserInput.OPTION_TOGGLE, 215 "default": False, 216 "help": "Extract new groups from links", 217 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 218 "This is more error-prone than crawling only via forwards, but can be a way to discover " 219 "links that would otherwise remain undetected." 220 } 221 222 return options 223 224 225 def get_items(self, query): 226 """ 227 Execute a query; get messages for given parameters 228 229 Basically a wrapper around execute_queries() to call it with asyncio. 230 231 :param dict query: Query parameters, as part of the DataSet object 232 :return list: Posts, sorted by thread and post ID, in ascending order 233 """ 234 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 235 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 236 "creating it again from scratch.", is_final=True) 237 return None 238 239 self.details_cache = {} 240 self.failures_cache = set() 241 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 242 results = asyncio.run(self.execute_queries()) 243 244 if not query.get("save-session"): 245 self.dataset.delete_parameter("api_hash", instant=True) 246 self.dataset.delete_parameter("api_phone", instant=True) 247 self.dataset.delete_parameter("api_id", instant=True) 248 249 if self.flawless: 250 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 251 "been private). View the log file for details.", is_final=True) 252 253 return results 254 255 async def execute_queries(self): 256 """ 257 Get messages for queries 258 259 This is basically what would be done in get_items(), except due to 260 Telethon's architecture this needs to be called in an async method, 261 which is this one. 
    async def execute_queries(self):
        """
        Get messages for queries

        This is basically what would be done in get_items(), except due to
        Telethon's architecture this needs to be called in an async method,
        which is this one.

        :return list: Collected messages
        """
        # session file has been created earlier, and we can re-use it here in
        # order to avoid having to re-enter the security code
        query = self.parameters

        session_id = SearchTelegram.create_session_id(query["api_phone"].strip(),
                                                      query["api_id"].strip(),
                                                      query["api_hash"].strip())
        self.dataset.log(f"Telegram session id: {session_id}")
        session_path = self.config.get("PATH_SESSIONS").joinpath(session_id + ".session")

        client = None

        try:
            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
                                    loop=self.eventloop)
            await client.start(phone=SearchTelegram.cancel_start)
        except RuntimeError:
            # session is no longer usable; delete the file so the user will be
            # asked for a security code again. The RuntimeError is raised by
            # `cancel_start()`
            self.dataset.update_status(
                "Session is not authenticated: login security code may have expired. You need to re-enter the "
                "security code.", is_final=True)

            if client and hasattr(client, "disconnect"):
                await client.disconnect()

            if session_path.exists():
                session_path.unlink()

            return []
        except Exception as e:
            # not sure what exception specifically is triggered here, but it
            # always means the connection failed
            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
            if client and hasattr(client, "disconnect"):
                await client.disconnect()
            return []

        # ready our parameters
        parameters = self.dataset.get_parameters()
        queries = [query.strip() for query in parameters.get("query", "").split(",")]
        max_items = convert_to_int(parameters.get("items", 10), 10)

        # Telethon requires the offset date to be a datetime date
        max_date = parameters.get("max_date")
        if max_date:
            try:
                max_date = datetime.fromtimestamp(int(max_date))
            except ValueError:
                max_date = None

        # min_date can remain an integer
        min_date = parameters.get("min_date")
        if min_date:
            try:
                min_date = int(min_date)
            except ValueError:
                min_date = None

        posts = []
        try:
            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
                posts.append(post)
            return posts
        except ProcessorInterruptedException as e:
            raise e
        except Exception:
            # catch-all so we can disconnect properly
            # ...should we?
            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
            # May as well return what was captured, yes?
            return posts
        finally:
            await client.disconnect()

    async def gather_posts(self, client, queries, max_items, min_date, max_date):
        """
        Gather messages for each entity for which messages are requested

        :param TelegramClient client: Telegram client
        :param list queries: List of entities to query (as string)
        :param int max_items: Messages to scrape per entity
        :param int min_date: Unix timestamp; only messages after this are yielded
        :param datetime max_date: Only messages before this datetime are collected
        :return list: List of messages, each message a dictionary.
        """
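        # A worked example of the crawl feature (hypothetical entities): with
        # crawl-depth 2 and crawl-threshold 5, if the queried channel "alpha"
        # (depth 0) forwards from channel "beta" at least 5 times, "beta" is
        # added to the queue at depth 1; entities "beta" references can in
        # turn be queued at depth 2, but anything discovered at depth 2 is
        # only counted, never queued, as that would exceed the maximum depth.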
353 """ 354 resolve_refs = self.parameters.get("resolve-entities") 355 356 # Adding flag to stop; using for rate limits 357 no_additional_queries = False 358 359 # This is used for the 'crawl' feature so we know at which depth a 360 # given entity was discovered 361 depth_map = { 362 entity: 0 for entity in queries 363 } 364 365 crawl_max_depth = self.parameters.get("crawl-depth", 0) 366 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 367 crawl_via_links = self.parameters.get("crawl-via-links", False) 368 369 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 370 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 371 372 # this keeps track of how often an entity not in the original query 373 # has been mentioned. When crawling is enabled and this exceeds the 374 # given threshold, the entity is added to the query 375 crawl_references = {} 376 full_query = set(queries) 377 num_queries = len(queries) 378 379 # we may not always know the 'entity username' for an entity ID, so 380 # keep a reference map as we go 381 entity_id_map = {} 382 383 # Collect queries 384 # Use while instead of for so we can change queries during iteration 385 # this is needed for the 'crawl' feature which can discover new 386 # entities during crawl 387 processed = 0 388 total_messages = 0 389 while queries: 390 query = queries.pop(0) 391 392 delay = 10 393 retries = 0 394 processed += 1 395 self.dataset.update_progress(processed / num_queries) 396 397 if no_additional_queries: 398 # Note that we are not completing this query 399 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 400 continue 401 402 while True: 403 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 404 entity_posts = 0 405 discovered = 0 406 try: 407 async for message in client.iter_messages(entity=query, offset_date=max_date): 408 entity_posts += 1 409 total_messages += 1 410 if self.interrupted: 411 raise ProcessorInterruptedException( 412 "Interrupted while fetching message data from the Telegram API") 413 414 if entity_posts % 100 == 0: 415 self.dataset.update_status( 416 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 417 418 if message.action is not None: 419 # e.g. someone joins the channel - not an actual message 420 continue 421 422 # todo: possibly enrich object with e.g. the name of 423 # the channel a message was forwarded from (but that 424 # needs extra API requests...) 
                        serialized_message = SearchTelegram.serialize_obj(message)
                        if "_chat" in serialized_message:
                            # Add query ID to check if queries have been crawled previously
                            full_query.add(serialized_message["_chat"]["id"])
                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
                                # once we know what a channel ID resolves to, use the username instead so it is
                                # easier to understand for the user
                                entity_id_map[query] = serialized_message["_chat"]["username"]
                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")

                        if resolve_refs:
                            serialized_message = await self.resolve_groups(client, serialized_message)

                        # Stop if we're below the min date
                        if min_date and serialized_message.get("date") < min_date:
                            break

                        # if crawling is enabled, see if we found something to add to the query
                        linked_entities = set()
                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
                            message_fwd = serialized_message.get("fwd_from")
                            fwd_from = None
                            fwd_source_type = None
                            if message_fwd and message_fwd.get("from_id"):
                                if message_fwd["from_id"].get("_type") == "PeerChannel":
                                    # Legacy(?) data structure (pre 2024/7/22)
                                    # even if we haven't resolved the ID, we can feed the numeric ID
                                    # to Telethon! this is nice because it means we don't have to
                                    # resolve entities to crawl iteratively
                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
                                    fwd_source_type = "channel"
                                elif message_fwd.get("from_id", {}).get("full_chat", {}):
                                    # TODO: do we need a check here to only follow certain types of messages? this is
                                    # similar to resolving, but the types do not appear the same to me
                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"]
                                    # in test cases so far
                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
                                    fwd_source_type = "channel"
                                elif message_fwd.get("from_id", {}).get("full_user", {}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser":
                                    # forwards can also come from users
                                    # these can never be followed, so don't add them to the crawl, but do document them
                                    fwd_source_type = "user"
                                else:
                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl ({json.dumps(message_fwd)})")
                                    fwd_source_type = "unknown"

                            if fwd_from:
                                linked_entities.add(fwd_from)

                            if crawl_via_links:
                                # t.me links
                                all_links = ural.urls_from_text(serialized_message["message"])
                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
                                for link in all_links:
                                    if link.startswith("+"):
                                        # invite links
                                        continue

                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
                                    linked_entities.add(entity_name)

                                # @references
                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
                                for reference in references:
                                    if reference.startswith("@"):
                                        reference = reference[1:]

                                    reference = reference.split("/")[0]

                                    linked_entities.add(reference)
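                                # Example of the extraction above (hypothetical message): the text
                                # "see https://t.me/examplechannel/123 and @someuser" yields
                                # linked_entities == {"examplechannel", "someuser"}; invite links
                                # (t.me/+abc...) are skipped since they cannot be queried by name.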
                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
                            for link in linked_entities:
                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
                                    # new entity discovered!
                                    # might be discovered (before collection) multiple times, so retain lowest depth
                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
                                    if link not in crawl_references:
                                        crawl_references[link] = 0
                                    crawl_references[link] += 1

                                    # Add to queries if it has been referenced enough times
                                    if crawl_references[link] >= crawl_msg_threshold:
                                        queries.append(link)
                                        full_query.add(link)
                                        num_queries += 1
                                        discovered += 1
                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")

                        serialized_message["4CAT_metadata"] = {
                            "collected_at": datetime.now().isoformat(),  # this is relevant for rather long crawls
                            # possibly redundant, but we are adding non-user-defined queries by crawling, so it may
                            # be useful to know exactly what query was used to collect an entity
                            "query": query,
                            "query_depth": depth_map.get(query, 0)
                        }
                        yield serialized_message

                        if entity_posts >= max_items:
                            break

                except ChannelPrivateError:
                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
                    self.flawless += 1

                except (UsernameInvalidError,):
                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping")
                    self.flawless += 1

                except FloodWaitError as e:
                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
                    if e.seconds < self.end_if_rate_limited:
                        time.sleep(e.seconds)
                        continue
                    else:
                        self.flawless += 1
                        no_additional_queries = True
                        self.dataset.update_status(
                            f"Telegram-requested wait of {int(e.seconds / 60)} minutes exceeds the "
                            f"{int(self.end_if_rate_limited / 60)}-minute limit; not executing any further queries")
                        break
                except BadRequestError as e:
                    self.dataset.update_status(
                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except ValueError as e:
                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except TimeoutError:
                    retries += 1
                    if retries >= self.max_retries:
                        self.dataset.update_status(
                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
                        self.flawless += 1
                        break

                    self.dataset.update_status(
                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
                    time.sleep(delay)
                    delay *= 2
                    continue

                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
                break

    async def resolve_groups(self, client, message):
        """
        Recursively resolve references to groups and users

        :param client: Telethon client instance
        :param dict message: Message, as already mapped by serialize_obj
        :return: Resolved dictionary
        """
        resolved_message = message.copy()
        for key, value in message.items():
            try:
                if type(value) is not dict:
                    # if it's not a dict, we never have to resolve it, as it
                    # does not represent an entity
                    continue

                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
                    # forwarded from a channel!
                    if value["channel_id"] in self.failures_cache:
                        continue

                    if value["channel_id"] not in self.details_cache:
                        channel = await client(GetFullChannelRequest(value["channel_id"]))
                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)

                    resolved_message[key] = self.details_cache[value["channel_id"]]
                    resolved_message[key]["channel_id"] = value["channel_id"]

                elif "_type" in value and value["_type"] == "PeerUser":
                    # a user!
                    if value["user_id"] in self.failures_cache:
                        continue

                    if value["user_id"] not in self.details_cache:
                        user = await client(GetFullUserRequest(value["user_id"]))
                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)

                    resolved_message[key] = self.details_cache[value["user_id"]]
                    resolved_message[key]["user_id"] = value["user_id"]
                else:
                    resolved_message[key] = await self.resolve_groups(client, value)

            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
                if type(e) in (ChannelPrivateError, UsernameInvalidError):
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
                else:
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")

        return resolved_message
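    # Illustrative sketch of what resolve_groups() does (hypothetical data): a
    # value like {"_type": "PeerChannel", "channel_id": 1234} inside a message
    # is replaced by the serialized result of GetFullChannelRequest(1234),
    # with "channel_id" re-attached, so the ndjson record carries full channel
    # metadata instead of a bare numeric ID. Failed lookups are cached so each
    # unresolvable entity is only requested once.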
    @staticmethod
    def cancel_start():
        """
        Replace interactive phone number input in Telethon

        By default, if Telethon cannot use the given session file to
        authenticate, it will interactively prompt the user for a phone
        number on the command line. That is not useful here, so instead
        raise a RuntimeError. This will be caught below and the user will
        be told they need to re-authenticate via 4CAT.
        """
        raise RuntimeError("Connection cancelled")

    @staticmethod
    def map_item(message):
        """
        Convert Message object to 4CAT-ready data object

        :param Message message: Message to parse
        :return dict: 4CAT-compatible item object
        """
        if message["_chat"]["username"]:
            # chats can apparently not have usernames???
            # truly telegram objects are way too lenient for their own good
            thread = message["_chat"]["username"]
        elif message["_chat"]["title"]:
            thread = re.sub(r"\s", "", message["_chat"]["title"])
        else:
            # just give up
            thread = "unknown"

        # determine username
        # API responses only include the user *ID*, not the username, and to
        # complicate things further not everyone is a user and not everyone
        # has a username. If no username is available, try the first and
        # last name someone has supplied
        fullname = ""
        username = ""
        user_id = message["_sender"]["id"] if message.get("_sender") else ""
        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""

        if message.get("_sender") and message["_sender"].get("username"):
            username = message["_sender"]["username"]

        if message.get("_sender") and message["_sender"].get("first_name"):
            fullname += message["_sender"]["first_name"]

        if message.get("_sender") and message["_sender"].get("last_name"):
            fullname += " " + message["_sender"]["last_name"]

        fullname = fullname.strip()

        # determine media type
        # these store some extra information of the attachment in
        # attachment_data. Since the final result will be serialised as a csv
        # file, we can only store text content. As such, some media data is
        # serialised as JSON.
        attachment_type = SearchTelegram.get_media_type(message["media"])
        attachment_filename = ""

        if attachment_type == "contact":
            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
            if message["media"].get("contact", False):
                # Old data structure
                attachment = message["media"]["contact"]
            elif all([property in message["media"].keys() for property in contact_data]):
                # New data structure (2022/7/25)
                attachment = message["media"]
            else:
                raise ProcessorException("Cannot find contact data; Telegram data structure may have changed")
            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})

        elif attachment_type == "document":
            # videos, etc.
            # This could add a separate routine for videos to make them a
            # separate type, which could then be scraped later, etc.
            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
            if attachment_type == "video":
                attachment = message["media"]["document"]
                attachment_data = json.dumps({
                    "id": attachment["id"],
                    "dc_id": attachment["dc_id"],
                    "file_reference": attachment["file_reference"],
                })
            else:
                attachment_data = ""

        # elif attachment_type in ("geo", "geo_live"):
        #     untested whether geo_live is significantly different from geo
        #     attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])

        elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"):
            # we don't actually store any metadata about the photo, since very
            # little of the metadata attached is of interest. Instead, the
            # actual photos may be downloaded via a processor that is run on
            # the search results
            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
            attachment_data = json.dumps({
                "id": attachment["id"],
                "dc_id": attachment["dc_id"],
                "file_reference": attachment["file_reference"],
            })
            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"

        elif attachment_type == "poll":
            # unfortunately poll results are only available when someone has
            # actually voted on the poll - that will usually not be the case,
            # so we store -1 as the vote count
            attachment = message["media"]
            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
            attachment_data = json.dumps({
                "question": attachment["poll"]["question"],
                "voters": attachment["results"]["total_voters"],
                "answers": [{
                    "answer": options[answer["option"]],
                    "votes": answer["voters"]
                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
                    "answer": options[option],
                    "votes": -1
                } for option in options]
            })

        else:
            attachment_data = ""
        # was the message forwarded from somewhere and if so when?
        forwarded_timestamp = ""
        forwarded_name = ""
        forwarded_id = ""
        forwarded_username = ""
        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
            # forward information is spread out over a lot of places
            # we can identify, in order of usefulness: username, full name,
            # and ID. But not all of these are always available, and not
            # always in the same place either
            forwarded_timestamp = int(message["fwd_from"]["date"])
            from_data = message["fwd_from"]["from_id"]

            if from_data:
                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))

            if message["fwd_from"].get("from_name"):
                forwarded_name = message["fwd_from"].get("from_name")

            if from_data and from_data.get("from_name"):
                forwarded_name = message["fwd_from"]["from_name"]

            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
                from_data["user"] = from_data["users"][0]

            if from_data and ("user" in from_data or "chats" in from_data):
                # 'resolve entities' was enabled for this dataset
                if "user" in from_data:
                    if from_data["user"].get("username"):
                        forwarded_username = from_data["user"]["username"]

                    if from_data["user"].get("first_name"):
                        forwarded_name = from_data["user"]["first_name"]
                    if from_data["user"].get("last_name"):
                        forwarded_name += " " + from_data["user"]["last_name"]

                    forwarded_name = forwarded_name.strip()

                elif "chats" in from_data:
                    channel_id = from_data.get("channel_id")
                    for chat in from_data["chats"]:
                        if chat["id"] == channel_id or channel_id is None:
                            forwarded_username = chat["username"]

        elif message.get("_forward") and message["_forward"].get("_chat"):
            if message["_forward"]["_chat"].get("username"):
                forwarded_username = message["_forward"]["_chat"]["username"]

            if message["_forward"]["_chat"].get("title"):
                forwarded_name = message["_forward"]["_chat"]["title"]
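        # Example of the fields above (hypothetical message): a message
        # forwarded from @examplechannel yields forwarded_username ==
        # "examplechannel", forwarded_id == its numeric channel ID and
        # forwarded_timestamp == the Unix time of the original post; only the
        # fields Telegram actually supplies end up filled in.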
message["media"]["webpage"].get("title") 811 link_attached = message["media"]["webpage"].get("url") 812 link_description = message["media"]["webpage"].get("description") 813 814 if message.get("reactions") and message["reactions"].get("results"): 815 for reaction in message["reactions"]["results"]: 816 if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]: 817 # Updated to support new reaction datastructure 818 reactions += reaction["reaction"]["emoticon"] * reaction["count"] 819 elif type(reaction["reaction"]) is str and "count" in reaction: 820 reactions += reaction["reaction"] * reaction["count"] 821 else: 822 # Failsafe; can be updated to support formatting of new datastructures in the future 823 reactions += f"{reaction}, " 824 825 is_reply = False 826 reply_to = "" 827 if message.get("reply_to"): 828 is_reply = True 829 reply_to = message["reply_to"].get("reply_to_msg_id", "") 830 831 # t.me links 832 linked_entities = set() 833 all_links = ural.urls_from_text(message["message"]) 834 all_links = [link.split("t.me/")[1] for link in all_links if 835 ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 836 837 for link in all_links: 838 if link.startswith("+"): 839 # invite links 840 continue 841 842 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 843 linked_entities.add(entity_name) 844 845 # @references 846 # in execute_queries we use MessageEntityMention to get these 847 # however, after serializing these objects we only have the offsets of 848 # the mentioned username, and telegram does weird unicode things to its 849 # offsets meaning we can't just substring the message. So use a regex 850 # as a 'good enough' solution 851 all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) 852 853 # make this case-insensitive since people may use different casing in 854 # messages than the 'official' username for example 855 all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) 856 all_ci_connections = set() 857 seen = set() 858 for connection in all_connections: 859 if connection.lower() not in seen: 860 all_ci_connections.add(connection) 861 seen.add(connection.lower()) 862 863 return MappedItem({ 864 "id": f"{message['_chat']['username']}-{message['id']}", 865 "thread_id": thread, 866 "chat": message["_chat"]["username"], 867 "author": user_id, 868 "author_username": username, 869 "author_name": fullname, 870 "author_is_bot": "yes" if user_is_bot else "no", 871 "body": message["message"], 872 "body_markdown": message.get("message_markdown", MissingMappedField("")), 873 "is_reply": is_reply, 874 "reply_to": reply_to, 875 "views": message["views"] if message["views"] else "", 876 # "forwards": message.get("forwards", MissingMappedField(0)), 877 "reactions": reactions, 878 "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), 879 "unix_timestamp": int(message["date"]), 880 "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[ 881 "edit_date"] else "", 882 "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "", 883 "author_forwarded_from_name": forwarded_name, 884 "author_forwarded_from_username": forwarded_username, 885 "author_forwarded_from_id": forwarded_id, 886 "entities_linked": ",".join(linked_entities), 887 "entities_mentioned": ",".join(all_mentions), 888 "all_connections": ",".join(all_ci_connections), 889 "timestamp_forwarded_from": 
        return MappedItem({
            "id": f"{message['_chat']['username']}-{message['id']}",
            "thread_id": thread,
            "chat": message["_chat"]["username"],
            "author": user_id,
            "author_username": username,
            "author_name": fullname,
            "author_is_bot": "yes" if user_is_bot else "no",
            "body": message["message"],
            "body_markdown": message.get("message_markdown", MissingMappedField("")),
            "is_reply": is_reply,
            "reply_to": reply_to,
            "views": message["views"] if message["views"] else "",
            # "forwards": message.get("forwards", MissingMappedField(0)),
            "reactions": reactions,
            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(message["date"]),
            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message["edit_date"] else "",
            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
            "author_forwarded_from_name": forwarded_name,
            "author_forwarded_from_username": forwarded_username,
            "author_forwarded_from_id": forwarded_id,
            "entities_linked": ",".join(linked_entities),
            "entities_mentioned": ",".join(all_mentions),
            "all_connections": ",".join(all_ci_connections),
            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime("%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
            "unix_timestamp_forwarded_from": forwarded_timestamp,
            "link_title": link_title,
            "link_description": link_description,
            "link_attached": link_attached,
            "attachment_type": attachment_type,
            "attachment_data": attachment_data,
            "attachment_filename": attachment_filename
        })

    @staticmethod
    def get_media_type(media):
        """
        Get media type for a Telegram attachment

        :param media: Media object
        :return str: Textual identifier of the media type
        """
        try:
            return {
                "NoneType": "",
                "MessageMediaContact": "contact",
                "MessageMediaDocument": "document",
                "MessageMediaEmpty": "",
                "MessageMediaGame": "game",
                "MessageMediaGeo": "geo",
                "MessageMediaGeoLive": "geo_live",
                "MessageMediaInvoice": "invoice",
                "MessageMediaPhoto": "photo",
                "MessageMediaPoll": "poll",
                "MessageMediaUnsupported": "unsupported",
                "MessageMediaVenue": "venue",
                "MessageMediaWebPage": "url"
            }[media.get("_type", None)]
        except (AttributeError, KeyError):
            return ""

    @staticmethod
    def serialize_obj(input_obj):
        """
        Serialize an object as a dictionary

        Telethon message objects are not serializable by themselves, but most
        relevant attributes are simply struct classes. This function replaces
        those that are not with placeholders and then returns a dictionary that
        can be serialized as JSON.

        :param input_obj: Object to serialize
        :return: Serialized object
        """
        scalars = (int, str, float, list, tuple, set, bool)

        if type(input_obj) in scalars or input_obj is None:
            return input_obj

        if type(input_obj) is not dict:
            obj = input_obj.__dict__
        else:
            obj = input_obj.copy()

        mapped_obj = {}
        for item, value in obj.items():
            if type(value) is datetime:
                mapped_obj[item] = value.timestamp()
            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
                mapped_obj[item] = SearchTelegram.serialize_obj(value)
            elif type(value) is list:
                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
            elif type(value) is bytes:
                mapped_obj[item] = value.hex()
            elif type(value) not in scalars and value is not None:
                # type we can't make sense of here
                continue
            else:
                mapped_obj[item] = value

        # Add the _type if the original object was a telethon type
        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
            mapped_obj["_type"] = type(input_obj).__name__

        # Store the markdown-formatted text
        if type(input_obj).__name__ == "Message":
            mapped_obj["message_markdown"] = input_obj.text

        return mapped_obj
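    # Illustrative sketch of serialize_obj's output shape (field values made
    # up): a telethon Message serializes to something like
    #
    #   {"id": 123, "date": 1656076800.0, "message": "hi",
    #    "_chat": {"id": 1000, "username": "example", "_type": "Channel", ...},
    #    "_type": "Message", "message_markdown": "**hi**", ...}
    #
    # Nested telethon types become dicts tagged with "_type", datetimes become
    # Unix timestamps, and bytes values (e.g. file_reference) become hex strings.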
    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Telegram query

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False)
        max_entities = config.get("telegram-search.max_entities", 25)

        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
            config=config)["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)
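        # Example of the sanitization above (hypothetical input): the query
        # "https://t.me/s/examplechannel/" is reduced to "examplechannel",
        # while a bare "examplechannel" passes through unchanged.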
QueryParametersException("Your account requires two-factor authentication. 4CAT at this time " 1065 f"does not support this authentication mode for Telegram. ({e})") 1066 except RuntimeError: 1067 # A code was sent to the given phone number 1068 needs_code = True 1069 except FloodWaitError as e: 1070 # uh oh, we got rate-limited 1071 raise QueryParametersException("You were rate-limited and should wait a while before trying again. " + 1072 str(e).split("(")[0] + ".") 1073 except ApiIdInvalidError: 1074 # wrong credentials 1075 raise QueryParametersException("Your API credentials are invalid.") 1076 except PhoneNumberInvalidError: 1077 # wrong phone number 1078 raise QueryParametersException( 1079 "The phone number provided is not a valid phone number for these credentials.") 1080 except RPCError as e: 1081 # only seen this with an 'UPDATE_APP_TO_LOGIN' status 1082 raise QueryParametersException(f"Could not verify your authentication. You may need to update your " 1083 f"Telegram app(s) to the latest version to proceed ({e}).") 1084 except Exception as e: 1085 # ? 1086 raise QueryParametersException( 1087 f"An unexpected error ({e}) occurred and your authentication could not be verified.") 1088 finally: 1089 if client: 1090 client.disconnect() 1091 1092 if needs_code: 1093 raise QueryNeedsFurtherInputException(config={ 1094 "code-info": { 1095 "type": UserInput.OPTION_INFO, 1096 "help": "Please enter the security code that was sent to your Telegram app to continue." 1097 }, 1098 "security-code": { 1099 "type": UserInput.OPTION_TEXT, 1100 "help": "Security code", 1101 "sensitive": True 1102 }}) 1103 1104 # simple! 1105 return { 1106 "items": num_items, 1107 "query": ",".join(sanitized_items), 1108 "api_id": query.get("api_id"), 1109 "api_hash": query.get("api_hash"), 1110 "api_phone": query.get("api_phone"), 1111 "save-session": query.get("save-session"), 1112 "resolve-entities": query.get("resolve-entities"), 1113 "min_date": min_date, 1114 "max_date": max_date, 1115 "crawl-depth": query.get("crawl-depth"), 1116 "crawl-threshold": query.get("crawl-threshold"), 1117 "crawl-via-links": query.get("crawl-via-links") 1118 } 1119 1120 @staticmethod 1121 def create_session_id(api_phone, api_id, api_hash): 1122 """ 1123 Generate a filename for the session file 1124 1125 This is a combination of phone number and API credentials, but hashed 1126 so that one cannot actually derive someone's phone number from it. 1127 1128 :param str api_phone: Phone number for API ID 1129 :param int api_id: Telegram API ID 1130 :param str api_hash: Telegram API Hash 1131 :return str: A hash value derived from the input 1132 """ 1133 hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip() 1134 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
29class SearchTelegram(Search): 30 """ 31 Search Telegram via API 32 """ 33 type = "telegram-search" # job ID 34 category = "Search" # category 35 title = "Telegram API search" # title displayed in UI 36 description = "Scrapes messages from open Telegram groups via its API." # description displayed in UI 37 extension = "ndjson" # extension of result file, used internally and in UI 38 is_local = False # Whether this datasource is locally scraped 39 is_static = False # Whether this datasource is still updated 40 41 # cache 42 details_cache = None 43 failures_cache = None 44 eventloop = None 45 import_issues = 0 46 end_if_rate_limited = 600 # break if Telegram requires wait time above number of seconds 47 48 max_workers = 1 49 max_retries = 3 50 flawless = 0 51 52 config = { 53 "telegram-search.can_query_all_messages": { 54 "type": UserInput.OPTION_TOGGLE, 55 "help": "Remove message amount limit", 56 "default": False, 57 "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!" 58 }, 59 "telegram-search.max_entities": { 60 "type": UserInput.OPTION_TEXT, 61 "help": "Max entities to query", 62 "coerce_type": int, 63 "min": 0, 64 "default": 25, 65 "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to " 66 "disable limit." 67 }, 68 "telegram-search.max_crawl_depth": { 69 "type": UserInput.OPTION_TEXT, 70 "help": "Max crawl depth", 71 "coerce_type": int, 72 "min": 0, 73 "default": 0, 74 "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded " 75 "messages. Recommended to leave at 0 for most users since this can exponentially increase " 76 "dataset sizes." 77 } 78 } 79 80 @classmethod 81 def get_options(cls, parent_dataset=None, config=None): 82 """ 83 Get processor options 84 85 Just updates the description of the entities field based on the 86 configured max entities. 87 88 :param DataSet parent_dataset: An object representing the dataset that 89 the processor would be run on 90 :param ConfigManager|None config: Configuration reader (context-aware) 91 """ 92 93 max_entities = config.get("telegram-search.max_entities", 25) 94 options = { 95 "intro": { 96 "type": UserInput.OPTION_INFO, 97 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 98 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 99 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 100 "authentication for Telegram." 101 }, 102 "api_id": { 103 "type": UserInput.OPTION_TEXT, 104 "help": "API ID", 105 "cache": True, 106 }, 107 "api_hash": { 108 "type": UserInput.OPTION_TEXT, 109 "help": "API Hash", 110 "cache": True, 111 }, 112 "api_phone": { 113 "type": UserInput.OPTION_TEXT, 114 "help": "Phone number", 115 "cache": True, 116 "default": "+xxxxxxxxxx" 117 }, 118 "divider": { 119 "type": UserInput.OPTION_DIVIDER 120 }, 121 "query-intro": { 122 "type": UserInput.OPTION_INFO, 123 "help": "Separate with commas or line breaks." 124 }, 125 "query": { 126 "type": UserInput.OPTION_TEXT_LARGE, 127 "help": "Entities to scrape", 128 "tooltip": "Separate with commas or line breaks." 
129 }, 130 "max_posts": { 131 "type": UserInput.OPTION_TEXT, 132 "help": "Messages per group", 133 "min": 1, 134 "max": 50000, 135 "default": 10 136 }, 137 "daterange": { 138 "type": UserInput.OPTION_DATERANGE, 139 "help": "Date range" 140 }, 141 "divider-2": { 142 "type": UserInput.OPTION_DIVIDER 143 }, 144 "info-sensitive": { 145 "type": UserInput.OPTION_INFO, 146 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 147 "there while data is fetched. After the dataset has been created your credentials will be " 148 "deleted from the server, unless you enable the option below. If you want to download images " 149 "attached to the messages in your collected data, you need to enable this option. Your " 150 "credentials will never be visible to other users and can be erased later via the result page." 151 }, 152 "save-session": { 153 "type": UserInput.OPTION_TOGGLE, 154 "help": "Save session:", 155 "default": False 156 }, 157 "resolve-entities-intro": { 158 "type": UserInput.OPTION_INFO, 159 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 160 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 161 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 162 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 163 "to finish capturing. It will also dramatically increase the disk space needed to store the " 164 "data, so only enable this if you really need it!" 165 }, 166 "resolve-entities": { 167 "type": UserInput.OPTION_TOGGLE, 168 "help": "Resolve references", 169 "default": False, 170 } 171 } 172 173 if max_entities: 174 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 175 f"(channels or groups) at a time. Separate with line breaks or commas.") 176 177 all_messages = config.get("telegram-search.can_query_all_messages", False) 178 if all_messages: 179 if "max" in options["max_posts"]: 180 del options["max_posts"]["max"] 181 else: 182 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 183 f"{options['max_posts']['max']:,} messages per entity.") 184 185 if config.get("telegram-search.max_crawl_depth", 0) > 0: 186 options["crawl_intro"] = { 187 "type": UserInput.OPTION_INFO, 188 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 189 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 190 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 191 "rapidly grow when adding newly discovered entities to the query this way. Note that dataset " 192 "progress cannot be accurately tracked when you use this feature." 193 } 194 options["crawl-depth"] = { 195 "type": UserInput.OPTION_TEXT, 196 "coerce_type": int, 197 "min": 0, 198 "max": config.get("telegram-search.max_crawl_depth"), 199 "default": 0, 200 "help": "Crawl depth", 201 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 202 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 203 "starting entities." 
204 } 205 options["crawl-threshold"] = { 206 "type": UserInput.OPTION_TEXT, 207 "coerce_type": int, 208 "min": 0, 209 "default": 5, 210 "help": "Crawl threshold", 211 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 212 "references discovered below the max crawl depth are taken into account." 213 } 214 options["crawl-via-links"] = { 215 "type": UserInput.OPTION_TOGGLE, 216 "default": False, 217 "help": "Extract new groups from links", 218 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 219 "This is more error-prone than crawling only via forwards, but can be a way to discover " 220 "links that would otherwise remain undetected." 221 } 222 223 return options 224 225 226 def get_items(self, query): 227 """ 228 Execute a query; get messages for given parameters 229 230 Basically a wrapper around execute_queries() to call it with asyncio. 231 232 :param dict query: Query parameters, as part of the DataSet object 233 :return list: Posts, sorted by thread and post ID, in ascending order 234 """ 235 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 236 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 237 "creating it again from scratch.", is_final=True) 238 return None 239 240 self.details_cache = {} 241 self.failures_cache = set() 242 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 243 results = asyncio.run(self.execute_queries()) 244 245 if not query.get("save-session"): 246 self.dataset.delete_parameter("api_hash", instant=True) 247 self.dataset.delete_parameter("api_phone", instant=True) 248 self.dataset.delete_parameter("api_id", instant=True) 249 250 if self.flawless: 251 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 252 "been private). View the log file for details.", is_final=True) 253 254 return results 255 256 async def execute_queries(self): 257 """ 258 Get messages for queries 259 260 This is basically what would be done in get_items(), except due to 261 Telethon's architecture this needs to be called in an async method, 262 which is this one. 263 264 :return list: Collected messages 265 """ 266 # session file has been created earlier, and we can re-use it here in 267 # order to avoid having to re-enter the security code 268 query = self.parameters 269 270 session_id = SearchTelegram.create_session_id(query["api_phone"].strip(), 271 query["api_id"].strip(), 272 query["api_hash"].strip()) 273 self.dataset.log(f'Telegram session id: {session_id}') 274 session_path = self.config.get("PATH_SESSIONS").joinpath(session_id + ".session") 275 276 client = None 277 278 try: 279 client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"), 280 loop=self.eventloop) 281 await client.start(phone=SearchTelegram.cancel_start) 282 except RuntimeError: 283 # session is no longer useable, delete file so user will be asked 284 # for security code again. The RuntimeError is raised by 285 # `cancel_start()` 286 self.dataset.update_status( 287 "Session is not authenticated: login security code may have expired. 
You need to re-enter the security code.", 288 is_final=True) 289 290 if client and hasattr(client, "disconnect"): 291 await client.disconnect() 292 293 if session_path.exists(): 294 session_path.unlink() 295 296 return [] 297 except Exception as e: 298 # not sure what exception specifically is triggered here, but it 299 # always means the connection failed 300 self.log.error(f"Telegram: {e}\n{traceback.format_exc()}") 301 self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True) 302 if client and hasattr(client, "disconnect"): 303 await client.disconnect() 304 return [] 305 306 # ready our parameters 307 parameters = self.dataset.get_parameters() 308 queries = [query.strip() for query in parameters.get("query", "").split(",")] 309 max_items = convert_to_int(parameters.get("items", 10), 10) 310 311 # Telethon requires the offset date to be a datetime date 312 max_date = parameters.get("max_date") 313 if max_date: 314 try: 315 max_date = datetime.fromtimestamp(int(max_date)) 316 except ValueError: 317 max_date = None 318 319 # min_date can remain an integer 320 min_date = parameters.get("min_date") 321 if min_date: 322 try: 323 min_date = int(min_date) 324 except ValueError: 325 min_date = None 326 327 posts = [] 328 try: 329 async for post in self.gather_posts(client, queries, max_items, min_date, max_date): 330 posts.append(post) 331 return posts 332 except ProcessorInterruptedException as e: 333 raise e 334 except Exception: 335 # catch-all so we can disconnect properly 336 # ...should we? 337 self.dataset.update_status("Error scraping posts from Telegram; halting collection.") 338 self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") 339 # May as well return what was captured, yes? 340 return posts 341 finally: 342 await client.disconnect() 343 344 async def gather_posts(self, client, queries, max_items, min_date, max_date): 345 """ 346 Gather messages for each entity for which messages are requested 347 348 :param TelegramClient client: Telegram Client 349 :param list queries: List of entities to query (as string) 350 :param int max_items: Messages to scrape per entity 351 :param int min_date: Datetime date to get posts after 352 :param int max_date: Datetime date to get posts before 353 :return list: List of messages, each message a dictionary. 354 """ 355 resolve_refs = self.parameters.get("resolve-entities") 356 357 # Adding flag to stop; using for rate limits 358 no_additional_queries = False 359 360 # This is used for the 'crawl' feature so we know at which depth a 361 # given entity was discovered 362 depth_map = { 363 entity: 0 for entity in queries 364 } 365 366 crawl_max_depth = self.parameters.get("crawl-depth", 0) 367 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 368 crawl_via_links = self.parameters.get("crawl-via-links", False) 369 370 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 371 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 372 373 # this keeps track of how often an entity not in the original query 374 # has been mentioned. 
When crawling is enabled and this exceeds the 375 # given threshold, the entity is added to the query 376 crawl_references = {} 377 full_query = set(queries) 378 num_queries = len(queries) 379 380 # we may not always know the 'entity username' for an entity ID, so 381 # keep a reference map as we go 382 entity_id_map = {} 383 384 # Collect queries 385 # Use while instead of for so we can change queries during iteration 386 # this is needed for the 'crawl' feature which can discover new 387 # entities during crawl 388 processed = 0 389 total_messages = 0 390 while queries: 391 query = queries.pop(0) 392 393 delay = 10 394 retries = 0 395 processed += 1 396 self.dataset.update_progress(processed / num_queries) 397 398 if no_additional_queries: 399 # Note that we are not completing this query 400 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 401 continue 402 403 while True: 404 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 405 entity_posts = 0 406 discovered = 0 407 try: 408 async for message in client.iter_messages(entity=query, offset_date=max_date): 409 entity_posts += 1 410 total_messages += 1 411 if self.interrupted: 412 raise ProcessorInterruptedException( 413 "Interrupted while fetching message data from the Telegram API") 414 415 if entity_posts % 100 == 0: 416 self.dataset.update_status( 417 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 418 419 if message.action is not None: 420 # e.g. someone joins the channel - not an actual message 421 continue 422 423 # todo: possibly enrich object with e.g. the name of 424 # the channel a message was forwarded from (but that 425 # needs extra API requests...) 426 serialized_message = SearchTelegram.serialize_obj(message) 427 if "_chat" in serialized_message: 428 # Add query ID to check if queries have been crawled previously 429 full_query.add(serialized_message["_chat"]["id"]) 430 if query not in entity_id_map and serialized_message["_chat"]["id"] == query: 431 # once we know what a channel ID resolves to, use the username instead so it is easier to 432 # understand for the user 433 entity_id_map[query] = serialized_message["_chat"]["username"] 434 self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") 435 436 if resolve_refs: 437 serialized_message = await self.resolve_groups(client, serialized_message) 438 439 # Stop if we're below the min date 440 if min_date and serialized_message.get("date") < min_date: 441 break 442 443 # if crawling is enabled, see if we found something to add to the query 444 linked_entities = set() 445 if crawl_max_depth and (depth_map.get(query) < crawl_max_depth): 446 message_fwd = serialized_message.get("fwd_from") 447 fwd_from = None 448 fwd_source_type = None 449 if message_fwd and message_fwd.get("from_id"): 450 if message_fwd["from_id"].get("_type") == "PeerChannel": 451 # Legacy(?) data structure (pre 2024/7/22) 452 # even if we haven't resolved the ID, we can feed the numeric ID 453 # to Telethon! this is nice because it means we don't have to 454 # resolve entities to crawl iteratively 455 fwd_from = int(message_fwd["from_id"]["channel_id"]) 456 fwd_source_type = "channel" 457 elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}): 458 # TODO: do we need a check here to only follow certain types of messages? 
this is similar to resolving, but the types do not appear the same to me
459                                     # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
460                                     fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
461                                     fwd_source_type = "channel"
462                                 elif message_fwd and (message_fwd.get("from_id", {}).get("full_user", {}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"):
463                                     # forwards can also come from users
464                                     # these can never be followed, so don't add them to the crawl, but do document them
465                                     fwd_source_type = "user"
466                                 else:
467                                     self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl. "
468                                                      f"Data: {json.dumps(message_fwd)}")
469                                     fwd_source_type = "unknown"
470
471                             if fwd_from:
472                                 linked_entities.add(fwd_from)
473
474
475                             if crawl_via_links:
476                                 # t.me links
477                                 all_links = ural.urls_from_text(serialized_message["message"])
478                                 all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
479                                 for link in all_links:
480                                     if link.startswith("+"):
481                                         # invite links
482                                         continue
483
484                                     entity_name = link.split("/")[0].split("?")[0].split("#")[0]
485                                     linked_entities.add(entity_name)
486
487                                 # @references
488                                 references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
489                                 for reference in references:
490                                     if reference.startswith("@"):
491                                         reference = reference[1:]
492
493                                     reference = reference.split("/")[0]
494
495                                     linked_entities.add(reference)
496
497                             # Check if fwd_from or the resolved entity ID is already queued or has been queried
498                             for link in linked_entities:
499                                 if link not in full_query and link not in queries and fwd_source_type not in ("user",):
500                                     # new entity discovered! 
501                                     # might be discovered (before collection) multiple times, so retain lowest depth
502                                     # print(f"Potentially crawling {link}")
503                                     depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
504                                     if link not in crawl_references:
505                                         crawl_references[link] = 0
506                                     crawl_references[link] += 1
507
508                                     # Add to queries if it has been referenced enough times
509                                     if crawl_references[link] >= crawl_msg_threshold:
510                                         queries.append(link)
511                                         full_query.add(link)
512                                         num_queries += 1
513                                         discovered += 1
514                                         self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")
515
516
517
518                         serialized_message["4CAT_metadata"] = {
519                             "collected_at": datetime.now().isoformat(),  # this is relevant for rather long crawls
520                             "query": query,  # possibly redundant, but since crawling adds queries the user did not define, it is useful to know exactly which query collected an entity
521                             "query_depth": depth_map.get(query, 0)
522                         }
523                         yield serialized_message
524
525                         if entity_posts >= max_items:
526                             break
527
528                 except ChannelPrivateError:
529                     self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
530                     self.flawless += 1
531
532                 except (UsernameInvalidError,):
533                     self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}'; it does not seem to exist. Skipping")
534                     self.flawless += 1
535
536                 except FloodWaitError as e:
537                     self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
538                     if e.seconds < self.end_if_rate_limited:
539                         time.sleep(e.seconds)
540                         continue
541                     else:
542                         self.flawless += 1
543                         no_additional_queries = True
544                         self.dataset.update_status(
545                             f"Telegram-imposed wait grew longer than {int(e.seconds / 60)} minutes; halting further queries")
546                         break
547
548                 except BadRequestError as e:
549                     self.dataset.update_status(
550                         f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
551                     self.flawless += 1
552
553                 except ValueError as e:
554                     self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
555                     self.flawless += 1
556
561
562                 except TimeoutError:
563                     if retries >= self.max_retries:
564                         self.dataset.update_status(
565                             f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
566                         self.flawless += 1
567                         break
568                     retries += 1
569                     self.dataset.update_status(
570                         f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
571                     time.sleep(delay)
572                     delay *= 2
573                     continue
574
575             self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
576             break
577
578     async def resolve_groups(self, client, message):
579         """
580         Recursively resolve references to groups and users
581
582         :param client: Telethon client instance
583         :param dict message: Message, as already mapped by serialize_obj
584         :return: Resolved dictionary
585         """
586         resolved_message = message.copy()
587         for key, value in message.items():
588             try:
589                 if type(value) is not dict:
590                     # if it's not a dict, we never have to resolve it, as it
591                     # does not represent an entity
592                     continue
593
594                 elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
595                     # forwarded from a channel!
596                     if value["channel_id"] in self.failures_cache:
597                         continue
598
599                     if value["channel_id"] not in self.details_cache:
600                         channel = await client(GetFullChannelRequest(value["channel_id"]))
601                         self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)
602
603                     resolved_message[key] = self.details_cache[value["channel_id"]]
604                     resolved_message[key]["channel_id"] = value["channel_id"]
605
606                 elif "_type" in value and value["_type"] == "PeerUser":
607                     # a user!
608                     if value["user_id"] in self.failures_cache:
609                         continue
610
611                     if value["user_id"] not in self.details_cache:
612                         user = await client(GetFullUserRequest(value["user_id"]))
613                         self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)
614
615                     resolved_message[key] = self.details_cache[value["user_id"]]
616                     resolved_message[key]["user_id"] = value["user_id"]
617                 else:
618                     resolved_message[key] = await self.resolve_groups(client, value)
619
620             except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
621                 self.failures_cache.add(value.get("channel_id", value.get("user_id")))
622                 if type(e) in (ChannelPrivateError, UsernameInvalidError):
623                     self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
624                 else:
625                     self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")
626
627         return resolved_message
628
629     @staticmethod
630     def cancel_start():
631         """
632         Replace interactive phone number input in Telethon
633
634         By default, if Telethon cannot use the given session file to
635         authenticate, it will interactively prompt the user for a phone
636         number on the command line. That is not useful here, so instead
637         raise a RuntimeError. This is caught in execute_queries() and the
638         user is told they need to re-authenticate via 4CAT.
639         """
640         raise RuntimeError("Connection cancelled")
641
642     @staticmethod
643     def map_item(message):
644         """
645         Convert Message object to 4CAT-ready data object
646
647         :param Message message: Message to parse
648         :return dict: 4CAT-compatible item object
649         """
650         if message["_chat"]["username"]:
651             # chats can apparently not have usernames??? 
652 # truly telegram objects are way too lenient for their own good 653 thread = message["_chat"]["username"] 654 elif message["_chat"]["title"]: 655 thread = re.sub(r"\s", "", message["_chat"]["title"]) 656 else: 657 # just give up 658 thread = "unknown" 659 660 # determine username 661 # API responses only include the user *ID*, not the username, and to 662 # complicate things further not everyone is a user and not everyone 663 # has a username. If no username is available, try the first and 664 # last name someone has supplied 665 fullname = "" 666 username = "" 667 user_id = message["_sender"]["id"] if message.get("_sender") else "" 668 user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else "" 669 670 if message.get("_sender") and message["_sender"].get("username"): 671 username = message["_sender"]["username"] 672 673 if message.get("_sender") and message["_sender"].get("first_name"): 674 fullname += message["_sender"]["first_name"] 675 676 if message.get("_sender") and message["_sender"].get("last_name"): 677 fullname += " " + message["_sender"]["last_name"] 678 679 fullname = fullname.strip() 680 681 # determine media type 682 # these store some extra information of the attachment in 683 # attachment_data. Since the final result will be serialised as a csv 684 # file, we can only store text content. As such some media data is 685 # serialised as JSON. 686 attachment_type = SearchTelegram.get_media_type(message["media"]) 687 attachment_filename = "" 688 689 if attachment_type == "contact": 690 contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"] 691 if message["media"].get('contact', False): 692 # Old datastructure 693 attachment = message["media"]["contact"] 694 elif all([property in message["media"].keys() for property in contact_data]): 695 # New datastructure 2022/7/25 696 attachment = message["media"] 697 else: 698 raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed') 699 attachment_data = json.dumps({property: attachment.get(property) for property in contact_data}) 700 701 elif attachment_type == "document": 702 # videos, etc 703 # This could add a separate routine for videos to make them a 704 # separate type, which could then be scraped later, etc 705 attachment_type = message["media"]["document"]["mime_type"].split("/")[0] 706 if attachment_type == "video": 707 attachment = message["media"]["document"] 708 attachment_data = json.dumps({ 709 "id": attachment["id"], 710 "dc_id": attachment["dc_id"], 711 "file_reference": attachment["file_reference"], 712 }) 713 else: 714 attachment_data = "" 715 716 # elif attachment_type in ("geo", "geo_live"): 717 # untested whether geo_live is significantly different from geo 718 # attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"]) 719 720 elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"): 721 # we don't actually store any metadata about the photo, since very 722 # little of the metadata attached is of interest. 
Instead, the
723             # actual photos may be downloaded via a processor that is run on the
724             # search results
725             attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
726             attachment_data = json.dumps({
727                 "id": attachment["id"],
728                 "dc_id": attachment["dc_id"],
729                 "file_reference": attachment["file_reference"],
730             })
731             attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"
732
733         elif attachment_type == "poll":
734             # unfortunately poll results are only available when someone has
735             # actually voted on the poll - that will usually not be the case,
736             # so we store -1 as the vote count
737             attachment = message["media"]
738             options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
739             attachment_data = json.dumps({
740                 "question": attachment["poll"]["question"],
741                 "voters": attachment["results"]["total_voters"],
742                 "answers": [{
743                     "answer": options[answer["option"]],
744                     "votes": answer["voters"]
745                 } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
746                     "answer": options[option],
747                     "votes": -1
748                 } for option in options]
749             })
750
751         else:
752             attachment_data = ""
753
754         # was the message forwarded from somewhere and if so, when?
755         forwarded_timestamp = ""
756         forwarded_name = ""
757         forwarded_id = ""
758         forwarded_username = ""
759         if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
760             # forward information is spread out over a lot of places
761             # we can identify, in order of usefulness: username, full name,
762             # and ID. But not all of these are always available, and not
763             # always in the same place either
764             forwarded_timestamp = int(message["fwd_from"]["date"])
765             from_data = message["fwd_from"]["from_id"]
766
767             if from_data:
768                 forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))
769
770             if message["fwd_from"].get("from_name"):
771                 forwarded_name = message["fwd_from"].get("from_name")
772
773             if from_data and from_data.get("from_name"):
774                 forwarded_name = from_data["from_name"]
775
776             if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
777                 from_data["user"] = from_data["users"][0]
778
779             if from_data and ("user" in from_data or "chats" in from_data):
780                 # 'resolve entities' was enabled for this dataset
781                 if "user" in from_data:
782                     if from_data["user"].get("username"):
783                         forwarded_username = from_data["user"]["username"]
784
785                     if from_data["user"].get("first_name"):
786                         forwarded_name = from_data["user"]["first_name"]
787                     if from_data["user"].get("last_name"):
788                         forwarded_name += " " + from_data["user"]["last_name"]
789
790                     forwarded_name = forwarded_name.strip()
791
792                 elif "chats" in from_data:
793                     channel_id = from_data.get("channel_id")
794                     for chat in from_data["chats"]:
795                         if chat["id"] == channel_id or channel_id is None:
796                             forwarded_username = chat["username"]
797
798         elif message.get("_forward") and message["_forward"].get("_chat"):
799             if message["_forward"]["_chat"].get("username"):
800                 forwarded_username = message["_forward"]["_chat"]["username"]
801
802             if message["_forward"]["_chat"].get("title"):
803                 forwarded_name = message["_forward"]["_chat"]["title"]
804
805         link_title = ""
806         link_attached = ""
807         link_description = ""
808         reactions = ""
809
810         if message.get("media") and message["media"].get("webpage"):
811             link_title = 
message["media"]["webpage"].get("title") 812 link_attached = message["media"]["webpage"].get("url") 813 link_description = message["media"]["webpage"].get("description") 814 815 if message.get("reactions") and message["reactions"].get("results"): 816 for reaction in message["reactions"]["results"]: 817 if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]: 818 # Updated to support new reaction datastructure 819 reactions += reaction["reaction"]["emoticon"] * reaction["count"] 820 elif type(reaction["reaction"]) is str and "count" in reaction: 821 reactions += reaction["reaction"] * reaction["count"] 822 else: 823 # Failsafe; can be updated to support formatting of new datastructures in the future 824 reactions += f"{reaction}, " 825 826 is_reply = False 827 reply_to = "" 828 if message.get("reply_to"): 829 is_reply = True 830 reply_to = message["reply_to"].get("reply_to_msg_id", "") 831 832 # t.me links 833 linked_entities = set() 834 all_links = ural.urls_from_text(message["message"]) 835 all_links = [link.split("t.me/")[1] for link in all_links if 836 ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 837 838 for link in all_links: 839 if link.startswith("+"): 840 # invite links 841 continue 842 843 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 844 linked_entities.add(entity_name) 845 846 # @references 847 # in execute_queries we use MessageEntityMention to get these 848 # however, after serializing these objects we only have the offsets of 849 # the mentioned username, and telegram does weird unicode things to its 850 # offsets meaning we can't just substring the message. So use a regex 851 # as a 'good enough' solution 852 all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) 853 854 # make this case-insensitive since people may use different casing in 855 # messages than the 'official' username for example 856 all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) 857 all_ci_connections = set() 858 seen = set() 859 for connection in all_connections: 860 if connection.lower() not in seen: 861 all_ci_connections.add(connection) 862 seen.add(connection.lower()) 863 864 return MappedItem({ 865 "id": f"{message['_chat']['username']}-{message['id']}", 866 "thread_id": thread, 867 "chat": message["_chat"]["username"], 868 "author": user_id, 869 "author_username": username, 870 "author_name": fullname, 871 "author_is_bot": "yes" if user_is_bot else "no", 872 "body": message["message"], 873 "body_markdown": message.get("message_markdown", MissingMappedField("")), 874 "is_reply": is_reply, 875 "reply_to": reply_to, 876 "views": message["views"] if message["views"] else "", 877 # "forwards": message.get("forwards", MissingMappedField(0)), 878 "reactions": reactions, 879 "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), 880 "unix_timestamp": int(message["date"]), 881 "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[ 882 "edit_date"] else "", 883 "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "", 884 "author_forwarded_from_name": forwarded_name, 885 "author_forwarded_from_username": forwarded_username, 886 "author_forwarded_from_id": forwarded_id, 887 "entities_linked": ",".join(linked_entities), 888 "entities_mentioned": ",".join(all_mentions), 889 "all_connections": ",".join(all_ci_connections), 890 "timestamp_forwarded_from": 
datetime.fromtimestamp(forwarded_timestamp).strftime(
891                 "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
892             "unix_timestamp_forwarded_from": forwarded_timestamp,
893             "link_title": link_title,
894             "link_description": link_description,
895             "link_attached": link_attached,
896             "attachment_type": attachment_type,
897             "attachment_data": attachment_data,
898             "attachment_filename": attachment_filename
899         })
900
901     @staticmethod
902     def get_media_type(media):
903         """
904         Get media type for a Telegram attachment
905
906         :param media: Media object
907         :return str: Textual identifier of the media type
908         """
909         try:
910             return {
911                 "NoneType": "",
912                 "MessageMediaContact": "contact",
913                 "MessageMediaDocument": "document",
914                 "MessageMediaEmpty": "",
915                 "MessageMediaGame": "game",
916                 "MessageMediaGeo": "geo",
917                 "MessageMediaGeoLive": "geo_live",
918                 "MessageMediaInvoice": "invoice",
919                 "MessageMediaPhoto": "photo",
920                 "MessageMediaPoll": "poll",
921                 "MessageMediaUnsupported": "unsupported",
922                 "MessageMediaVenue": "venue",
923                 "MessageMediaWebPage": "url"
924             }[media.get("_type", None)]
925         except (AttributeError, KeyError):
926             return ""
927
928     @staticmethod
929     def serialize_obj(input_obj):
930         """
931         Serialize an object as a dictionary
932
933         Telethon message objects are not serializable by themselves, but most
934         relevant attributes are simply struct classes. This function replaces
935         those that are not with placeholders and then returns a dictionary that
936         can be serialized as JSON.
937
938         :param input_obj: Object to serialize
939         :return: Serialized object
940         """
941         scalars = (int, str, float, list, tuple, set, bool)
942
943         if type(input_obj) in scalars or input_obj is None:
944             return input_obj
945
946         if type(input_obj) is not dict:
947             obj = input_obj.__dict__
948         else:
949             obj = input_obj.copy()
950
951         mapped_obj = {}
952         for item, value in obj.items():
953             if type(value) is datetime:
954                 mapped_obj[item] = value.timestamp()
955             elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
956                 mapped_obj[item] = SearchTelegram.serialize_obj(value)
957             elif type(value) is list:
958                 mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
959             elif type(value) is bytes:
960                 mapped_obj[item] = value.hex()
961             elif type(value) not in scalars and value is not None:
962                 # type we can't make sense of here
963                 continue
964             else:
965                 mapped_obj[item] = value
966
967         # Add the _type if the original object was a telethon type
968         if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
969             mapped_obj["_type"] = type(input_obj).__name__
970
971         # Store the markdown-formatted text
972         if type(input_obj).__name__ == "Message":
973             mapped_obj["message_markdown"] = input_obj.text
974
975         return mapped_obj
976
977     @staticmethod
978     def validate_query(query, request, config):
979         """
980         Validate Telegram query
981
982         :param dict query: Query parameters, from client-side.
983         :param request: Flask request
984         :param ConfigManager config: Configuration reader (context-aware)
985         :return dict: Safe query parameters
986         """
987
988
989         # no query 4 u
990         if not query.get("query", "").strip():
991             raise QueryParametersException("You must provide a search query.")
992
993         if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
994             raise QueryParametersException("You need to provide valid Telegram API credentials first.")
995
996         all_posts = config.get("telegram-search.can_query_all_messages", False)
997         max_entities = config.get("telegram-search.max_entities", 25)
998
999         num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
1000             config=config)["max_posts"]["max"])
1001
1002         # reformat queries to be a comma-separated list with no wrapping
1003         # whitespace
1004         whitespace = re.compile(r"\s+")
1005         items = whitespace.sub("", query.get("query").replace("\n", ","))
1006         if max_entities > 0 and len(items.split(",")) > max_entities:
1007             raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")
1008
1009         sanitized_items = []
1010         # handle telegram URLs
1011         for item in items.split(","):
1012             if not item.strip():
1013                 continue
1014             item = re.sub(r"^https?://t\.me/", "", item)
1015             item = re.sub(r"^/?s/", "", item)
1016             item = re.sub(r"[/]*$", "", item)
1017             sanitized_items.append(item)
1018
1019         # the dates need to make sense as a range to search within
1020         min_date, max_date = query.get("daterange")
1021
1022         # now check if there is an active API session
1023         if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
1024             raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
1025                                            "accounts.")
1026
1027         # check for the information we need
1028         session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
1029                                                       query.get("api_hash"))
1030         config.user.set_value("telegram.session", session_id)
1031         session_path = config.get('PATH_SESSIONS').joinpath(session_id + ".session")
1032
1033         client = None
1034
1035         # API ID is always a number; if it's not, we can immediately fail
1036         try:
1037             api_id = int(query.get("api_id"))
1038         except ValueError:
1039             raise QueryParametersException("Invalid API ID.")
1040
1041         # maybe we've entered a code already and submitted it with the request
1042         if "option-security-code" in request.form and request.form.get("option-security-code").strip():
1043             code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
1044             max_attempts = 1
1045         else:
1046             code_callback = lambda: -1  # noqa: E731
1047             # max_attempts = 0 because authing will always fail: we can't wait for
1048             # the code to be entered interactively, we'll need to do a new request
1049             # but we can't just immediately return, we still need to call start()
1050             # to get telegram to send us a code
1051             max_attempts = 0
1052
1053         # now try authenticating
1054         needs_code = False
1055         try:
1056             loop = asyncio.new_event_loop()
1057             asyncio.set_event_loop(loop)
1058             client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)
1059
1060             try:
1061                 client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)
1062
1063             except ValueError as e:
1064                 # this happens if 2FA is required
1065                 raise 
QueryParametersException("Your account requires two-factor authentication. 4CAT at this time " 1066 f"does not support this authentication mode for Telegram. ({e})") 1067 except RuntimeError: 1068 # A code was sent to the given phone number 1069 needs_code = True 1070 except FloodWaitError as e: 1071 # uh oh, we got rate-limited 1072 raise QueryParametersException("You were rate-limited and should wait a while before trying again. " + 1073 str(e).split("(")[0] + ".") 1074 except ApiIdInvalidError: 1075 # wrong credentials 1076 raise QueryParametersException("Your API credentials are invalid.") 1077 except PhoneNumberInvalidError: 1078 # wrong phone number 1079 raise QueryParametersException( 1080 "The phone number provided is not a valid phone number for these credentials.") 1081 except RPCError as e: 1082 # only seen this with an 'UPDATE_APP_TO_LOGIN' status 1083 raise QueryParametersException(f"Could not verify your authentication. You may need to update your " 1084 f"Telegram app(s) to the latest version to proceed ({e}).") 1085 except Exception as e: 1086 # ? 1087 raise QueryParametersException( 1088 f"An unexpected error ({e}) occurred and your authentication could not be verified.") 1089 finally: 1090 if client: 1091 client.disconnect() 1092 1093 if needs_code: 1094 raise QueryNeedsFurtherInputException(config={ 1095 "code-info": { 1096 "type": UserInput.OPTION_INFO, 1097 "help": "Please enter the security code that was sent to your Telegram app to continue." 1098 }, 1099 "security-code": { 1100 "type": UserInput.OPTION_TEXT, 1101 "help": "Security code", 1102 "sensitive": True 1103 }}) 1104 1105 # simple! 1106 return { 1107 "items": num_items, 1108 "query": ",".join(sanitized_items), 1109 "api_id": query.get("api_id"), 1110 "api_hash": query.get("api_hash"), 1111 "api_phone": query.get("api_phone"), 1112 "save-session": query.get("save-session"), 1113 "resolve-entities": query.get("resolve-entities"), 1114 "min_date": min_date, 1115 "max_date": max_date, 1116 "crawl-depth": query.get("crawl-depth"), 1117 "crawl-threshold": query.get("crawl-threshold"), 1118 "crawl-via-links": query.get("crawl-via-links") 1119 } 1120 1121 @staticmethod 1122 def create_session_id(api_phone, api_id, api_hash): 1123 """ 1124 Generate a filename for the session file 1125 1126 This is a combination of phone number and API credentials, but hashed 1127 so that one cannot actually derive someone's phone number from it. 1128 1129 :param str api_phone: Phone number for API ID 1130 :param int api_id: Telegram API ID 1131 :param str api_hash: Telegram API Hash 1132 :return str: A hash value derived from the input 1133 """ 1134 hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip() 1135 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Search Telegram via API
get_options(cls, parent_dataset=None, config=None)
Get processor options
Just updates the description of the entities field based on the configured max entities.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
- ConfigManager|None config: Configuration reader (context-aware)
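To illustrate how get_options() adapts to instance configuration, here is a minimal sketch of the gating logic shown in the source above; FakeConfig is a hypothetical stand-in exposing only the get() interface the method actually uses:

class FakeConfig:
    # hypothetical stand-in for 4CAT's ConfigManager
    def __init__(self, values):
        self.values = values

    def get(self, key, default=None):
        return self.values.get(key, default)

config = FakeConfig({"telegram-search.max_crawl_depth": 2})

options = {}
if config.get("telegram-search.max_crawl_depth", 0) > 0:
    # crawl options are only offered when the instance allows crawling at all
    options["crawl-depth"] = {"min": 0, "max": config.get("telegram-search.max_crawl_depth"), "default": 0}

print("crawl-depth" in options)  # True here; False when max_crawl_depth is 0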
get_items(self, query)
Execute a query; get messages for given parameters
Basically a wrapper around execute_queries() to call it with asyncio.
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Posts, sorted by thread and post ID, in ascending order
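The wrapper pattern in miniature: a synchronous entry point driving an async collector via asyncio.run(). collect_messages() is a hypothetical stand-in for execute_queries():

import asyncio

async def collect_messages():
    # in the real processor this is where Telethon calls are awaited
    return ["message 1", "message 2"]

def get_items():
    # asyncio.run() creates an event loop, runs the coroutine to
    # completion and hands the result back to synchronous code
    return asyncio.run(collect_messages())

print(get_items())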
async execute_queries(self)
Get messages for queries
This is basically what would be done in get_items(), except that due to Telethon's architecture it needs to be called from an async method, which is this one.
Returns
Collected messages
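The date normalisation done by execute_queries() is easy to verify in isolation; the helper name below is ours, but the two conversions are the same as in the source:

from datetime import datetime

def normalise_dates(parameters):
    # Telethon's offset_date must be a datetime, so the upper bound is
    # converted; the lower bound stays a unix timestamp because it is
    # only ever compared against serialized message dates
    max_date = parameters.get("max_date")
    if max_date:
        try:
            max_date = datetime.fromtimestamp(int(max_date))
        except ValueError:
            max_date = None

    min_date = parameters.get("min_date")
    if min_date:
        try:
            min_date = int(min_date)
        except ValueError:
            min_date = None

    return min_date, max_date

print(normalise_dates({"min_date": "1577836800", "max_date": "1609459200"}))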
async gather_posts(self, client, queries, max_items, min_date, max_date)
Gather messages for each entity for which messages are requested
Parameters
- TelegramClient client: Telegram client
- list queries: List of entities to query (as strings)
- int max_items: Messages to scrape per entity
- int min_date: Unix timestamp; only collect messages posted after this time
- datetime max_date: Only collect messages posted before this date
Returns
Yields messages, each message a dictionary.
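The crawl bookkeeping inside gather_posts() reduces to a small amount of state. A sketch of just that logic, with register_reference() as our own framing of the inline code:

def register_reference(link, via_query, queries, full_query, depth_map,
                       crawl_references, threshold, max_depth):
    # mirrors the discovery logic above: keep the lowest depth at which an
    # entity was seen, count references, queue it once the threshold is met
    if link in full_query or link in queries:
        return False  # already queued or already collected
    depth_map[link] = min(depth_map.get(link, max_depth), depth_map[via_query] + 1)
    crawl_references[link] = crawl_references.get(link, 0) + 1
    if crawl_references[link] >= threshold:
        queries.append(link)
        full_query.add(link)
        return True
    return False

queries, full_query = [], {"channel_a"}
depth_map, refs = {"channel_a": 0}, {}
for _ in range(2):
    added = register_reference("channel_b", "channel_a", queries, full_query,
                               depth_map, refs, threshold=2, max_depth=1)
print(added, queries)  # True ['channel_b'] -- queued once referenced twice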
async resolve_groups(self, client, message)
Recursively resolve references to groups and users
Parameters
- client: Telethon client instance
- dict message: Message, as already mapped by serialize_obj
Returns
Resolved dictionary
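The two caches shared by resolve_groups() implement simple memoisation: successful lookups are stored so each entity costs at most one API request, and failed IDs are remembered so they are never retried within the same run. A sketch with fetch_entity() as a hypothetical stand-in for the Telethon requests:

details_cache, failures_cache = {}, set()

def fetch_entity(entity_id):
    # stand-in for GetFullChannelRequest/GetFullUserRequest; negative IDs
    # simulate entities that raise e.g. ChannelPrivateError
    if entity_id < 0:
        raise ValueError("entity is private")
    return {"id": entity_id, "title": f"channel {entity_id}"}

def resolve(entity_id):
    if entity_id in failures_cache:
        return None  # known failure, don't ask the API again
    if entity_id not in details_cache:
        try:
            details_cache[entity_id] = fetch_entity(entity_id)
        except ValueError:
            failures_cache.add(entity_id)
            return None
    return details_cache[entity_id]

print(resolve(42), resolve(-1), resolve(-1))  # second lookup of -1 skips the request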
cancel_start()
Replace interactive phone number input in Telethon
By default, if Telethon cannot use the given session file to authenticate, it will interactively prompt the user for a phone number on the command line. That is not useful here, so a RuntimeError is raised instead; execute_queries() catches it and tells the user to re-authenticate via 4CAT.
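The general trick, independent of Telethon: where a library accepts a callable for an interactive prompt, passing one that raises turns "prompt the user" into a catchable control-flow signal. A minimal sketch:

def cancel_start():
    # instead of returning a phone number, abort the interactive login
    raise RuntimeError("Connection cancelled")

try:
    # Telethon would invoke the callable when the session is not authenticated
    phone = cancel_start()
except RuntimeError:
    print("Session not usable; ask the user to re-authenticate via 4CAT")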
    @staticmethod
    def map_item(message):
        """
        Convert Message object to 4CAT-ready data object

        :param Message message: Message to parse
        :return dict: 4CAT-compatible item object
        """
        if message["_chat"]["username"]:
            # chats can apparently not have usernames???
            # truly telegram objects are way too lenient for their own good
            thread = message["_chat"]["username"]
        elif message["_chat"]["title"]:
            thread = re.sub(r"\s", "", message["_chat"]["title"])
        else:
            # just give up
            thread = "unknown"

        # determine username
        # API responses only include the user *ID*, not the username, and to
        # complicate things further not everyone is a user and not everyone
        # has a username. If no username is available, try the first and
        # last name someone has supplied
        fullname = ""
        username = ""
        user_id = message["_sender"]["id"] if message.get("_sender") else ""
        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""

        if message.get("_sender") and message["_sender"].get("username"):
            username = message["_sender"]["username"]

        if message.get("_sender") and message["_sender"].get("first_name"):
            fullname += message["_sender"]["first_name"]

        if message.get("_sender") and message["_sender"].get("last_name"):
            fullname += " " + message["_sender"]["last_name"]

        fullname = fullname.strip()

        # determine media type
        # these store some extra information of the attachment in
        # attachment_data. Since the final result will be serialised as a csv
        # file, we can only store text content. As such some media data is
        # serialised as JSON.
        attachment_type = SearchTelegram.get_media_type(message["media"])
        attachment_filename = ""

        if attachment_type == "contact":
            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
            if message["media"].get("contact", False):
                # Old datastructure
                attachment = message["media"]["contact"]
            elif all([property in message["media"].keys() for property in contact_data]):
                # New datastructure 2022/7/25
                attachment = message["media"]
            else:
                raise ProcessorException("Cannot find contact data; Telegram datastructure may have changed")
            attachment_data = json.dumps({property: attachment.get(property) for property in contact_data})

        elif attachment_type == "document":
            # videos, etc
            # This could add a separate routine for videos to make them a
            # separate type, which could then be scraped later, etc
            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
            if attachment_type == "video":
                attachment = message["media"]["document"]
                attachment_data = json.dumps({
                    "id": attachment["id"],
                    "dc_id": attachment["dc_id"],
                    "file_reference": attachment["file_reference"],
                })
            else:
                attachment_data = ""

        # elif attachment_type in ("geo", "geo_live"):
        #     # untested whether geo_live is significantly different from geo
        #     attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])

        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
            # we don't actually store any metadata about the photo, since very
            # little of the metadata attached is of interest. Instead, the
            # actual photos may be downloaded via a processor that is run on the
            # search results
            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
            attachment_data = json.dumps({
                "id": attachment["id"],
                "dc_id": attachment["dc_id"],
                "file_reference": attachment["file_reference"],
            })
            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"

        elif attachment_type == "poll":
            # unfortunately poll results are only available when someone has
            # actually voted on the poll - that will usually not be the case,
            # so we store -1 as the vote count
            attachment = message["media"]
            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
            attachment_data = json.dumps({
                "question": attachment["poll"]["question"],
                "voters": attachment["results"]["total_voters"],
                "answers": [{
                    "answer": options[answer["option"]],
                    "votes": answer["voters"]
                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
                    "answer": options[option],
                    "votes": -1
                } for option in options]
            })

        else:
            attachment_data = ""

        # was the message forwarded from somewhere and if so when?
        forwarded_timestamp = ""
        forwarded_name = ""
        forwarded_id = ""
        forwarded_username = ""
        if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int:
            # forward information is spread out over a lot of places
            # we can identify, in order of usefulness: username, full name,
            # and ID. But not all of these are always available, and not
            # always in the same place either
            forwarded_timestamp = int(message["fwd_from"]["date"])
            from_data = message["fwd_from"]["from_id"]

            if from_data:
                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))

            if message["fwd_from"].get("from_name"):
                forwarded_name = message["fwd_from"].get("from_name")

            if from_data and from_data.get("from_name"):
                forwarded_name = message["fwd_from"]["from_name"]

            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
                from_data["user"] = from_data["users"][0]

            if from_data and ("user" in from_data or "chats" in from_data):
                # 'resolve entities' was enabled for this dataset
                if "user" in from_data:
                    if from_data["user"].get("username"):
                        forwarded_username = from_data["user"]["username"]

                    if from_data["user"].get("first_name"):
                        forwarded_name = from_data["user"]["first_name"]
                    if from_data["user"].get("last_name"):
                        forwarded_name += " " + from_data["user"]["last_name"]

                    forwarded_name = forwarded_name.strip()

                elif "chats" in from_data:
                    channel_id = from_data.get("channel_id")
                    for chat in from_data["chats"]:
                        if chat["id"] == channel_id or channel_id is None:
                            forwarded_username = chat["username"]

        elif message.get("_forward") and message["_forward"].get("_chat"):
            if message["_forward"]["_chat"].get("username"):
                forwarded_username = message["_forward"]["_chat"]["username"]

            if message["_forward"]["_chat"].get("title"):
                forwarded_name = message["_forward"]["_chat"]["title"]

        link_title = ""
        link_attached = ""
        link_description = ""
        reactions = ""

        if message.get("media") and message["media"].get("webpage"):
            link_title = message["media"]["webpage"].get("title")
            link_attached = message["media"]["webpage"].get("url")
            link_description = message["media"]["webpage"].get("description")

        if message.get("reactions") and message["reactions"].get("results"):
            for reaction in message["reactions"]["results"]:
                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
                    # Updated to support new reaction datastructure
                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
                elif type(reaction["reaction"]) is str and "count" in reaction:
                    reactions += reaction["reaction"] * reaction["count"]
                else:
                    # Failsafe; can be updated to support formatting of new datastructures in the future
                    reactions += f"{reaction}, "

        is_reply = False
        reply_to = ""
        if message.get("reply_to"):
            is_reply = True
            reply_to = message["reply_to"].get("reply_to_msg_id", "")

        # t.me links
        linked_entities = set()
        all_links = ural.urls_from_text(message["message"])
        all_links = [link.split("t.me/")[1] for link in all_links if
                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]

        for link in all_links:
            if link.startswith("+"):
                # invite links
                continue

            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
            linked_entities.add(entity_name)

        # @references
        # in execute_queries we use MessageEntityMention to get these
        # however, after serializing these objects we only have the offsets of
        # the mentioned username, and telegram does weird unicode things to its
        # offsets meaning we can't just substring the message. So use a regex
        # as a 'good enough' solution
        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))

        # make this case-insensitive since people may use different casing in
        # messages than the 'official' username for example
        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
        all_ci_connections = set()
        seen = set()
        for connection in all_connections:
            if connection.lower() not in seen:
                all_ci_connections.add(connection)
                seen.add(connection.lower())

        return MappedItem({
            "id": f"{message['_chat']['username']}-{message['id']}",
            "thread_id": thread,
            "chat": message["_chat"]["username"],
            "author": user_id,
            "author_username": username,
            "author_name": fullname,
            "author_is_bot": "yes" if user_is_bot else "no",
            "body": message["message"],
            "body_markdown": message.get("message_markdown", MissingMappedField("")),
            "is_reply": is_reply,
            "reply_to": reply_to,
            "views": message["views"] if message["views"] else "",
            # "forwards": message.get("forwards", MissingMappedField(0)),
            "reactions": reactions,
            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(message["date"]),
            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message["edit_date"] else "",
            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
            "author_forwarded_from_name": forwarded_name,
            "author_forwarded_from_username": forwarded_username,
            "author_forwarded_from_id": forwarded_id,
            "entities_linked": ",".join(linked_entities),
            "entities_mentioned": ",".join(all_mentions),
            "all_connections": ",".join(all_ci_connections),
            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime("%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
            "unix_timestamp_forwarded_from": forwarded_timestamp,
            "link_title": link_title,
            "link_description": link_description,
            "link_attached": link_attached,
            "attachment_type": attachment_type,
            "attachment_data": attachment_data,
            "attachment_filename": attachment_filename
        })
Convert Message object to 4CAT-ready data object
Parameters
- Message message: Message to parse
Returns
4CAT-compatible item object
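To make the mapping concrete, here is a sketch of feeding map_item one minimal deserialized message. All values below are made up, real items in the ndjson result file carry many more fields (media, forwards, reactions, ...), and the sketch assumes MappedItem exposes its dictionary via get_item_data():

from datasources.telegram.search_telegram import SearchTelegram

# hypothetical message, as it would appear after json.loads() on one ndjson line
message = {
    "_chat": {"username": "examplechannel", "title": "Example Channel"},
    "_sender": {"id": 12345, "username": "alice", "first_name": "Alice", "bot": False},
    "id": 42,
    "message": "Hello @bob, see https://t.me/examplechannel/41",
    "media": None, "fwd_from": None, "reply_to": None, "reactions": None,
    "views": 100, "date": 1609459200, "edit_date": None,
}

item = SearchTelegram.map_item(message).get_item_data()
print(item["id"])                  # examplechannel-42
print(item["entities_mentioned"])  # bob
print(item["entities_linked"])     # examplechannel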
    @staticmethod
    def get_media_type(media):
        """
        Get media type for a Telegram attachment

        :param media: Media object
        :return str: Textual identifier of the media type
        """
        try:
            return {
                "NoneType": "",
                "MessageMediaContact": "contact",
                "MessageMediaDocument": "document",
                "MessageMediaEmpty": "",
                "MessageMediaGame": "game",
                "MessageMediaGeo": "geo",
                "MessageMediaGeoLive": "geo_live",
                "MessageMediaInvoice": "invoice",
                "MessageMediaPhoto": "photo",
                "MessageMediaPoll": "poll",
                "MessageMediaUnsupported": "unsupported",
                "MessageMediaVenue": "venue",
                "MessageMediaWebPage": "url"
            }[media.get("_type", None)]
        except (AttributeError, KeyError):
            return ""
Get media type for a Telegram attachment
Parameters
- media: Media object
Returns
Textual identifier of the media type
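The lookup is keyed on the _type field that serialize_obj adds to serialized Telethon objects; an unrecognized type, or a missing media object, falls through to an empty string. For example:

from datasources.telegram.search_telegram import SearchTelegram

print(SearchTelegram.get_media_type({"_type": "MessageMediaPhoto"}))  # photo
print(SearchTelegram.get_media_type({"_type": "SomeFutureType"}))     # "" (KeyError caught)
print(SearchTelegram.get_media_type(None))                            # "" (AttributeError caught)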
    @staticmethod
    def serialize_obj(input_obj):
        """
        Serialize an object as a dictionary

        Telethon message objects are not serializable by themselves, but most
        relevant attributes are simply struct classes. This function replaces
        those that are not with placeholders and then returns a dictionary that
        can be serialized as JSON.

        :param input_obj: Object to serialize
        :return: Serialized object
        """
        scalars = (int, str, float, list, tuple, set, bool)

        if type(input_obj) in scalars or input_obj is None:
            return input_obj

        if type(input_obj) is not dict:
            obj = input_obj.__dict__
        else:
            obj = input_obj.copy()

        mapped_obj = {}
        for item, value in obj.items():
            if type(value) is datetime:
                mapped_obj[item] = value.timestamp()
            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
                mapped_obj[item] = SearchTelegram.serialize_obj(value)
            elif type(value) is list:
                mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value]
            elif type(value) is bytes:
                mapped_obj[item] = value.hex()
            elif type(value) not in scalars and value is not None:
                # type we can't make sense of here
                continue
            else:
                mapped_obj[item] = value

        # Add the _type if the original object was a telethon type
        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
            mapped_obj["_type"] = type(input_obj).__name__

        # Store the markdown-formatted text
        if type(input_obj).__name__ == "Message":
            mapped_obj["message_markdown"] = input_obj.text

        return mapped_obj
Serialize an object as a dictionary
Telethon message objects are not serializable by themselves, but most relevant attributes are simply struct classes. This function replaces those that are not with placeholders and then returns a dictionary that can be serialized as JSON.
Parameters
- input_obj: Object to serialize
Returns
Serialized object
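A small sketch of the conversions this implies, using a plain dictionary with made-up values rather than a real Telethon object:

import json
from datasources.telegram.search_telegram import SearchTelegram
from datetime import datetime, timezone

record = {
    "id": 42,
    "date": datetime(2021, 1, 1, tzinfo=timezone.utc),  # becomes a unix timestamp
    "file_reference": b"\x00\xff",                      # becomes a hex string
    "opaque": object(),                                 # unknown type: silently dropped
}
print(json.dumps(SearchTelegram.serialize_obj(record)))
# {"id": 42, "date": 1609459200.0, "file_reference": "00ff"}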
    @staticmethod
    def validate_query(query, request, config):
        """
        Validate Telegram query

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False)
        max_entities = config.get("telegram-search.max_entities", 25)

        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options(
            config=config)["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        config.user.set_value("telegram.session", session_id)
        session_path = config.get("PATH_SESSIONS").joinpath(session_id + ".session")

        client = None

        # API ID is always a number, if it's not, we can immediately fail
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")  # noqa: E731
            max_attempts = 1
        else:
            code_callback = lambda: -1  # noqa: E731
            # max_attempts = 0 because authing will always fail: we can't wait for
            # the code to be entered interactively, we'll need to do a new request
            # but we can't just immediately return, we still need to call start()
            # to get telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError:
                # A code was sent to the given phone number
                needs_code = True
        except FloodWaitError as e:
            # uh oh, we got rate-limited
            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                           str(e).split("(")[0] + ".")
        except ApiIdInvalidError:
            # wrong credentials
            raise QueryParametersException("Your API credentials are invalid.")
        except PhoneNumberInvalidError:
            # wrong phone number
            raise QueryParametersException(
                "The phone number provided is not a valid phone number for these credentials.")
        except RPCError as e:
            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                           f"Telegram app(s) to the latest version to proceed ({e}).")
        except Exception as e:
            # ?
            raise QueryParametersException(
                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            if client:
                client.disconnect()

        if needs_code:
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }
Validate Telegram query
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager config: Configuration reader (context-aware)
Returns
Safe query parameters
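The URL sanitization step is perhaps the least obvious part: three substitutions reduce the various t.me URL shapes users paste in to a bare entity name. A standalone sketch of the same regexes:

import re

def sanitize(item):
    # mirrors the three substitutions in validate_query above
    item = re.sub(r"^https?://t\.me/", "", item)  # strip the t.me prefix
    item = re.sub(r"^/?s/", "", item)             # strip the web-preview /s/ path
    item = re.sub(r"[/]*$", "", item)             # strip trailing slashes
    return item

print(sanitize("https://t.me/s/examplechannel/"))  # examplechannel
print(sanitize("examplechannel"))                  # examplechannel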
    @staticmethod
    def create_session_id(api_phone, api_id, api_hash):
        """
        Generate a filename for the session file

        This is a combination of phone number and API credentials, but hashed
        so that one cannot actually derive someone's phone number from it.

        :param str api_phone: Phone number for API ID
        :param int api_id: Telegram API ID
        :param str api_hash: Telegram API Hash
        :return str: A hash value derived from the input
        """
        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Generate a filename for the session file
This is a combination of phone number and API credentials, but hashed so that one cannot actually derive someone's phone number from it.
Parameters
- str api_phone: Phone number for API ID
- int api_id: Telegram API ID
- str api_hash: Telegram API Hash
Returns
A hash value derived from the input
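The same credentials therefore always map to the same session file, without the phone number being recoverable from the filename. For example (made-up credentials):

from datasources.telegram.search_telegram import SearchTelegram

session_id = SearchTelegram.create_session_id("+31612345678", 12345, "0123456789abcdef")
print(session_id)  # a 128-character blake2b hex digest, stable for these inputs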
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor