datasources.telegram.search_telegram
Search Telegram via API
1""" 2Search Telegram via API 3""" 4import traceback 5import hashlib 6import asyncio 7import json 8import ural 9import time 10import re 11 12from pathlib import Path 13 14from backend.lib.search import Search 15from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException, \ 16 QueryNeedsFurtherInputException 17from common.lib.helpers import convert_to_int, UserInput 18from common.lib.item_mapping import MappedItem, MissingMappedField 19 20from datetime import datetime 21from telethon import TelegramClient 22from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \ 23 FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError 24from telethon.tl.functions.channels import GetFullChannelRequest 25from telethon.tl.functions.users import GetFullUserRequest 26from telethon.tl.types import MessageEntityMention 27 28 29 30class SearchTelegram(Search): 31 """ 32 Search Telegram via API 33 """ 34 type = "telegram-search" # job ID 35 category = "Search" # category 36 title = "Telegram API search" # title displayed in UI 37 description = "Scrapes messages from open Telegram groups via its API." # description displayed in UI 38 extension = "ndjson" # extension of result file, used internally and in UI 39 is_local = False # Whether this datasource is locally scraped 40 is_static = False # Whether this datasource is still updated 41 42 # cache 43 details_cache = None 44 failures_cache = None 45 eventloop = None 46 import_issues = 0 47 end_if_rate_limited = 600 # break if Telegram requires wait time above number of seconds 48 49 max_workers = 1 50 max_retries = 3 51 flawless = 0 52 53 config = { 54 "telegram-search.can_query_all_messages": { 55 "type": UserInput.OPTION_TOGGLE, 56 "help": "Remove message amount limit", 57 "default": False, 58 "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!" 59 }, 60 "telegram-search.max_entities": { 61 "type": UserInput.OPTION_TEXT, 62 "help": "Max entities to query", 63 "coerce_type": int, 64 "min": 0, 65 "default": 25, 66 "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to " 67 "disable limit." 68 }, 69 "telegram-search.max_crawl_depth": { 70 "type": UserInput.OPTION_TEXT, 71 "help": "Max crawl depth", 72 "coerce_type": int, 73 "min": 0, 74 "default": 0, 75 "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded " 76 "messages. Recommended to leave at 0 for most users since this can exponentially increase " 77 "dataset sizes." 78 } 79 } 80 81 @classmethod 82 def get_options(cls, parent_dataset=None, config=None): 83 """ 84 Get processor options 85 86 Just updates the description of the entities field based on the 87 configured max entities. 88 89 :param DataSet parent_dataset: An object representing the dataset that 90 the processor would be run on 91 :param ConfigManager|None config: Configuration reader (context-aware) 92 """ 93 94 max_entities = config.get("telegram-search.max_entities", 25) 95 options = { 96 "intro": { 97 "type": UserInput.OPTION_INFO, 98 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 99 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 100 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 101 "authentication for Telegram." 102 }, 103 "api_id": { 104 "type": UserInput.OPTION_TEXT, 105 "help": "API ID", 106 "cache": True, 107 }, 108 "api_hash": { 109 "type": UserInput.OPTION_TEXT, 110 "help": "API Hash", 111 "cache": True, 112 }, 113 "api_phone": { 114 "type": UserInput.OPTION_TEXT, 115 "help": "Phone number", 116 "cache": True, 117 "default": "+xxxxxxxxxx" 118 }, 119 "divider": { 120 "type": UserInput.OPTION_DIVIDER 121 }, 122 "query-intro": { 123 "type": UserInput.OPTION_INFO, 124 "help": "Separate with commas or line breaks." 125 }, 126 "query": { 127 "type": UserInput.OPTION_TEXT_LARGE, 128 "help": "Entities to scrape", 129 "tooltip": "Separate with commas or line breaks." 130 }, 131 "max_posts": { 132 "type": UserInput.OPTION_TEXT, 133 "help": "Messages per group", 134 "min": 1, 135 "max": 50000, 136 "default": 10 137 }, 138 "daterange": { 139 "type": UserInput.OPTION_DATERANGE, 140 "help": "Date range" 141 }, 142 "divider-2": { 143 "type": UserInput.OPTION_DIVIDER 144 }, 145 "info-sensitive": { 146 "type": UserInput.OPTION_INFO, 147 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 148 "there while data is fetched. After the dataset has been created your credentials will be " 149 "deleted from the server, unless you enable the option below. If you want to download images " 150 "attached to the messages in your collected data, you need to enable this option. Your " 151 "credentials will never be visible to other users and can be erased later via the result page." 152 }, 153 "save-session": { 154 "type": UserInput.OPTION_TOGGLE, 155 "help": "Save session:", 156 "default": False 157 }, 158 "resolve-entities-intro": { 159 "type": UserInput.OPTION_INFO, 160 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 161 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 162 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 163 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 164 "to finish capturing. It will also dramatically increase the disk space needed to store the " 165 "data, so only enable this if you really need it!" 166 }, 167 "resolve-entities": { 168 "type": UserInput.OPTION_TOGGLE, 169 "help": "Resolve references", 170 "default": False, 171 } 172 } 173 174 if max_entities: 175 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 176 f"(channels or groups) at a time. Separate with line breaks or commas.") 177 178 all_messages = config.get("telegram-search.can_query_all_messages", False) 179 if all_messages: 180 if "max" in options["max_posts"]: 181 del options["max_posts"]["max"] 182 else: 183 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 184 f"{options['max_posts']['max']:,} messages per entity.") 185 186 if config.get("telegram-search.max_crawl_depth", 0) > 0: 187 options["crawl_intro"] = { 188 "type": UserInput.OPTION_INFO, 189 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 190 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 191 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 192 "rapidly grow when adding newly discovered entities to the query this way. Note that dataset " 193 "progress cannot be accurately tracked when you use this feature." 194 } 195 options["crawl-depth"] = { 196 "type": UserInput.OPTION_TEXT, 197 "coerce_type": int, 198 "min": 0, 199 "max": config.get("telegram-search.max_crawl_depth"), 200 "default": 0, 201 "help": "Crawl depth", 202 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 203 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 204 "starting entities." 205 } 206 options["crawl-threshold"] = { 207 "type": UserInput.OPTION_TEXT, 208 "coerce_type": int, 209 "min": 0, 210 "default": 5, 211 "help": "Crawl threshold", 212 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 213 "references discovered below the max crawl depth are taken into account." 214 } 215 options["crawl-via-links"] = { 216 "type": UserInput.OPTION_TOGGLE, 217 "default": False, 218 "help": "Extract new groups from links", 219 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 220 "This is more error-prone than crawling only via forwards, but can be a way to discover " 221 "links that would otherwise remain undetected." 222 } 223 224 return options 225 226 227 def get_items(self, query): 228 """ 229 Execute a query; get messages for given parameters 230 231 Basically a wrapper around execute_queries() to call it with asyncio. 232 233 :param dict query: Query parameters, as part of the DataSet object 234 :return list: Posts, sorted by thread and post ID, in ascending order 235 """ 236 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 237 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 238 "creating it again from scratch.", is_final=True) 239 return None 240 241 self.details_cache = {} 242 self.failures_cache = set() 243 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 244 results = asyncio.run(self.execute_queries()) 245 246 if not query.get("save-session"): 247 self.dataset.delete_parameter("api_hash", instant=True) 248 self.dataset.delete_parameter("api_phone", instant=True) 249 self.dataset.delete_parameter("api_id", instant=True) 250 251 if self.flawless: 252 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 253 "been private). View the log file for details.", is_final=True) 254 255 return results 256 257 async def execute_queries(self): 258 """ 259 Get messages for queries 260 261 This is basically what would be done in get_items(), except due to 262 Telethon's architecture this needs to be called in an async method, 263 which is this one. 264 265 :return list: Collected messages 266 """ 267 # session file has been created earlier, and we can re-use it here in 268 # order to avoid having to re-enter the security code 269 query = self.parameters 270 271 session_id = SearchTelegram.create_session_id(query["api_phone"].strip(), 272 query["api_id"].strip(), 273 query["api_hash"].strip()) 274 self.dataset.log(f'Telegram session id: {session_id}') 275 session_path = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), session_id + ".session") 276 277 client = None 278 279 try: 280 client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"), 281 loop=self.eventloop) 282 await client.start(phone=SearchTelegram.cancel_start) 283 except RuntimeError: 284 # session is no longer useable, delete file so user will be asked 285 # for security code again. The RuntimeError is raised by 286 # `cancel_start()` 287 self.dataset.update_status( 288 "Session is not authenticated: login security code may have expired. You need to re-enter the security code.", 289 is_final=True) 290 291 if client and hasattr(client, "disconnect"): 292 await client.disconnect() 293 294 if session_path.exists(): 295 session_path.unlink() 296 297 return [] 298 except Exception as e: 299 # not sure what exception specifically is triggered here, but it 300 # always means the connection failed 301 self.log.error(f"Telegram: {e}\n{traceback.format_exc()}") 302 self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True) 303 if client and hasattr(client, "disconnect"): 304 await client.disconnect() 305 return [] 306 307 # ready our parameters 308 parameters = self.dataset.get_parameters() 309 queries = [query.strip() for query in parameters.get("query", "").split(",")] 310 max_items = convert_to_int(parameters.get("items", 10), 10) 311 312 # Telethon requires the offset date to be a datetime date 313 max_date = parameters.get("max_date") 314 if max_date: 315 try: 316 max_date = datetime.fromtimestamp(int(max_date)) 317 except ValueError: 318 max_date = None 319 320 # min_date can remain an integer 321 min_date = parameters.get("min_date") 322 if min_date: 323 try: 324 min_date = int(min_date) 325 except ValueError: 326 min_date = None 327 328 posts = [] 329 try: 330 async for post in self.gather_posts(client, queries, max_items, min_date, max_date): 331 posts.append(post) 332 return posts 333 except ProcessorInterruptedException as e: 334 raise e 335 except Exception: 336 # catch-all so we can disconnect properly 337 # ...should we? 338 self.dataset.update_status("Error scraping posts from Telegram; halting collection.") 339 self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") 340 # May as well return what was captured, yes? 341 return posts 342 finally: 343 await client.disconnect() 344 345 async def gather_posts(self, client, queries, max_items, min_date, max_date): 346 """ 347 Gather messages for each entity for which messages are requested 348 349 :param TelegramClient client: Telegram Client 350 :param list queries: List of entities to query (as string) 351 :param int max_items: Messages to scrape per entity 352 :param int min_date: Datetime date to get posts after 353 :param int max_date: Datetime date to get posts before 354 :return list: List of messages, each message a dictionary. 355 """ 356 resolve_refs = self.parameters.get("resolve-entities") 357 358 # Adding flag to stop; using for rate limits 359 no_additional_queries = False 360 361 # This is used for the 'crawl' feature so we know at which depth a 362 # given entity was discovered 363 depth_map = { 364 entity: 0 for entity in queries 365 } 366 367 crawl_max_depth = self.parameters.get("crawl-depth", 0) 368 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 369 crawl_via_links = self.parameters.get("crawl-via-links", False) 370 371 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 372 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 373 374 # this keeps track of how often an entity not in the original query 375 # has been mentioned. When crawling is enabled and this exceeds the 376 # given threshold, the entity is added to the query 377 crawl_references = {} 378 full_query = set(queries) 379 num_queries = len(queries) 380 381 # we may not always know the 'entity username' for an entity ID, so 382 # keep a reference map as we go 383 entity_id_map = {} 384 385 # Collect queries 386 # Use while instead of for so we can change queries during iteration 387 # this is needed for the 'crawl' feature which can discover new 388 # entities during crawl 389 processed = 0 390 total_messages = 0 391 while queries: 392 query = queries.pop(0) 393 394 delay = 10 395 retries = 0 396 processed += 1 397 self.dataset.update_progress(processed / num_queries) 398 399 if no_additional_queries: 400 # Note that we are not completing this query 401 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 402 continue 403 404 while True: 405 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 406 entity_posts = 0 407 discovered = 0 408 try: 409 async for message in client.iter_messages(entity=query, offset_date=max_date): 410 entity_posts += 1 411 total_messages += 1 412 if self.interrupted: 413 raise ProcessorInterruptedException( 414 "Interrupted while fetching message data from the Telegram API") 415 416 if entity_posts % 100 == 0: 417 self.dataset.update_status( 418 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 419 420 if message.action is not None: 421 # e.g. someone joins the channel - not an actual message 422 continue 423 424 # todo: possibly enrich object with e.g. the name of 425 # the channel a message was forwarded from (but that 426 # needs extra API requests...) 427 serialized_message = SearchTelegram.serialize_obj(message) 428 if "_chat" in serialized_message: 429 # Add query ID to check if queries have been crawled previously 430 full_query.add(serialized_message["_chat"]["id"]) 431 if query not in entity_id_map and serialized_message["_chat"]["id"] == query: 432 # once we know what a channel ID resolves to, use the username instead so it is easier to 433 # understand for the user 434 entity_id_map[query] = serialized_message["_chat"]["username"] 435 self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") 436 437 if resolve_refs: 438 serialized_message = await self.resolve_groups(client, serialized_message) 439 440 # Stop if we're below the min date 441 if min_date and serialized_message.get("date") < min_date: 442 break 443 444 # if crawling is enabled, see if we found something to add to the query 445 linked_entities = set() 446 if crawl_max_depth and (depth_map.get(query) < crawl_max_depth): 447 message_fwd = serialized_message.get("fwd_from") 448 fwd_from = None 449 fwd_source_type = None 450 if message_fwd and message_fwd.get("from_id"): 451 if message_fwd["from_id"].get("_type") == "PeerChannel": 452 # Legacy(?) data structure (pre 2024/7/22) 453 # even if we haven't resolved the ID, we can feed the numeric ID 454 # to Telethon! this is nice because it means we don't have to 455 # resolve entities to crawl iteratively 456 fwd_from = int(message_fwd["from_id"]["channel_id"]) 457 fwd_source_type = "channel" 458 elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}): 459 # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me 460 # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far 461 fwd_from = int(message_fwd["from_id"]["full_chat"]["id"]) 462 fwd_source_type = "channel" 463 elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"): 464 # forwards can also come from users 465 # these can never be followed, so don't add these to the crawl, but do document them 466 fwd_source_type = "user" 467 else: 468 print(json.dumps(message_fwd)) 469 self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl") 470 fwd_source_type = "unknown" 471 472 if fwd_from: 473 linked_entities.add(fwd_from) 474 475 476 if crawl_via_links: 477 # t.me links 478 all_links = ural.urls_from_text(serialized_message["message"]) 479 all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 480 for link in all_links: 481 if link.startswith("+"): 482 # invite links 483 continue 484 485 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 486 linked_entities.add(entity_name) 487 488 # @references 489 references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention] 490 for reference in references: 491 if reference.startswith("@"): 492 reference = reference[1:] 493 494 reference = reference.split("/")[0] 495 496 linked_entities.add(reference) 497 498 # Check if fwd_from or the resolved entity ID is already queued or has been queried 499 for link in linked_entities: 500 if link not in full_query and link not in queries and fwd_source_type not in ("user",): 501 # new entity discovered! 502 # might be discovered (before collection) multiple times, so retain lowest depth 503 # print(f"Potentially crawling {link}") 504 depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1) 505 if link not in crawl_references: 506 crawl_references[link] = 0 507 crawl_references[link] += 1 508 509 # Add to queries if it has been referenced enough times 510 if crawl_references[link] >= crawl_msg_threshold: 511 queries.append(link) 512 full_query.add(link) 513 num_queries += 1 514 discovered += 1 515 self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") 516 517 518 519 serialized_message["4CAT_metadata"] = { 520 "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls 521 "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity 522 "query_depth": depth_map.get(query, 0) 523 } 524 yield serialized_message 525 526 if entity_posts >= max_items: 527 break 528 529 except ChannelPrivateError: 530 self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping") 531 self.flawless += 1 532 533 except (UsernameInvalidError,): 534 self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping") 535 self.flawless += 1 536 537 except FloodWaitError as e: 538 self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting") 539 if e.seconds < self.end_if_rate_limited: 540 time.sleep(e.seconds) 541 continue 542 else: 543 self.flawless += 1 544 no_additional_queries = True 545 self.dataset.update_status( 546 f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending") 547 break 548 549 except BadRequestError as e: 550 self.dataset.update_status( 551 f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping") 552 self.flawless += 1 553 554 except ValueError as e: 555 self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping") 556 self.flawless += 1 557 558 except ChannelPrivateError as e: 559 self.dataset.update_status( 560 f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.") 561 break 562 563 except TimeoutError: 564 if retries < 3: 565 self.dataset.update_status( 566 f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.") 567 self.flawless += 1 568 break 569 570 self.dataset.update_status( 571 f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.") 572 time.sleep(delay) 573 delay *= 2 574 continue 575 576 self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)") 577 break 578 579 async def resolve_groups(self, client, message): 580 """ 581 Recursively resolve references to groups and users 582 583 :param client: Telethon client instance 584 :param dict message: Message, as already mapped by serialize_obj 585 :return: Resolved dictionary 586 """ 587 resolved_message = message.copy() 588 for key, value in message.items(): 589 try: 590 if type(value) is not dict: 591 # if it's not a dict, we never have to resolve it, as it 592 # does not represent an entity 593 continue 594 595 elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"): 596 # forwarded from a channel! 597 if value["channel_id"] in self.failures_cache: 598 continue 599 600 if value["channel_id"] not in self.details_cache: 601 channel = await client(GetFullChannelRequest(value["channel_id"])) 602 self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel) 603 604 resolved_message[key] = self.details_cache[value["channel_id"]] 605 resolved_message[key]["channel_id"] = value["channel_id"] 606 607 elif "_type" in value and value["_type"] == "PeerUser": 608 # a user! 609 if value["user_id"] in self.failures_cache: 610 continue 611 612 if value["user_id"] not in self.details_cache: 613 user = await client(GetFullUserRequest(value["user_id"])) 614 self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user) 615 616 resolved_message[key] = self.details_cache[value["user_id"]] 617 resolved_message[key]["user_id"] = value["user_id"] 618 else: 619 resolved_message[key] = await self.resolve_groups(client, value) 620 621 except (TypeError, ChannelPrivateError, UsernameInvalidError) as e: 622 self.failures_cache.add(value.get("channel_id", value.get("user_id"))) 623 if type(e) in (ChannelPrivateError, UsernameInvalidError): 624 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is") 625 else: 626 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is") 627 628 return resolved_message 629 630 @staticmethod 631 def cancel_start(): 632 """ 633 Replace interactive phone number input in Telethon 634 635 By default, if Telethon cannot use the given session file to 636 authenticate, it will interactively prompt the user for a phone 637 number on the command line. That is not useful here, so instead 638 raise a RuntimeError. This will be caught below and the user will 639 be told they need to re-authenticate via 4CAT. 640 """ 641 raise RuntimeError("Connection cancelled") 642 643 @staticmethod 644 def map_item(message): 645 """ 646 Convert Message object to 4CAT-ready data object 647 648 :param Message message: Message to parse 649 :return dict: 4CAT-compatible item object 650 """ 651 if message["_chat"]["username"]: 652 # chats can apparently not have usernames??? 653 # truly telegram objects are way too lenient for their own good 654 thread = message["_chat"]["username"] 655 elif message["_chat"]["title"]: 656 thread = re.sub(r"\s", "", message["_chat"]["title"]) 657 else: 658 # just give up 659 thread = "unknown" 660 661 # determine username 662 # API responses only include the user *ID*, not the username, and to 663 # complicate things further not everyone is a user and not everyone 664 # has a username. If no username is available, try the first and 665 # last name someone has supplied 666 fullname = "" 667 username = "" 668 user_id = message["_sender"]["id"] if message.get("_sender") else "" 669 user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else "" 670 671 if message.get("_sender") and message["_sender"].get("username"): 672 username = message["_sender"]["username"] 673 674 if message.get("_sender") and message["_sender"].get("first_name"): 675 fullname += message["_sender"]["first_name"] 676 677 if message.get("_sender") and message["_sender"].get("last_name"): 678 fullname += " " + message["_sender"]["last_name"] 679 680 fullname = fullname.strip() 681 682 # determine media type 683 # these store some extra information of the attachment in 684 # attachment_data. Since the final result will be serialised as a csv 685 # file, we can only store text content. As such some media data is 686 # serialised as JSON. 687 attachment_type = SearchTelegram.get_media_type(message["media"]) 688 attachment_filename = "" 689 690 if attachment_type == "contact": 691 contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"] 692 if message["media"].get('contact', False): 693 # Old datastructure 694 attachment = message["media"]["contact"] 695 elif all([property in message["media"].keys() for property in contact_data]): 696 # New datastructure 2022/7/25 697 attachment = message["media"] 698 else: 699 raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed') 700 attachment_data = json.dumps({property: attachment.get(property) for property in contact_data}) 701 702 elif attachment_type == "document": 703 # videos, etc 704 # This could add a separate routine for videos to make them a 705 # separate type, which could then be scraped later, etc 706 attachment_type = message["media"]["document"]["mime_type"].split("/")[0] 707 if attachment_type == "video": 708 attachment = message["media"]["document"] 709 attachment_data = json.dumps({ 710 "id": attachment["id"], 711 "dc_id": attachment["dc_id"], 712 "file_reference": attachment["file_reference"], 713 }) 714 else: 715 attachment_data = "" 716 717 # elif attachment_type in ("geo", "geo_live"): 718 # untested whether geo_live is significantly different from geo 719 # attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"]) 720 721 elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"): 722 # we don't actually store any metadata about the photo, since very 723 # little of the metadata attached is of interest. Instead, the 724 # actual photos may be downloaded via a processor that is run on the 725 # search results 726 attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"] 727 attachment_data = json.dumps({ 728 "id": attachment["id"], 729 "dc_id": attachment["dc_id"], 730 "file_reference": attachment["file_reference"], 731 }) 732 attachment_filename = thread + "-" + str(message["id"]) + ".jpeg" 733 734 elif attachment_type == "poll": 735 # unfortunately poll results are only available when someone has 736 # actually voted on the poll - that will usually not be the case, 737 # so we store -1 as the vote count 738 attachment = message["media"] 739 options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]} 740 attachment_data = json.dumps({ 741 "question": attachment["poll"]["question"], 742 "voters": attachment["results"]["total_voters"], 743 "answers": [{ 744 "answer": options[answer["option"]], 745 "votes": answer["voters"] 746 } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{ 747 "answer": options[option], 748 "votes": -1 749 } for option in options] 750 }) 751 752 else: 753 attachment_data = "" 754 755 # was the message forwarded from somewhere and if so when? 756 forwarded_timestamp = "" 757 forwarded_name = "" 758 forwarded_id = "" 759 forwarded_username = "" 760 if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int: 761 # forward information is spread out over a lot of places 762 # we can identify, in order of usefulness: username, full name, 763 # and ID. But not all of these are always available, and not 764 # always in the same place either 765 forwarded_timestamp = int(message["fwd_from"]["date"]) 766 from_data = message["fwd_from"]["from_id"] 767 768 if from_data: 769 forwarded_id = from_data.get("channel_id", from_data.get("user_id", "")) 770 771 if message["fwd_from"].get("from_name"): 772 forwarded_name = message["fwd_from"].get("from_name") 773 774 if from_data and from_data.get("from_name"): 775 forwarded_name = message["fwd_from"]["from_name"] 776 777 if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data: 778 from_data["user"] = from_data["users"][0] 779 780 if from_data and ("user" in from_data or "chats" in from_data): 781 # 'resolve entities' was enabled for this dataset 782 if "user" in from_data: 783 if from_data["user"].get("username"): 784 forwarded_username = from_data["user"]["username"] 785 786 if from_data["user"].get("first_name"): 787 forwarded_name = from_data["user"]["first_name"] 788 if message["fwd_from"].get("last_name"): 789 forwarded_name += " " + from_data["user"]["last_name"] 790 791 forwarded_name = forwarded_name.strip() 792 793 elif "chats" in from_data: 794 channel_id = from_data.get("channel_id") 795 for chat in from_data["chats"]: 796 if chat["id"] == channel_id or channel_id is None: 797 forwarded_username = chat["username"] 798 799 elif message.get("_forward") and message["_forward"].get("_chat"): 800 if message["_forward"]["_chat"].get("username"): 801 forwarded_username = message["_forward"]["_chat"]["username"] 802 803 if message["_forward"]["_chat"].get("title"): 804 forwarded_name = message["_forward"]["_chat"]["title"] 805 806 link_title = "" 807 link_attached = "" 808 link_description = "" 809 reactions = "" 810 811 if message.get("media") and message["media"].get("webpage"): 812 link_title = message["media"]["webpage"].get("title") 813 link_attached = message["media"]["webpage"].get("url") 814 link_description = message["media"]["webpage"].get("description") 815 816 if message.get("reactions") and message["reactions"].get("results"): 817 for reaction in message["reactions"]["results"]: 818 if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]: 819 # Updated to support new reaction datastructure 820 reactions += reaction["reaction"]["emoticon"] * reaction["count"] 821 elif type(reaction["reaction"]) is str and "count" in reaction: 822 reactions += reaction["reaction"] * reaction["count"] 823 else: 824 # Failsafe; can be updated to support formatting of new datastructures in the future 825 reactions += f"{reaction}, " 826 827 is_reply = False 828 reply_to = "" 829 if message.get("reply_to"): 830 is_reply = True 831 reply_to = message["reply_to"].get("reply_to_msg_id", "") 832 833 # t.me links 834 linked_entities = set() 835 all_links = ural.urls_from_text(message["message"]) 836 all_links = [link.split("t.me/")[1] for link in all_links if 837 ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 838 839 for link in all_links: 840 if link.startswith("+"): 841 # invite links 842 continue 843 844 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 845 linked_entities.add(entity_name) 846 847 # @references 848 # in execute_queries we use MessageEntityMention to get these 849 # however, after serializing these objects we only have the offsets of 850 # the mentioned username, and telegram does weird unicode things to its 851 # offsets meaning we can't just substring the message. So use a regex 852 # as a 'good enough' solution 853 all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) 854 855 # make this case-insensitive since people may use different casing in 856 # messages than the 'official' username for example 857 all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) 858 all_ci_connections = set() 859 seen = set() 860 for connection in all_connections: 861 if connection.lower() not in seen: 862 all_ci_connections.add(connection) 863 seen.add(connection.lower()) 864 865 return MappedItem({ 866 "id": f"{message['_chat']['username']}-{message['id']}", 867 "thread_id": thread, 868 "chat": message["_chat"]["username"], 869 "author": user_id, 870 "author_username": username, 871 "author_name": fullname, 872 "author_is_bot": "yes" if user_is_bot else "no", 873 "body": message["message"], 874 "body_markdown": message.get("message_markdown", MissingMappedField("")), 875 "is_reply": is_reply, 876 "reply_to": reply_to, 877 "views": message["views"] if message["views"] else "", 878 # "forwards": message.get("forwards", MissingMappedField(0)), 879 "reactions": reactions, 880 "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), 881 "unix_timestamp": int(message["date"]), 882 "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[ 883 "edit_date"] else "", 884 "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "", 885 "author_forwarded_from_name": forwarded_name, 886 "author_forwarded_from_username": forwarded_username, 887 "author_forwarded_from_id": forwarded_id, 888 "entities_linked": ",".join(linked_entities), 889 "entities_mentioned": ",".join(all_mentions), 890 "all_connections": ",".join(all_ci_connections), 891 "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime( 892 "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "", 893 "unix_timestamp_forwarded_from": forwarded_timestamp, 894 "link_title": link_title, 895 "link_description": link_description, 896 "link_attached": link_attached, 897 "attachment_type": attachment_type, 898 "attachment_data": attachment_data, 899 "attachment_filename": attachment_filename 900 }) 901 902 @staticmethod 903 def get_media_type(media): 904 """ 905 Get media type for a Telegram attachment 906 907 :param media: Media object 908 :return str: Textual identifier of the media type 909 """ 910 try: 911 return { 912 "NoneType": "", 913 "MessageMediaContact": "contact", 914 "MessageMediaDocument": "document", 915 "MessageMediaEmpty": "", 916 "MessageMediaGame": "game", 917 "MessageMediaGeo": "geo", 918 "MessageMediaGeoLive": "geo_live", 919 "MessageMediaInvoice": "invoice", 920 "MessageMediaPhoto": "photo", 921 "MessageMediaPoll": "poll", 922 "MessageMediaUnsupported": "unsupported", 923 "MessageMediaVenue": "venue", 924 "MessageMediaWebPage": "url" 925 }[media.get("_type", None)] 926 except (AttributeError, KeyError): 927 return "" 928 929 @staticmethod 930 def serialize_obj(input_obj): 931 """ 932 Serialize an object as a dictionary 933 934 Telethon message objects are not serializable by themselves, but most 935 relevant attributes are simply struct classes. This function replaces 936 those that are not with placeholders and then returns a dictionary that 937 can be serialized as JSON. 938 939 :param obj: Object to serialize 940 :return: Serialized object 941 """ 942 scalars = (int, str, float, list, tuple, set, bool) 943 944 if type(input_obj) in scalars or input_obj is None: 945 return input_obj 946 947 if type(input_obj) is not dict: 948 obj = input_obj.__dict__ 949 else: 950 obj = input_obj.copy() 951 952 mapped_obj = {} 953 for item, value in obj.items(): 954 if type(value) is datetime: 955 mapped_obj[item] = value.timestamp() 956 elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 957 mapped_obj[item] = SearchTelegram.serialize_obj(value) 958 elif type(value) is list: 959 mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value] 960 elif type(value) is bytes: 961 mapped_obj[item] = value.hex() 962 elif type(value) not in scalars and value is not None: 963 # type we can't make sense of here 964 continue 965 else: 966 mapped_obj[item] = value 967 968 # Add the _type if the original object was a telethon type 969 if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 970 mapped_obj["_type"] = type(input_obj).__name__ 971 972 # Store the markdown-formatted text 973 if type(input_obj).__name__ == "Message": 974 mapped_obj["message_markdown"] = input_obj.text 975 976 return mapped_obj 977 978 @staticmethod 979 def validate_query(query, request, config): 980 """ 981 Validate Telegram query 982 983 :param config: 984 :param dict query: Query parameters, from client-side. 985 :param request: Flask request 986 :param User user: User object of user who has submitted the query 987 :param ConfigManager config: Configuration reader (context-aware) 988 :return dict: Safe query parameters 989 """ 990 # no query 4 u 991 if not query.get("query", "").strip(): 992 raise QueryParametersException("You must provide a search query.") 993 994 if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None): 995 raise QueryParametersException("You need to provide valid Telegram API credentials first.") 996 997 all_posts = config.get("telegram-search.can_query_all_messages", False) 998 max_entities = config.get("telegram-search.max_entities", 25) 999 1000 num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options( 1001 config=config)["max_posts"]["max"]) 1002 1003 # reformat queries to be a comma-separated list with no wrapping 1004 # whitespace 1005 whitespace = re.compile(r"\s+") 1006 items = whitespace.sub("", query.get("query").replace("\n", ",")) 1007 if max_entities > 0 and len(items.split(",")) > max_entities: 1008 raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.") 1009 1010 sanitized_items = [] 1011 # handle telegram URLs 1012 for item in items.split(","): 1013 if not item.strip(): 1014 continue 1015 item = re.sub(r"^https?://t\.me/", "", item) 1016 item = re.sub(r"^/?s/", "", item) 1017 item = re.sub(r"[/]*$", "", item) 1018 sanitized_items.append(item) 1019 1020 # the dates need to make sense as a range to search within 1021 min_date, max_date = query.get("daterange") 1022 1023 # now check if there is an active API session 1024 if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous: 1025 raise QueryParametersException("Telegram scraping is only available to logged-in users with personal " 1026 "accounts.") 1027 1028 # check for the information we need 1029 session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"), 1030 query.get("api_hash")) 1031 config.user.set_value("telegram.session", session_id) 1032 session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session") 1033 1034 client = None 1035 1036 # API ID is always a number, if it's not, we can immediately fail 1037 try: 1038 api_id = int(query.get("api_id")) 1039 except ValueError: 1040 raise QueryParametersException("Invalid API ID.") 1041 1042 # maybe we've entered a code already and submitted it with the request 1043 if "option-security-code" in request.form and request.form.get("option-security-code").strip(): 1044 code_callback = lambda: request.form.get("option-security-code") # noqa: E731 1045 max_attempts = 1 1046 else: 1047 code_callback = lambda: -1 # noqa: E731 1048 # max_attempts = 0 because authing will always fail: we can't wait for 1049 # the code to be entered interactively, we'll need to do a new request 1050 # but we can't just immediately return, we still need to call start() 1051 # to get telegram to send us a code 1052 max_attempts = 0 1053 1054 # now try authenticating 1055 needs_code = False 1056 try: 1057 loop = asyncio.new_event_loop() 1058 asyncio.set_event_loop(loop) 1059 client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop) 1060 1061 try: 1062 client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback) 1063 1064 except ValueError as e: 1065 # this happens if 2FA is required 1066 raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time " 1067 f"does not support this authentication mode for Telegram. ({e})") 1068 except RuntimeError: 1069 # A code was sent to the given phone number 1070 needs_code = True 1071 except FloodWaitError as e: 1072 # uh oh, we got rate-limited 1073 raise QueryParametersException("You were rate-limited and should wait a while before trying again. " + 1074 str(e).split("(")[0] + ".") 1075 except ApiIdInvalidError: 1076 # wrong credentials 1077 raise QueryParametersException("Your API credentials are invalid.") 1078 except PhoneNumberInvalidError: 1079 # wrong phone number 1080 raise QueryParametersException( 1081 "The phone number provided is not a valid phone number for these credentials.") 1082 except RPCError as e: 1083 # only seen this with an 'UPDATE_APP_TO_LOGIN' status 1084 raise QueryParametersException(f"Could not verify your authentication. You may need to update your " 1085 f"Telegram app(s) to the latest version to proceed ({e}).") 1086 except Exception as e: 1087 # ? 1088 raise QueryParametersException( 1089 f"An unexpected error ({e}) occurred and your authentication could not be verified.") 1090 finally: 1091 if client: 1092 client.disconnect() 1093 1094 if needs_code: 1095 raise QueryNeedsFurtherInputException(config={ 1096 "code-info": { 1097 "type": UserInput.OPTION_INFO, 1098 "help": "Please enter the security code that was sent to your Telegram app to continue." 1099 }, 1100 "security-code": { 1101 "type": UserInput.OPTION_TEXT, 1102 "help": "Security code", 1103 "sensitive": True 1104 }}) 1105 1106 # simple! 1107 return { 1108 "items": num_items, 1109 "query": ",".join(sanitized_items), 1110 "api_id": query.get("api_id"), 1111 "api_hash": query.get("api_hash"), 1112 "api_phone": query.get("api_phone"), 1113 "save-session": query.get("save-session"), 1114 "resolve-entities": query.get("resolve-entities"), 1115 "min_date": min_date, 1116 "max_date": max_date, 1117 "crawl-depth": query.get("crawl-depth"), 1118 "crawl-threshold": query.get("crawl-threshold"), 1119 "crawl-via-links": query.get("crawl-via-links") 1120 } 1121 1122 @staticmethod 1123 def create_session_id(api_phone, api_id, api_hash): 1124 """ 1125 Generate a filename for the session file 1126 1127 This is a combination of phone number and API credentials, but hashed 1128 so that one cannot actually derive someone's phone number from it. 1129 1130 :param str api_phone: Phone number for API ID 1131 :param int api_id: Telegram API ID 1132 :param str api_hash: Telegram API Hash 1133 :return str: A hash value derived from the input 1134 """ 1135 hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip() 1136 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
31class SearchTelegram(Search): 32 """ 33 Search Telegram via API 34 """ 35 type = "telegram-search" # job ID 36 category = "Search" # category 37 title = "Telegram API search" # title displayed in UI 38 description = "Scrapes messages from open Telegram groups via its API." # description displayed in UI 39 extension = "ndjson" # extension of result file, used internally and in UI 40 is_local = False # Whether this datasource is locally scraped 41 is_static = False # Whether this datasource is still updated 42 43 # cache 44 details_cache = None 45 failures_cache = None 46 eventloop = None 47 import_issues = 0 48 end_if_rate_limited = 600 # break if Telegram requires wait time above number of seconds 49 50 max_workers = 1 51 max_retries = 3 52 flawless = 0 53 54 config = { 55 "telegram-search.can_query_all_messages": { 56 "type": UserInput.OPTION_TOGGLE, 57 "help": "Remove message amount limit", 58 "default": False, 59 "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!" 60 }, 61 "telegram-search.max_entities": { 62 "type": UserInput.OPTION_TEXT, 63 "help": "Max entities to query", 64 "coerce_type": int, 65 "min": 0, 66 "default": 25, 67 "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to " 68 "disable limit." 69 }, 70 "telegram-search.max_crawl_depth": { 71 "type": UserInput.OPTION_TEXT, 72 "help": "Max crawl depth", 73 "coerce_type": int, 74 "min": 0, 75 "default": 0, 76 "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded " 77 "messages. Recommended to leave at 0 for most users since this can exponentially increase " 78 "dataset sizes." 79 } 80 } 81 82 @classmethod 83 def get_options(cls, parent_dataset=None, config=None): 84 """ 85 Get processor options 86 87 Just updates the description of the entities field based on the 88 configured max entities. 89 90 :param DataSet parent_dataset: An object representing the dataset that 91 the processor would be run on 92 :param ConfigManager|None config: Configuration reader (context-aware) 93 """ 94 95 max_entities = config.get("telegram-search.max_entities", 25) 96 options = { 97 "intro": { 98 "type": UserInput.OPTION_INFO, 99 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 100 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 101 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 102 "authentication for Telegram." 103 }, 104 "api_id": { 105 "type": UserInput.OPTION_TEXT, 106 "help": "API ID", 107 "cache": True, 108 }, 109 "api_hash": { 110 "type": UserInput.OPTION_TEXT, 111 "help": "API Hash", 112 "cache": True, 113 }, 114 "api_phone": { 115 "type": UserInput.OPTION_TEXT, 116 "help": "Phone number", 117 "cache": True, 118 "default": "+xxxxxxxxxx" 119 }, 120 "divider": { 121 "type": UserInput.OPTION_DIVIDER 122 }, 123 "query-intro": { 124 "type": UserInput.OPTION_INFO, 125 "help": "Separate with commas or line breaks." 126 }, 127 "query": { 128 "type": UserInput.OPTION_TEXT_LARGE, 129 "help": "Entities to scrape", 130 "tooltip": "Separate with commas or line breaks." 131 }, 132 "max_posts": { 133 "type": UserInput.OPTION_TEXT, 134 "help": "Messages per group", 135 "min": 1, 136 "max": 50000, 137 "default": 10 138 }, 139 "daterange": { 140 "type": UserInput.OPTION_DATERANGE, 141 "help": "Date range" 142 }, 143 "divider-2": { 144 "type": UserInput.OPTION_DIVIDER 145 }, 146 "info-sensitive": { 147 "type": UserInput.OPTION_INFO, 148 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 149 "there while data is fetched. After the dataset has been created your credentials will be " 150 "deleted from the server, unless you enable the option below. If you want to download images " 151 "attached to the messages in your collected data, you need to enable this option. Your " 152 "credentials will never be visible to other users and can be erased later via the result page." 153 }, 154 "save-session": { 155 "type": UserInput.OPTION_TOGGLE, 156 "help": "Save session:", 157 "default": False 158 }, 159 "resolve-entities-intro": { 160 "type": UserInput.OPTION_INFO, 161 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 162 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 163 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 164 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 165 "to finish capturing. It will also dramatically increase the disk space needed to store the " 166 "data, so only enable this if you really need it!" 167 }, 168 "resolve-entities": { 169 "type": UserInput.OPTION_TOGGLE, 170 "help": "Resolve references", 171 "default": False, 172 } 173 } 174 175 if max_entities: 176 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 177 f"(channels or groups) at a time. Separate with line breaks or commas.") 178 179 all_messages = config.get("telegram-search.can_query_all_messages", False) 180 if all_messages: 181 if "max" in options["max_posts"]: 182 del options["max_posts"]["max"] 183 else: 184 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 185 f"{options['max_posts']['max']:,} messages per entity.") 186 187 if config.get("telegram-search.max_crawl_depth", 0) > 0: 188 options["crawl_intro"] = { 189 "type": UserInput.OPTION_INFO, 190 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 191 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 192 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 193 "rapidly grow when adding newly discovered entities to the query this way. Note that dataset " 194 "progress cannot be accurately tracked when you use this feature." 195 } 196 options["crawl-depth"] = { 197 "type": UserInput.OPTION_TEXT, 198 "coerce_type": int, 199 "min": 0, 200 "max": config.get("telegram-search.max_crawl_depth"), 201 "default": 0, 202 "help": "Crawl depth", 203 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 204 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 205 "starting entities." 206 } 207 options["crawl-threshold"] = { 208 "type": UserInput.OPTION_TEXT, 209 "coerce_type": int, 210 "min": 0, 211 "default": 5, 212 "help": "Crawl threshold", 213 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 214 "references discovered below the max crawl depth are taken into account." 215 } 216 options["crawl-via-links"] = { 217 "type": UserInput.OPTION_TOGGLE, 218 "default": False, 219 "help": "Extract new groups from links", 220 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 221 "This is more error-prone than crawling only via forwards, but can be a way to discover " 222 "links that would otherwise remain undetected." 223 } 224 225 return options 226 227 228 def get_items(self, query): 229 """ 230 Execute a query; get messages for given parameters 231 232 Basically a wrapper around execute_queries() to call it with asyncio. 233 234 :param dict query: Query parameters, as part of the DataSet object 235 :return list: Posts, sorted by thread and post ID, in ascending order 236 """ 237 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 238 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 239 "creating it again from scratch.", is_final=True) 240 return None 241 242 self.details_cache = {} 243 self.failures_cache = set() 244 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 245 results = asyncio.run(self.execute_queries()) 246 247 if not query.get("save-session"): 248 self.dataset.delete_parameter("api_hash", instant=True) 249 self.dataset.delete_parameter("api_phone", instant=True) 250 self.dataset.delete_parameter("api_id", instant=True) 251 252 if self.flawless: 253 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 254 "been private). View the log file for details.", is_final=True) 255 256 return results 257 258 async def execute_queries(self): 259 """ 260 Get messages for queries 261 262 This is basically what would be done in get_items(), except due to 263 Telethon's architecture this needs to be called in an async method, 264 which is this one. 265 266 :return list: Collected messages 267 """ 268 # session file has been created earlier, and we can re-use it here in 269 # order to avoid having to re-enter the security code 270 query = self.parameters 271 272 session_id = SearchTelegram.create_session_id(query["api_phone"].strip(), 273 query["api_id"].strip(), 274 query["api_hash"].strip()) 275 self.dataset.log(f'Telegram session id: {session_id}') 276 session_path = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), session_id + ".session") 277 278 client = None 279 280 try: 281 client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"), 282 loop=self.eventloop) 283 await client.start(phone=SearchTelegram.cancel_start) 284 except RuntimeError: 285 # session is no longer useable, delete file so user will be asked 286 # for security code again. The RuntimeError is raised by 287 # `cancel_start()` 288 self.dataset.update_status( 289 "Session is not authenticated: login security code may have expired. You need to re-enter the security code.", 290 is_final=True) 291 292 if client and hasattr(client, "disconnect"): 293 await client.disconnect() 294 295 if session_path.exists(): 296 session_path.unlink() 297 298 return [] 299 except Exception as e: 300 # not sure what exception specifically is triggered here, but it 301 # always means the connection failed 302 self.log.error(f"Telegram: {e}\n{traceback.format_exc()}") 303 self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True) 304 if client and hasattr(client, "disconnect"): 305 await client.disconnect() 306 return [] 307 308 # ready our parameters 309 parameters = self.dataset.get_parameters() 310 queries = [query.strip() for query in parameters.get("query", "").split(",")] 311 max_items = convert_to_int(parameters.get("items", 10), 10) 312 313 # Telethon requires the offset date to be a datetime date 314 max_date = parameters.get("max_date") 315 if max_date: 316 try: 317 max_date = datetime.fromtimestamp(int(max_date)) 318 except ValueError: 319 max_date = None 320 321 # min_date can remain an integer 322 min_date = parameters.get("min_date") 323 if min_date: 324 try: 325 min_date = int(min_date) 326 except ValueError: 327 min_date = None 328 329 posts = [] 330 try: 331 async for post in self.gather_posts(client, queries, max_items, min_date, max_date): 332 posts.append(post) 333 return posts 334 except ProcessorInterruptedException as e: 335 raise e 336 except Exception: 337 # catch-all so we can disconnect properly 338 # ...should we? 339 self.dataset.update_status("Error scraping posts from Telegram; halting collection.") 340 self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") 341 # May as well return what was captured, yes? 342 return posts 343 finally: 344 await client.disconnect() 345 346 async def gather_posts(self, client, queries, max_items, min_date, max_date): 347 """ 348 Gather messages for each entity for which messages are requested 349 350 :param TelegramClient client: Telegram Client 351 :param list queries: List of entities to query (as string) 352 :param int max_items: Messages to scrape per entity 353 :param int min_date: Datetime date to get posts after 354 :param int max_date: Datetime date to get posts before 355 :return list: List of messages, each message a dictionary. 356 """ 357 resolve_refs = self.parameters.get("resolve-entities") 358 359 # Adding flag to stop; using for rate limits 360 no_additional_queries = False 361 362 # This is used for the 'crawl' feature so we know at which depth a 363 # given entity was discovered 364 depth_map = { 365 entity: 0 for entity in queries 366 } 367 368 crawl_max_depth = self.parameters.get("crawl-depth", 0) 369 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 370 crawl_via_links = self.parameters.get("crawl-via-links", False) 371 372 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 373 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 374 375 # this keeps track of how often an entity not in the original query 376 # has been mentioned. When crawling is enabled and this exceeds the 377 # given threshold, the entity is added to the query 378 crawl_references = {} 379 full_query = set(queries) 380 num_queries = len(queries) 381 382 # we may not always know the 'entity username' for an entity ID, so 383 # keep a reference map as we go 384 entity_id_map = {} 385 386 # Collect queries 387 # Use while instead of for so we can change queries during iteration 388 # this is needed for the 'crawl' feature which can discover new 389 # entities during crawl 390 processed = 0 391 total_messages = 0 392 while queries: 393 query = queries.pop(0) 394 395 delay = 10 396 retries = 0 397 processed += 1 398 self.dataset.update_progress(processed / num_queries) 399 400 if no_additional_queries: 401 # Note that we are not completing this query 402 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 403 continue 404 405 while True: 406 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 407 entity_posts = 0 408 discovered = 0 409 try: 410 async for message in client.iter_messages(entity=query, offset_date=max_date): 411 entity_posts += 1 412 total_messages += 1 413 if self.interrupted: 414 raise ProcessorInterruptedException( 415 "Interrupted while fetching message data from the Telegram API") 416 417 if entity_posts % 100 == 0: 418 self.dataset.update_status( 419 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 420 421 if message.action is not None: 422 # e.g. someone joins the channel - not an actual message 423 continue 424 425 # todo: possibly enrich object with e.g. the name of 426 # the channel a message was forwarded from (but that 427 # needs extra API requests...) 428 serialized_message = SearchTelegram.serialize_obj(message) 429 if "_chat" in serialized_message: 430 # Add query ID to check if queries have been crawled previously 431 full_query.add(serialized_message["_chat"]["id"]) 432 if query not in entity_id_map and serialized_message["_chat"]["id"] == query: 433 # once we know what a channel ID resolves to, use the username instead so it is easier to 434 # understand for the user 435 entity_id_map[query] = serialized_message["_chat"]["username"] 436 self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") 437 438 if resolve_refs: 439 serialized_message = await self.resolve_groups(client, serialized_message) 440 441 # Stop if we're below the min date 442 if min_date and serialized_message.get("date") < min_date: 443 break 444 445 # if crawling is enabled, see if we found something to add to the query 446 linked_entities = set() 447 if crawl_max_depth and (depth_map.get(query) < crawl_max_depth): 448 message_fwd = serialized_message.get("fwd_from") 449 fwd_from = None 450 fwd_source_type = None 451 if message_fwd and message_fwd.get("from_id"): 452 if message_fwd["from_id"].get("_type") == "PeerChannel": 453 # Legacy(?) data structure (pre 2024/7/22) 454 # even if we haven't resolved the ID, we can feed the numeric ID 455 # to Telethon! this is nice because it means we don't have to 456 # resolve entities to crawl iteratively 457 fwd_from = int(message_fwd["from_id"]["channel_id"]) 458 fwd_source_type = "channel" 459 elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}): 460 # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me 461 # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far 462 fwd_from = int(message_fwd["from_id"]["full_chat"]["id"]) 463 fwd_source_type = "channel" 464 elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"): 465 # forwards can also come from users 466 # these can never be followed, so don't add these to the crawl, but do document them 467 fwd_source_type = "user" 468 else: 469 print(json.dumps(message_fwd)) 470 self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl") 471 fwd_source_type = "unknown" 472 473 if fwd_from: 474 linked_entities.add(fwd_from) 475 476 477 if crawl_via_links: 478 # t.me links 479 all_links = ural.urls_from_text(serialized_message["message"]) 480 all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 481 for link in all_links: 482 if link.startswith("+"): 483 # invite links 484 continue 485 486 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 487 linked_entities.add(entity_name) 488 489 # @references 490 references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention] 491 for reference in references: 492 if reference.startswith("@"): 493 reference = reference[1:] 494 495 reference = reference.split("/")[0] 496 497 linked_entities.add(reference) 498 499 # Check if fwd_from or the resolved entity ID is already queued or has been queried 500 for link in linked_entities: 501 if link not in full_query and link not in queries and fwd_source_type not in ("user",): 502 # new entity discovered! 503 # might be discovered (before collection) multiple times, so retain lowest depth 504 # print(f"Potentially crawling {link}") 505 depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1) 506 if link not in crawl_references: 507 crawl_references[link] = 0 508 crawl_references[link] += 1 509 510 # Add to queries if it has been referenced enough times 511 if crawl_references[link] >= crawl_msg_threshold: 512 queries.append(link) 513 full_query.add(link) 514 num_queries += 1 515 discovered += 1 516 self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") 517 518 519 520 serialized_message["4CAT_metadata"] = { 521 "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls 522 "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity 523 "query_depth": depth_map.get(query, 0) 524 } 525 yield serialized_message 526 527 if entity_posts >= max_items: 528 break 529 530 except ChannelPrivateError: 531 self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping") 532 self.flawless += 1 533 534 except (UsernameInvalidError,): 535 self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping") 536 self.flawless += 1 537 538 except FloodWaitError as e: 539 self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting") 540 if e.seconds < self.end_if_rate_limited: 541 time.sleep(e.seconds) 542 continue 543 else: 544 self.flawless += 1 545 no_additional_queries = True 546 self.dataset.update_status( 547 f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending") 548 break 549 550 except BadRequestError as e: 551 self.dataset.update_status( 552 f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping") 553 self.flawless += 1 554 555 except ValueError as e: 556 self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping") 557 self.flawless += 1 558 559 except ChannelPrivateError as e: 560 self.dataset.update_status( 561 f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.") 562 break 563 564 except TimeoutError: 565 if retries < 3: 566 self.dataset.update_status( 567 f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.") 568 self.flawless += 1 569 break 570 571 self.dataset.update_status( 572 f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.") 573 time.sleep(delay) 574 delay *= 2 575 continue 576 577 self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)") 578 break 579 580 async def resolve_groups(self, client, message): 581 """ 582 Recursively resolve references to groups and users 583 584 :param client: Telethon client instance 585 :param dict message: Message, as already mapped by serialize_obj 586 :return: Resolved dictionary 587 """ 588 resolved_message = message.copy() 589 for key, value in message.items(): 590 try: 591 if type(value) is not dict: 592 # if it's not a dict, we never have to resolve it, as it 593 # does not represent an entity 594 continue 595 596 elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"): 597 # forwarded from a channel! 598 if value["channel_id"] in self.failures_cache: 599 continue 600 601 if value["channel_id"] not in self.details_cache: 602 channel = await client(GetFullChannelRequest(value["channel_id"])) 603 self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel) 604 605 resolved_message[key] = self.details_cache[value["channel_id"]] 606 resolved_message[key]["channel_id"] = value["channel_id"] 607 608 elif "_type" in value and value["_type"] == "PeerUser": 609 # a user! 610 if value["user_id"] in self.failures_cache: 611 continue 612 613 if value["user_id"] not in self.details_cache: 614 user = await client(GetFullUserRequest(value["user_id"])) 615 self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user) 616 617 resolved_message[key] = self.details_cache[value["user_id"]] 618 resolved_message[key]["user_id"] = value["user_id"] 619 else: 620 resolved_message[key] = await self.resolve_groups(client, value) 621 622 except (TypeError, ChannelPrivateError, UsernameInvalidError) as e: 623 self.failures_cache.add(value.get("channel_id", value.get("user_id"))) 624 if type(e) in (ChannelPrivateError, UsernameInvalidError): 625 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is") 626 else: 627 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is") 628 629 return resolved_message 630 631 @staticmethod 632 def cancel_start(): 633 """ 634 Replace interactive phone number input in Telethon 635 636 By default, if Telethon cannot use the given session file to 637 authenticate, it will interactively prompt the user for a phone 638 number on the command line. That is not useful here, so instead 639 raise a RuntimeError. This will be caught below and the user will 640 be told they need to re-authenticate via 4CAT. 641 """ 642 raise RuntimeError("Connection cancelled") 643 644 @staticmethod 645 def map_item(message): 646 """ 647 Convert Message object to 4CAT-ready data object 648 649 :param Message message: Message to parse 650 :return dict: 4CAT-compatible item object 651 """ 652 if message["_chat"]["username"]: 653 # chats can apparently not have usernames??? 654 # truly telegram objects are way too lenient for their own good 655 thread = message["_chat"]["username"] 656 elif message["_chat"]["title"]: 657 thread = re.sub(r"\s", "", message["_chat"]["title"]) 658 else: 659 # just give up 660 thread = "unknown" 661 662 # determine username 663 # API responses only include the user *ID*, not the username, and to 664 # complicate things further not everyone is a user and not everyone 665 # has a username. If no username is available, try the first and 666 # last name someone has supplied 667 fullname = "" 668 username = "" 669 user_id = message["_sender"]["id"] if message.get("_sender") else "" 670 user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else "" 671 672 if message.get("_sender") and message["_sender"].get("username"): 673 username = message["_sender"]["username"] 674 675 if message.get("_sender") and message["_sender"].get("first_name"): 676 fullname += message["_sender"]["first_name"] 677 678 if message.get("_sender") and message["_sender"].get("last_name"): 679 fullname += " " + message["_sender"]["last_name"] 680 681 fullname = fullname.strip() 682 683 # determine media type 684 # these store some extra information of the attachment in 685 # attachment_data. Since the final result will be serialised as a csv 686 # file, we can only store text content. As such some media data is 687 # serialised as JSON. 688 attachment_type = SearchTelegram.get_media_type(message["media"]) 689 attachment_filename = "" 690 691 if attachment_type == "contact": 692 contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"] 693 if message["media"].get('contact', False): 694 # Old datastructure 695 attachment = message["media"]["contact"] 696 elif all([property in message["media"].keys() for property in contact_data]): 697 # New datastructure 2022/7/25 698 attachment = message["media"] 699 else: 700 raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed') 701 attachment_data = json.dumps({property: attachment.get(property) for property in contact_data}) 702 703 elif attachment_type == "document": 704 # videos, etc 705 # This could add a separate routine for videos to make them a 706 # separate type, which could then be scraped later, etc 707 attachment_type = message["media"]["document"]["mime_type"].split("/")[0] 708 if attachment_type == "video": 709 attachment = message["media"]["document"] 710 attachment_data = json.dumps({ 711 "id": attachment["id"], 712 "dc_id": attachment["dc_id"], 713 "file_reference": attachment["file_reference"], 714 }) 715 else: 716 attachment_data = "" 717 718 # elif attachment_type in ("geo", "geo_live"): 719 # untested whether geo_live is significantly different from geo 720 # attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"]) 721 722 elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"): 723 # we don't actually store any metadata about the photo, since very 724 # little of the metadata attached is of interest. Instead, the 725 # actual photos may be downloaded via a processor that is run on the 726 # search results 727 attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"] 728 attachment_data = json.dumps({ 729 "id": attachment["id"], 730 "dc_id": attachment["dc_id"], 731 "file_reference": attachment["file_reference"], 732 }) 733 attachment_filename = thread + "-" + str(message["id"]) + ".jpeg" 734 735 elif attachment_type == "poll": 736 # unfortunately poll results are only available when someone has 737 # actually voted on the poll - that will usually not be the case, 738 # so we store -1 as the vote count 739 attachment = message["media"] 740 options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]} 741 attachment_data = json.dumps({ 742 "question": attachment["poll"]["question"], 743 "voters": attachment["results"]["total_voters"], 744 "answers": [{ 745 "answer": options[answer["option"]], 746 "votes": answer["voters"] 747 } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{ 748 "answer": options[option], 749 "votes": -1 750 } for option in options] 751 }) 752 753 else: 754 attachment_data = "" 755 756 # was the message forwarded from somewhere and if so when? 757 forwarded_timestamp = "" 758 forwarded_name = "" 759 forwarded_id = "" 760 forwarded_username = "" 761 if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int: 762 # forward information is spread out over a lot of places 763 # we can identify, in order of usefulness: username, full name, 764 # and ID. But not all of these are always available, and not 765 # always in the same place either 766 forwarded_timestamp = int(message["fwd_from"]["date"]) 767 from_data = message["fwd_from"]["from_id"] 768 769 if from_data: 770 forwarded_id = from_data.get("channel_id", from_data.get("user_id", "")) 771 772 if message["fwd_from"].get("from_name"): 773 forwarded_name = message["fwd_from"].get("from_name") 774 775 if from_data and from_data.get("from_name"): 776 forwarded_name = message["fwd_from"]["from_name"] 777 778 if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data: 779 from_data["user"] = from_data["users"][0] 780 781 if from_data and ("user" in from_data or "chats" in from_data): 782 # 'resolve entities' was enabled for this dataset 783 if "user" in from_data: 784 if from_data["user"].get("username"): 785 forwarded_username = from_data["user"]["username"] 786 787 if from_data["user"].get("first_name"): 788 forwarded_name = from_data["user"]["first_name"] 789 if message["fwd_from"].get("last_name"): 790 forwarded_name += " " + from_data["user"]["last_name"] 791 792 forwarded_name = forwarded_name.strip() 793 794 elif "chats" in from_data: 795 channel_id = from_data.get("channel_id") 796 for chat in from_data["chats"]: 797 if chat["id"] == channel_id or channel_id is None: 798 forwarded_username = chat["username"] 799 800 elif message.get("_forward") and message["_forward"].get("_chat"): 801 if message["_forward"]["_chat"].get("username"): 802 forwarded_username = message["_forward"]["_chat"]["username"] 803 804 if message["_forward"]["_chat"].get("title"): 805 forwarded_name = message["_forward"]["_chat"]["title"] 806 807 link_title = "" 808 link_attached = "" 809 link_description = "" 810 reactions = "" 811 812 if message.get("media") and message["media"].get("webpage"): 813 link_title = message["media"]["webpage"].get("title") 814 link_attached = message["media"]["webpage"].get("url") 815 link_description = message["media"]["webpage"].get("description") 816 817 if message.get("reactions") and message["reactions"].get("results"): 818 for reaction in message["reactions"]["results"]: 819 if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]: 820 # Updated to support new reaction datastructure 821 reactions += reaction["reaction"]["emoticon"] * reaction["count"] 822 elif type(reaction["reaction"]) is str and "count" in reaction: 823 reactions += reaction["reaction"] * reaction["count"] 824 else: 825 # Failsafe; can be updated to support formatting of new datastructures in the future 826 reactions += f"{reaction}, " 827 828 is_reply = False 829 reply_to = "" 830 if message.get("reply_to"): 831 is_reply = True 832 reply_to = message["reply_to"].get("reply_to_msg_id", "") 833 834 # t.me links 835 linked_entities = set() 836 all_links = ural.urls_from_text(message["message"]) 837 all_links = [link.split("t.me/")[1] for link in all_links if 838 ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 839 840 for link in all_links: 841 if link.startswith("+"): 842 # invite links 843 continue 844 845 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 846 linked_entities.add(entity_name) 847 848 # @references 849 # in execute_queries we use MessageEntityMention to get these 850 # however, after serializing these objects we only have the offsets of 851 # the mentioned username, and telegram does weird unicode things to its 852 # offsets meaning we can't just substring the message. So use a regex 853 # as a 'good enough' solution 854 all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) 855 856 # make this case-insensitive since people may use different casing in 857 # messages than the 'official' username for example 858 all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) 859 all_ci_connections = set() 860 seen = set() 861 for connection in all_connections: 862 if connection.lower() not in seen: 863 all_ci_connections.add(connection) 864 seen.add(connection.lower()) 865 866 return MappedItem({ 867 "id": f"{message['_chat']['username']}-{message['id']}", 868 "thread_id": thread, 869 "chat": message["_chat"]["username"], 870 "author": user_id, 871 "author_username": username, 872 "author_name": fullname, 873 "author_is_bot": "yes" if user_is_bot else "no", 874 "body": message["message"], 875 "body_markdown": message.get("message_markdown", MissingMappedField("")), 876 "is_reply": is_reply, 877 "reply_to": reply_to, 878 "views": message["views"] if message["views"] else "", 879 # "forwards": message.get("forwards", MissingMappedField(0)), 880 "reactions": reactions, 881 "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), 882 "unix_timestamp": int(message["date"]), 883 "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[ 884 "edit_date"] else "", 885 "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "", 886 "author_forwarded_from_name": forwarded_name, 887 "author_forwarded_from_username": forwarded_username, 888 "author_forwarded_from_id": forwarded_id, 889 "entities_linked": ",".join(linked_entities), 890 "entities_mentioned": ",".join(all_mentions), 891 "all_connections": ",".join(all_ci_connections), 892 "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime( 893 "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "", 894 "unix_timestamp_forwarded_from": forwarded_timestamp, 895 "link_title": link_title, 896 "link_description": link_description, 897 "link_attached": link_attached, 898 "attachment_type": attachment_type, 899 "attachment_data": attachment_data, 900 "attachment_filename": attachment_filename 901 }) 902 903 @staticmethod 904 def get_media_type(media): 905 """ 906 Get media type for a Telegram attachment 907 908 :param media: Media object 909 :return str: Textual identifier of the media type 910 """ 911 try: 912 return { 913 "NoneType": "", 914 "MessageMediaContact": "contact", 915 "MessageMediaDocument": "document", 916 "MessageMediaEmpty": "", 917 "MessageMediaGame": "game", 918 "MessageMediaGeo": "geo", 919 "MessageMediaGeoLive": "geo_live", 920 "MessageMediaInvoice": "invoice", 921 "MessageMediaPhoto": "photo", 922 "MessageMediaPoll": "poll", 923 "MessageMediaUnsupported": "unsupported", 924 "MessageMediaVenue": "venue", 925 "MessageMediaWebPage": "url" 926 }[media.get("_type", None)] 927 except (AttributeError, KeyError): 928 return "" 929 930 @staticmethod 931 def serialize_obj(input_obj): 932 """ 933 Serialize an object as a dictionary 934 935 Telethon message objects are not serializable by themselves, but most 936 relevant attributes are simply struct classes. This function replaces 937 those that are not with placeholders and then returns a dictionary that 938 can be serialized as JSON. 939 940 :param obj: Object to serialize 941 :return: Serialized object 942 """ 943 scalars = (int, str, float, list, tuple, set, bool) 944 945 if type(input_obj) in scalars or input_obj is None: 946 return input_obj 947 948 if type(input_obj) is not dict: 949 obj = input_obj.__dict__ 950 else: 951 obj = input_obj.copy() 952 953 mapped_obj = {} 954 for item, value in obj.items(): 955 if type(value) is datetime: 956 mapped_obj[item] = value.timestamp() 957 elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 958 mapped_obj[item] = SearchTelegram.serialize_obj(value) 959 elif type(value) is list: 960 mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value] 961 elif type(value) is bytes: 962 mapped_obj[item] = value.hex() 963 elif type(value) not in scalars and value is not None: 964 # type we can't make sense of here 965 continue 966 else: 967 mapped_obj[item] = value 968 969 # Add the _type if the original object was a telethon type 970 if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 971 mapped_obj["_type"] = type(input_obj).__name__ 972 973 # Store the markdown-formatted text 974 if type(input_obj).__name__ == "Message": 975 mapped_obj["message_markdown"] = input_obj.text 976 977 return mapped_obj 978 979 @staticmethod 980 def validate_query(query, request, config): 981 """ 982 Validate Telegram query 983 984 :param config: 985 :param dict query: Query parameters, from client-side. 986 :param request: Flask request 987 :param User user: User object of user who has submitted the query 988 :param ConfigManager config: Configuration reader (context-aware) 989 :return dict: Safe query parameters 990 """ 991 # no query 4 u 992 if not query.get("query", "").strip(): 993 raise QueryParametersException("You must provide a search query.") 994 995 if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None): 996 raise QueryParametersException("You need to provide valid Telegram API credentials first.") 997 998 all_posts = config.get("telegram-search.can_query_all_messages", False) 999 max_entities = config.get("telegram-search.max_entities", 25) 1000 1001 num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options( 1002 config=config)["max_posts"]["max"]) 1003 1004 # reformat queries to be a comma-separated list with no wrapping 1005 # whitespace 1006 whitespace = re.compile(r"\s+") 1007 items = whitespace.sub("", query.get("query").replace("\n", ",")) 1008 if max_entities > 0 and len(items.split(",")) > max_entities: 1009 raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.") 1010 1011 sanitized_items = [] 1012 # handle telegram URLs 1013 for item in items.split(","): 1014 if not item.strip(): 1015 continue 1016 item = re.sub(r"^https?://t\.me/", "", item) 1017 item = re.sub(r"^/?s/", "", item) 1018 item = re.sub(r"[/]*$", "", item) 1019 sanitized_items.append(item) 1020 1021 # the dates need to make sense as a range to search within 1022 min_date, max_date = query.get("daterange") 1023 1024 # now check if there is an active API session 1025 if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous: 1026 raise QueryParametersException("Telegram scraping is only available to logged-in users with personal " 1027 "accounts.") 1028 1029 # check for the information we need 1030 session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"), 1031 query.get("api_hash")) 1032 config.user.set_value("telegram.session", session_id) 1033 session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session") 1034 1035 client = None 1036 1037 # API ID is always a number, if it's not, we can immediately fail 1038 try: 1039 api_id = int(query.get("api_id")) 1040 except ValueError: 1041 raise QueryParametersException("Invalid API ID.") 1042 1043 # maybe we've entered a code already and submitted it with the request 1044 if "option-security-code" in request.form and request.form.get("option-security-code").strip(): 1045 code_callback = lambda: request.form.get("option-security-code") # noqa: E731 1046 max_attempts = 1 1047 else: 1048 code_callback = lambda: -1 # noqa: E731 1049 # max_attempts = 0 because authing will always fail: we can't wait for 1050 # the code to be entered interactively, we'll need to do a new request 1051 # but we can't just immediately return, we still need to call start() 1052 # to get telegram to send us a code 1053 max_attempts = 0 1054 1055 # now try authenticating 1056 needs_code = False 1057 try: 1058 loop = asyncio.new_event_loop() 1059 asyncio.set_event_loop(loop) 1060 client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop) 1061 1062 try: 1063 client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback) 1064 1065 except ValueError as e: 1066 # this happens if 2FA is required 1067 raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time " 1068 f"does not support this authentication mode for Telegram. ({e})") 1069 except RuntimeError: 1070 # A code was sent to the given phone number 1071 needs_code = True 1072 except FloodWaitError as e: 1073 # uh oh, we got rate-limited 1074 raise QueryParametersException("You were rate-limited and should wait a while before trying again. " + 1075 str(e).split("(")[0] + ".") 1076 except ApiIdInvalidError: 1077 # wrong credentials 1078 raise QueryParametersException("Your API credentials are invalid.") 1079 except PhoneNumberInvalidError: 1080 # wrong phone number 1081 raise QueryParametersException( 1082 "The phone number provided is not a valid phone number for these credentials.") 1083 except RPCError as e: 1084 # only seen this with an 'UPDATE_APP_TO_LOGIN' status 1085 raise QueryParametersException(f"Could not verify your authentication. You may need to update your " 1086 f"Telegram app(s) to the latest version to proceed ({e}).") 1087 except Exception as e: 1088 # ? 1089 raise QueryParametersException( 1090 f"An unexpected error ({e}) occurred and your authentication could not be verified.") 1091 finally: 1092 if client: 1093 client.disconnect() 1094 1095 if needs_code: 1096 raise QueryNeedsFurtherInputException(config={ 1097 "code-info": { 1098 "type": UserInput.OPTION_INFO, 1099 "help": "Please enter the security code that was sent to your Telegram app to continue." 1100 }, 1101 "security-code": { 1102 "type": UserInput.OPTION_TEXT, 1103 "help": "Security code", 1104 "sensitive": True 1105 }}) 1106 1107 # simple! 1108 return { 1109 "items": num_items, 1110 "query": ",".join(sanitized_items), 1111 "api_id": query.get("api_id"), 1112 "api_hash": query.get("api_hash"), 1113 "api_phone": query.get("api_phone"), 1114 "save-session": query.get("save-session"), 1115 "resolve-entities": query.get("resolve-entities"), 1116 "min_date": min_date, 1117 "max_date": max_date, 1118 "crawl-depth": query.get("crawl-depth"), 1119 "crawl-threshold": query.get("crawl-threshold"), 1120 "crawl-via-links": query.get("crawl-via-links") 1121 } 1122 1123 @staticmethod 1124 def create_session_id(api_phone, api_id, api_hash): 1125 """ 1126 Generate a filename for the session file 1127 1128 This is a combination of phone number and API credentials, but hashed 1129 so that one cannot actually derive someone's phone number from it. 1130 1131 :param str api_phone: Phone number for API ID 1132 :param int api_id: Telegram API ID 1133 :param str api_hash: Telegram API Hash 1134 :return str: A hash value derived from the input 1135 """ 1136 hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip() 1137 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Search Telegram via API
82 @classmethod 83 def get_options(cls, parent_dataset=None, config=None): 84 """ 85 Get processor options 86 87 Just updates the description of the entities field based on the 88 configured max entities. 89 90 :param DataSet parent_dataset: An object representing the dataset that 91 the processor would be run on 92 :param ConfigManager|None config: Configuration reader (context-aware) 93 """ 94 95 max_entities = config.get("telegram-search.max_entities", 25) 96 options = { 97 "intro": { 98 "type": UserInput.OPTION_INFO, 99 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 100 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 101 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 102 "authentication for Telegram." 103 }, 104 "api_id": { 105 "type": UserInput.OPTION_TEXT, 106 "help": "API ID", 107 "cache": True, 108 }, 109 "api_hash": { 110 "type": UserInput.OPTION_TEXT, 111 "help": "API Hash", 112 "cache": True, 113 }, 114 "api_phone": { 115 "type": UserInput.OPTION_TEXT, 116 "help": "Phone number", 117 "cache": True, 118 "default": "+xxxxxxxxxx" 119 }, 120 "divider": { 121 "type": UserInput.OPTION_DIVIDER 122 }, 123 "query-intro": { 124 "type": UserInput.OPTION_INFO, 125 "help": "Separate with commas or line breaks." 126 }, 127 "query": { 128 "type": UserInput.OPTION_TEXT_LARGE, 129 "help": "Entities to scrape", 130 "tooltip": "Separate with commas or line breaks." 131 }, 132 "max_posts": { 133 "type": UserInput.OPTION_TEXT, 134 "help": "Messages per group", 135 "min": 1, 136 "max": 50000, 137 "default": 10 138 }, 139 "daterange": { 140 "type": UserInput.OPTION_DATERANGE, 141 "help": "Date range" 142 }, 143 "divider-2": { 144 "type": UserInput.OPTION_DIVIDER 145 }, 146 "info-sensitive": { 147 "type": UserInput.OPTION_INFO, 148 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 149 "there while data is fetched. After the dataset has been created your credentials will be " 150 "deleted from the server, unless you enable the option below. If you want to download images " 151 "attached to the messages in your collected data, you need to enable this option. Your " 152 "credentials will never be visible to other users and can be erased later via the result page." 153 }, 154 "save-session": { 155 "type": UserInput.OPTION_TOGGLE, 156 "help": "Save session:", 157 "default": False 158 }, 159 "resolve-entities-intro": { 160 "type": UserInput.OPTION_INFO, 161 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 162 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 163 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 164 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 165 "to finish capturing. It will also dramatically increase the disk space needed to store the " 166 "data, so only enable this if you really need it!" 167 }, 168 "resolve-entities": { 169 "type": UserInput.OPTION_TOGGLE, 170 "help": "Resolve references", 171 "default": False, 172 } 173 } 174 175 if max_entities: 176 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 177 f"(channels or groups) at a time. Separate with line breaks or commas.") 178 179 all_messages = config.get("telegram-search.can_query_all_messages", False) 180 if all_messages: 181 if "max" in options["max_posts"]: 182 del options["max_posts"]["max"] 183 else: 184 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 185 f"{options['max_posts']['max']:,} messages per entity.") 186 187 if config.get("telegram-search.max_crawl_depth", 0) > 0: 188 options["crawl_intro"] = { 189 "type": UserInput.OPTION_INFO, 190 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 191 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 192 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 193 "rapidly grow when adding newly discovered entities to the query this way. Note that dataset " 194 "progress cannot be accurately tracked when you use this feature." 195 } 196 options["crawl-depth"] = { 197 "type": UserInput.OPTION_TEXT, 198 "coerce_type": int, 199 "min": 0, 200 "max": config.get("telegram-search.max_crawl_depth"), 201 "default": 0, 202 "help": "Crawl depth", 203 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 204 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 205 "starting entities." 206 } 207 options["crawl-threshold"] = { 208 "type": UserInput.OPTION_TEXT, 209 "coerce_type": int, 210 "min": 0, 211 "default": 5, 212 "help": "Crawl threshold", 213 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 214 "references discovered below the max crawl depth are taken into account." 215 } 216 options["crawl-via-links"] = { 217 "type": UserInput.OPTION_TOGGLE, 218 "default": False, 219 "help": "Extract new groups from links", 220 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 221 "This is more error-prone than crawling only via forwards, but can be a way to discover " 222 "links that would otherwise remain undetected." 223 } 224 225 return options
Get processor options
Just updates the description of the entities field based on the configured max entities.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
- ConfigManager|None config: Configuration reader (context-aware)
228 def get_items(self, query): 229 """ 230 Execute a query; get messages for given parameters 231 232 Basically a wrapper around execute_queries() to call it with asyncio. 233 234 :param dict query: Query parameters, as part of the DataSet object 235 :return list: Posts, sorted by thread and post ID, in ascending order 236 """ 237 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 238 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 239 "creating it again from scratch.", is_final=True) 240 return None 241 242 self.details_cache = {} 243 self.failures_cache = set() 244 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 245 results = asyncio.run(self.execute_queries()) 246 247 if not query.get("save-session"): 248 self.dataset.delete_parameter("api_hash", instant=True) 249 self.dataset.delete_parameter("api_phone", instant=True) 250 self.dataset.delete_parameter("api_id", instant=True) 251 252 if self.flawless: 253 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 254 "been private). View the log file for details.", is_final=True) 255 256 return results
Execute a query; get messages for given parameters
Basically a wrapper around execute_queries() to call it with asyncio.
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Posts, sorted by thread and post ID, in ascending order
258 async def execute_queries(self): 259 """ 260 Get messages for queries 261 262 This is basically what would be done in get_items(), except due to 263 Telethon's architecture this needs to be called in an async method, 264 which is this one. 265 266 :return list: Collected messages 267 """ 268 # session file has been created earlier, and we can re-use it here in 269 # order to avoid having to re-enter the security code 270 query = self.parameters 271 272 session_id = SearchTelegram.create_session_id(query["api_phone"].strip(), 273 query["api_id"].strip(), 274 query["api_hash"].strip()) 275 self.dataset.log(f'Telegram session id: {session_id}') 276 session_path = Path(self.config.get("PATH_ROOT")).joinpath(self.config.get("PATH_SESSIONS"), session_id + ".session") 277 278 client = None 279 280 try: 281 client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"), 282 loop=self.eventloop) 283 await client.start(phone=SearchTelegram.cancel_start) 284 except RuntimeError: 285 # session is no longer useable, delete file so user will be asked 286 # for security code again. The RuntimeError is raised by 287 # `cancel_start()` 288 self.dataset.update_status( 289 "Session is not authenticated: login security code may have expired. You need to re-enter the security code.", 290 is_final=True) 291 292 if client and hasattr(client, "disconnect"): 293 await client.disconnect() 294 295 if session_path.exists(): 296 session_path.unlink() 297 298 return [] 299 except Exception as e: 300 # not sure what exception specifically is triggered here, but it 301 # always means the connection failed 302 self.log.error(f"Telegram: {e}\n{traceback.format_exc()}") 303 self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True) 304 if client and hasattr(client, "disconnect"): 305 await client.disconnect() 306 return [] 307 308 # ready our parameters 309 parameters = self.dataset.get_parameters() 310 queries = [query.strip() for query in parameters.get("query", "").split(",")] 311 max_items = convert_to_int(parameters.get("items", 10), 10) 312 313 # Telethon requires the offset date to be a datetime date 314 max_date = parameters.get("max_date") 315 if max_date: 316 try: 317 max_date = datetime.fromtimestamp(int(max_date)) 318 except ValueError: 319 max_date = None 320 321 # min_date can remain an integer 322 min_date = parameters.get("min_date") 323 if min_date: 324 try: 325 min_date = int(min_date) 326 except ValueError: 327 min_date = None 328 329 posts = [] 330 try: 331 async for post in self.gather_posts(client, queries, max_items, min_date, max_date): 332 posts.append(post) 333 return posts 334 except ProcessorInterruptedException as e: 335 raise e 336 except Exception: 337 # catch-all so we can disconnect properly 338 # ...should we? 339 self.dataset.update_status("Error scraping posts from Telegram; halting collection.") 340 self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") 341 # May as well return what was captured, yes? 342 return posts 343 finally: 344 await client.disconnect()
Get messages for queries
This is basically what would be done in get_items(), except due to Telethon's architecture this needs to be called in an async method, which is this one.
Returns
Collected messages
346 async def gather_posts(self, client, queries, max_items, min_date, max_date): 347 """ 348 Gather messages for each entity for which messages are requested 349 350 :param TelegramClient client: Telegram Client 351 :param list queries: List of entities to query (as string) 352 :param int max_items: Messages to scrape per entity 353 :param int min_date: Datetime date to get posts after 354 :param int max_date: Datetime date to get posts before 355 :return list: List of messages, each message a dictionary. 356 """ 357 resolve_refs = self.parameters.get("resolve-entities") 358 359 # Adding flag to stop; using for rate limits 360 no_additional_queries = False 361 362 # This is used for the 'crawl' feature so we know at which depth a 363 # given entity was discovered 364 depth_map = { 365 entity: 0 for entity in queries 366 } 367 368 crawl_max_depth = self.parameters.get("crawl-depth", 0) 369 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 370 crawl_via_links = self.parameters.get("crawl-via-links", False) 371 372 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 373 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 374 375 # this keeps track of how often an entity not in the original query 376 # has been mentioned. When crawling is enabled and this exceeds the 377 # given threshold, the entity is added to the query 378 crawl_references = {} 379 full_query = set(queries) 380 num_queries = len(queries) 381 382 # we may not always know the 'entity username' for an entity ID, so 383 # keep a reference map as we go 384 entity_id_map = {} 385 386 # Collect queries 387 # Use while instead of for so we can change queries during iteration 388 # this is needed for the 'crawl' feature which can discover new 389 # entities during crawl 390 processed = 0 391 total_messages = 0 392 while queries: 393 query = queries.pop(0) 394 395 delay = 10 396 retries = 0 397 processed += 1 398 self.dataset.update_progress(processed / num_queries) 399 400 if no_additional_queries: 401 # Note that we are not completing this query 402 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 403 continue 404 405 while True: 406 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 407 entity_posts = 0 408 discovered = 0 409 try: 410 async for message in client.iter_messages(entity=query, offset_date=max_date): 411 entity_posts += 1 412 total_messages += 1 413 if self.interrupted: 414 raise ProcessorInterruptedException( 415 "Interrupted while fetching message data from the Telegram API") 416 417 if entity_posts % 100 == 0: 418 self.dataset.update_status( 419 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 420 421 if message.action is not None: 422 # e.g. someone joins the channel - not an actual message 423 continue 424 425 # todo: possibly enrich object with e.g. the name of 426 # the channel a message was forwarded from (but that 427 # needs extra API requests...) 428 serialized_message = SearchTelegram.serialize_obj(message) 429 if "_chat" in serialized_message: 430 # Add query ID to check if queries have been crawled previously 431 full_query.add(serialized_message["_chat"]["id"]) 432 if query not in entity_id_map and serialized_message["_chat"]["id"] == query: 433 # once we know what a channel ID resolves to, use the username instead so it is easier to 434 # understand for the user 435 entity_id_map[query] = serialized_message["_chat"]["username"] 436 self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") 437 438 if resolve_refs: 439 serialized_message = await self.resolve_groups(client, serialized_message) 440 441 # Stop if we're below the min date 442 if min_date and serialized_message.get("date") < min_date: 443 break 444 445 # if crawling is enabled, see if we found something to add to the query 446 linked_entities = set() 447 if crawl_max_depth and (depth_map.get(query) < crawl_max_depth): 448 message_fwd = serialized_message.get("fwd_from") 449 fwd_from = None 450 fwd_source_type = None 451 if message_fwd and message_fwd.get("from_id"): 452 if message_fwd["from_id"].get("_type") == "PeerChannel": 453 # Legacy(?) data structure (pre 2024/7/22) 454 # even if we haven't resolved the ID, we can feed the numeric ID 455 # to Telethon! this is nice because it means we don't have to 456 # resolve entities to crawl iteratively 457 fwd_from = int(message_fwd["from_id"]["channel_id"]) 458 fwd_source_type = "channel" 459 elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}): 460 # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me 461 # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far 462 fwd_from = int(message_fwd["from_id"]["full_chat"]["id"]) 463 fwd_source_type = "channel" 464 elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"): 465 # forwards can also come from users 466 # these can never be followed, so don't add these to the crawl, but do document them 467 fwd_source_type = "user" 468 else: 469 print(json.dumps(message_fwd)) 470 self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl") 471 fwd_source_type = "unknown" 472 473 if fwd_from: 474 linked_entities.add(fwd_from) 475 476 477 if crawl_via_links: 478 # t.me links 479 all_links = ural.urls_from_text(serialized_message["message"]) 480 all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 481 for link in all_links: 482 if link.startswith("+"): 483 # invite links 484 continue 485 486 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 487 linked_entities.add(entity_name) 488 489 # @references 490 references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention] 491 for reference in references: 492 if reference.startswith("@"): 493 reference = reference[1:] 494 495 reference = reference.split("/")[0] 496 497 linked_entities.add(reference) 498 499 # Check if fwd_from or the resolved entity ID is already queued or has been queried 500 for link in linked_entities: 501 if link not in full_query and link not in queries and fwd_source_type not in ("user",): 502 # new entity discovered! 503 # might be discovered (before collection) multiple times, so retain lowest depth 504 # print(f"Potentially crawling {link}") 505 depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1) 506 if link not in crawl_references: 507 crawl_references[link] = 0 508 crawl_references[link] += 1 509 510 # Add to queries if it has been referenced enough times 511 if crawl_references[link] >= crawl_msg_threshold: 512 queries.append(link) 513 full_query.add(link) 514 num_queries += 1 515 discovered += 1 516 self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") 517 518 519 520 serialized_message["4CAT_metadata"] = { 521 "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls 522 "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity 523 "query_depth": depth_map.get(query, 0) 524 } 525 yield serialized_message 526 527 if entity_posts >= max_items: 528 break 529 530 except ChannelPrivateError: 531 self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping") 532 self.flawless += 1 533 534 except (UsernameInvalidError,): 535 self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping") 536 self.flawless += 1 537 538 except FloodWaitError as e: 539 self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting") 540 if e.seconds < self.end_if_rate_limited: 541 time.sleep(e.seconds) 542 continue 543 else: 544 self.flawless += 1 545 no_additional_queries = True 546 self.dataset.update_status( 547 f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending") 548 break 549 550 except BadRequestError as e: 551 self.dataset.update_status( 552 f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping") 553 self.flawless += 1 554 555 except ValueError as e: 556 self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping") 557 self.flawless += 1 558 559 except ChannelPrivateError as e: 560 self.dataset.update_status( 561 f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.") 562 break 563 564 except TimeoutError: 565 if retries < 3: 566 self.dataset.update_status( 567 f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.") 568 self.flawless += 1 569 break 570 571 self.dataset.update_status( 572 f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.") 573 time.sleep(delay) 574 delay *= 2 575 continue 576 577 self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)") 578 break
Gather messages for each entity for which messages are requested
Parameters
- TelegramClient client: Telegram Client
- list queries: List of entities to query (as string)
- int max_items: Messages to scrape per entity
- int min_date: Datetime date to get posts after
- int max_date: Datetime date to get posts before
Returns
List of messages, each message a dictionary.
580 async def resolve_groups(self, client, message): 581 """ 582 Recursively resolve references to groups and users 583 584 :param client: Telethon client instance 585 :param dict message: Message, as already mapped by serialize_obj 586 :return: Resolved dictionary 587 """ 588 resolved_message = message.copy() 589 for key, value in message.items(): 590 try: 591 if type(value) is not dict: 592 # if it's not a dict, we never have to resolve it, as it 593 # does not represent an entity 594 continue 595 596 elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"): 597 # forwarded from a channel! 598 if value["channel_id"] in self.failures_cache: 599 continue 600 601 if value["channel_id"] not in self.details_cache: 602 channel = await client(GetFullChannelRequest(value["channel_id"])) 603 self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel) 604 605 resolved_message[key] = self.details_cache[value["channel_id"]] 606 resolved_message[key]["channel_id"] = value["channel_id"] 607 608 elif "_type" in value and value["_type"] == "PeerUser": 609 # a user! 610 if value["user_id"] in self.failures_cache: 611 continue 612 613 if value["user_id"] not in self.details_cache: 614 user = await client(GetFullUserRequest(value["user_id"])) 615 self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user) 616 617 resolved_message[key] = self.details_cache[value["user_id"]] 618 resolved_message[key]["user_id"] = value["user_id"] 619 else: 620 resolved_message[key] = await self.resolve_groups(client, value) 621 622 except (TypeError, ChannelPrivateError, UsernameInvalidError) as e: 623 self.failures_cache.add(value.get("channel_id", value.get("user_id"))) 624 if type(e) in (ChannelPrivateError, UsernameInvalidError): 625 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is") 626 else: 627 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is") 628 629 return resolved_message
Recursively resolve references to groups and users
Parameters
- client: Telethon client instance
- dict message: Message, as already mapped by serialize_obj
Returns
Resolved dictionary
631 @staticmethod 632 def cancel_start(): 633 """ 634 Replace interactive phone number input in Telethon 635 636 By default, if Telethon cannot use the given session file to 637 authenticate, it will interactively prompt the user for a phone 638 number on the command line. That is not useful here, so instead 639 raise a RuntimeError. This will be caught below and the user will 640 be told they need to re-authenticate via 4CAT. 641 """ 642 raise RuntimeError("Connection cancelled")
Replace interactive phone number input in Telethon
By default, if Telethon cannot use the given session file to authenticate, it will interactively prompt the user for a phone number on the command line. That is not useful here, so instead raise a RuntimeError. This will be caught below and the user will be told they need to re-authenticate via 4CAT.
644 @staticmethod 645 def map_item(message): 646 """ 647 Convert Message object to 4CAT-ready data object 648 649 :param Message message: Message to parse 650 :return dict: 4CAT-compatible item object 651 """ 652 if message["_chat"]["username"]: 653 # chats can apparently not have usernames??? 654 # truly telegram objects are way too lenient for their own good 655 thread = message["_chat"]["username"] 656 elif message["_chat"]["title"]: 657 thread = re.sub(r"\s", "", message["_chat"]["title"]) 658 else: 659 # just give up 660 thread = "unknown" 661 662 # determine username 663 # API responses only include the user *ID*, not the username, and to 664 # complicate things further not everyone is a user and not everyone 665 # has a username. If no username is available, try the first and 666 # last name someone has supplied 667 fullname = "" 668 username = "" 669 user_id = message["_sender"]["id"] if message.get("_sender") else "" 670 user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else "" 671 672 if message.get("_sender") and message["_sender"].get("username"): 673 username = message["_sender"]["username"] 674 675 if message.get("_sender") and message["_sender"].get("first_name"): 676 fullname += message["_sender"]["first_name"] 677 678 if message.get("_sender") and message["_sender"].get("last_name"): 679 fullname += " " + message["_sender"]["last_name"] 680 681 fullname = fullname.strip() 682 683 # determine media type 684 # these store some extra information of the attachment in 685 # attachment_data. Since the final result will be serialised as a csv 686 # file, we can only store text content. As such some media data is 687 # serialised as JSON. 688 attachment_type = SearchTelegram.get_media_type(message["media"]) 689 attachment_filename = "" 690 691 if attachment_type == "contact": 692 contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"] 693 if message["media"].get('contact', False): 694 # Old datastructure 695 attachment = message["media"]["contact"] 696 elif all([property in message["media"].keys() for property in contact_data]): 697 # New datastructure 2022/7/25 698 attachment = message["media"] 699 else: 700 raise ProcessorException('Cannot find contact data; Telegram datastructure may have changed') 701 attachment_data = json.dumps({property: attachment.get(property) for property in contact_data}) 702 703 elif attachment_type == "document": 704 # videos, etc 705 # This could add a separate routine for videos to make them a 706 # separate type, which could then be scraped later, etc 707 attachment_type = message["media"]["document"]["mime_type"].split("/")[0] 708 if attachment_type == "video": 709 attachment = message["media"]["document"] 710 attachment_data = json.dumps({ 711 "id": attachment["id"], 712 "dc_id": attachment["dc_id"], 713 "file_reference": attachment["file_reference"], 714 }) 715 else: 716 attachment_data = "" 717 718 # elif attachment_type in ("geo", "geo_live"): 719 # untested whether geo_live is significantly different from geo 720 # attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"]) 721 722 elif attachment_type == "photo" or attachment_type == "url" and message["media"]["webpage"].get("photo"): 723 # we don't actually store any metadata about the photo, since very 724 # little of the metadata attached is of interest. Instead, the 725 # actual photos may be downloaded via a processor that is run on the 726 # search results 727 attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"] 728 attachment_data = json.dumps({ 729 "id": attachment["id"], 730 "dc_id": attachment["dc_id"], 731 "file_reference": attachment["file_reference"], 732 }) 733 attachment_filename = thread + "-" + str(message["id"]) + ".jpeg" 734 735 elif attachment_type == "poll": 736 # unfortunately poll results are only available when someone has 737 # actually voted on the poll - that will usually not be the case, 738 # so we store -1 as the vote count 739 attachment = message["media"] 740 options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]} 741 attachment_data = json.dumps({ 742 "question": attachment["poll"]["question"], 743 "voters": attachment["results"]["total_voters"], 744 "answers": [{ 745 "answer": options[answer["option"]], 746 "votes": answer["voters"] 747 } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{ 748 "answer": options[option], 749 "votes": -1 750 } for option in options] 751 }) 752 753 else: 754 attachment_data = "" 755 756 # was the message forwarded from somewhere and if so when? 757 forwarded_timestamp = "" 758 forwarded_name = "" 759 forwarded_id = "" 760 forwarded_username = "" 761 if message.get("fwd_from") and "from_id" in message["fwd_from"] and type(message["fwd_from"]["from_id"]) is not int: 762 # forward information is spread out over a lot of places 763 # we can identify, in order of usefulness: username, full name, 764 # and ID. But not all of these are always available, and not 765 # always in the same place either 766 forwarded_timestamp = int(message["fwd_from"]["date"]) 767 from_data = message["fwd_from"]["from_id"] 768 769 if from_data: 770 forwarded_id = from_data.get("channel_id", from_data.get("user_id", "")) 771 772 if message["fwd_from"].get("from_name"): 773 forwarded_name = message["fwd_from"].get("from_name") 774 775 if from_data and from_data.get("from_name"): 776 forwarded_name = message["fwd_from"]["from_name"] 777 778 if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data: 779 from_data["user"] = from_data["users"][0] 780 781 if from_data and ("user" in from_data or "chats" in from_data): 782 # 'resolve entities' was enabled for this dataset 783 if "user" in from_data: 784 if from_data["user"].get("username"): 785 forwarded_username = from_data["user"]["username"] 786 787 if from_data["user"].get("first_name"): 788 forwarded_name = from_data["user"]["first_name"] 789 if message["fwd_from"].get("last_name"): 790 forwarded_name += " " + from_data["user"]["last_name"] 791 792 forwarded_name = forwarded_name.strip() 793 794 elif "chats" in from_data: 795 channel_id = from_data.get("channel_id") 796 for chat in from_data["chats"]: 797 if chat["id"] == channel_id or channel_id is None: 798 forwarded_username = chat["username"] 799 800 elif message.get("_forward") and message["_forward"].get("_chat"): 801 if message["_forward"]["_chat"].get("username"): 802 forwarded_username = message["_forward"]["_chat"]["username"] 803 804 if message["_forward"]["_chat"].get("title"): 805 forwarded_name = message["_forward"]["_chat"]["title"] 806 807 link_title = "" 808 link_attached = "" 809 link_description = "" 810 reactions = "" 811 812 if message.get("media") and message["media"].get("webpage"): 813 link_title = message["media"]["webpage"].get("title") 814 link_attached = message["media"]["webpage"].get("url") 815 link_description = message["media"]["webpage"].get("description") 816 817 if message.get("reactions") and message["reactions"].get("results"): 818 for reaction in message["reactions"]["results"]: 819 if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]: 820 # Updated to support new reaction datastructure 821 reactions += reaction["reaction"]["emoticon"] * reaction["count"] 822 elif type(reaction["reaction"]) is str and "count" in reaction: 823 reactions += reaction["reaction"] * reaction["count"] 824 else: 825 # Failsafe; can be updated to support formatting of new datastructures in the future 826 reactions += f"{reaction}, " 827 828 is_reply = False 829 reply_to = "" 830 if message.get("reply_to"): 831 is_reply = True 832 reply_to = message["reply_to"].get("reply_to_msg_id", "") 833 834 # t.me links 835 linked_entities = set() 836 all_links = ural.urls_from_text(message["message"]) 837 all_links = [link.split("t.me/")[1] for link in all_links if 838 ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 839 840 for link in all_links: 841 if link.startswith("+"): 842 # invite links 843 continue 844 845 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 846 linked_entities.add(entity_name) 847 848 # @references 849 # in execute_queries we use MessageEntityMention to get these 850 # however, after serializing these objects we only have the offsets of 851 # the mentioned username, and telegram does weird unicode things to its 852 # offsets meaning we can't just substring the message. So use a regex 853 # as a 'good enough' solution 854 all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) 855 856 # make this case-insensitive since people may use different casing in 857 # messages than the 'official' username for example 858 all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) 859 all_ci_connections = set() 860 seen = set() 861 for connection in all_connections: 862 if connection.lower() not in seen: 863 all_ci_connections.add(connection) 864 seen.add(connection.lower()) 865 866 return MappedItem({ 867 "id": f"{message['_chat']['username']}-{message['id']}", 868 "thread_id": thread, 869 "chat": message["_chat"]["username"], 870 "author": user_id, 871 "author_username": username, 872 "author_name": fullname, 873 "author_is_bot": "yes" if user_is_bot else "no", 874 "body": message["message"], 875 "body_markdown": message.get("message_markdown", MissingMappedField("")), 876 "is_reply": is_reply, 877 "reply_to": reply_to, 878 "views": message["views"] if message["views"] else "", 879 # "forwards": message.get("forwards", MissingMappedField(0)), 880 "reactions": reactions, 881 "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), 882 "unix_timestamp": int(message["date"]), 883 "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[ 884 "edit_date"] else "", 885 "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "", 886 "author_forwarded_from_name": forwarded_name, 887 "author_forwarded_from_username": forwarded_username, 888 "author_forwarded_from_id": forwarded_id, 889 "entities_linked": ",".join(linked_entities), 890 "entities_mentioned": ",".join(all_mentions), 891 "all_connections": ",".join(all_ci_connections), 892 "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime( 893 "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "", 894 "unix_timestamp_forwarded_from": forwarded_timestamp, 895 "link_title": link_title, 896 "link_description": link_description, 897 "link_attached": link_attached, 898 "attachment_type": attachment_type, 899 "attachment_data": attachment_data, 900 "attachment_filename": attachment_filename 901 })
Convert Message object to 4CAT-ready data object
Parameters
- Message message: Message to parse
Returns
4CAT-compatible item object
903 @staticmethod 904 def get_media_type(media): 905 """ 906 Get media type for a Telegram attachment 907 908 :param media: Media object 909 :return str: Textual identifier of the media type 910 """ 911 try: 912 return { 913 "NoneType": "", 914 "MessageMediaContact": "contact", 915 "MessageMediaDocument": "document", 916 "MessageMediaEmpty": "", 917 "MessageMediaGame": "game", 918 "MessageMediaGeo": "geo", 919 "MessageMediaGeoLive": "geo_live", 920 "MessageMediaInvoice": "invoice", 921 "MessageMediaPhoto": "photo", 922 "MessageMediaPoll": "poll", 923 "MessageMediaUnsupported": "unsupported", 924 "MessageMediaVenue": "venue", 925 "MessageMediaWebPage": "url" 926 }[media.get("_type", None)] 927 except (AttributeError, KeyError): 928 return ""
Get media type for a Telegram attachment
Parameters
- media: Media object
Returns
Textual identifier of the media type
930 @staticmethod 931 def serialize_obj(input_obj): 932 """ 933 Serialize an object as a dictionary 934 935 Telethon message objects are not serializable by themselves, but most 936 relevant attributes are simply struct classes. This function replaces 937 those that are not with placeholders and then returns a dictionary that 938 can be serialized as JSON. 939 940 :param obj: Object to serialize 941 :return: Serialized object 942 """ 943 scalars = (int, str, float, list, tuple, set, bool) 944 945 if type(input_obj) in scalars or input_obj is None: 946 return input_obj 947 948 if type(input_obj) is not dict: 949 obj = input_obj.__dict__ 950 else: 951 obj = input_obj.copy() 952 953 mapped_obj = {} 954 for item, value in obj.items(): 955 if type(value) is datetime: 956 mapped_obj[item] = value.timestamp() 957 elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 958 mapped_obj[item] = SearchTelegram.serialize_obj(value) 959 elif type(value) is list: 960 mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value] 961 elif type(value) is bytes: 962 mapped_obj[item] = value.hex() 963 elif type(value) not in scalars and value is not None: 964 # type we can't make sense of here 965 continue 966 else: 967 mapped_obj[item] = value 968 969 # Add the _type if the original object was a telethon type 970 if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 971 mapped_obj["_type"] = type(input_obj).__name__ 972 973 # Store the markdown-formatted text 974 if type(input_obj).__name__ == "Message": 975 mapped_obj["message_markdown"] = input_obj.text 976 977 return mapped_obj
Serialize an object as a dictionary
Telethon message objects are not serializable by themselves, but most relevant attributes are simply struct classes. This function replaces those that are not with placeholders and then returns a dictionary that can be serialized as JSON.
Parameters
- obj: Object to serialize
Returns
Serialized object
979 @staticmethod 980 def validate_query(query, request, config): 981 """ 982 Validate Telegram query 983 984 :param config: 985 :param dict query: Query parameters, from client-side. 986 :param request: Flask request 987 :param User user: User object of user who has submitted the query 988 :param ConfigManager config: Configuration reader (context-aware) 989 :return dict: Safe query parameters 990 """ 991 # no query 4 u 992 if not query.get("query", "").strip(): 993 raise QueryParametersException("You must provide a search query.") 994 995 if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None): 996 raise QueryParametersException("You need to provide valid Telegram API credentials first.") 997 998 all_posts = config.get("telegram-search.can_query_all_messages", False) 999 max_entities = config.get("telegram-search.max_entities", 25) 1000 1001 num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options( 1002 config=config)["max_posts"]["max"]) 1003 1004 # reformat queries to be a comma-separated list with no wrapping 1005 # whitespace 1006 whitespace = re.compile(r"\s+") 1007 items = whitespace.sub("", query.get("query").replace("\n", ",")) 1008 if max_entities > 0 and len(items.split(",")) > max_entities: 1009 raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.") 1010 1011 sanitized_items = [] 1012 # handle telegram URLs 1013 for item in items.split(","): 1014 if not item.strip(): 1015 continue 1016 item = re.sub(r"^https?://t\.me/", "", item) 1017 item = re.sub(r"^/?s/", "", item) 1018 item = re.sub(r"[/]*$", "", item) 1019 sanitized_items.append(item) 1020 1021 # the dates need to make sense as a range to search within 1022 min_date, max_date = query.get("daterange") 1023 1024 # now check if there is an active API session 1025 if not hasattr(config, "user") or not config.user.is_authenticated or config.user.is_anonymous: 1026 raise QueryParametersException("Telegram scraping is only available to logged-in users with personal " 1027 "accounts.") 1028 1029 # check for the information we need 1030 session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"), 1031 query.get("api_hash")) 1032 config.user.set_value("telegram.session", session_id) 1033 session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session") 1034 1035 client = None 1036 1037 # API ID is always a number, if it's not, we can immediately fail 1038 try: 1039 api_id = int(query.get("api_id")) 1040 except ValueError: 1041 raise QueryParametersException("Invalid API ID.") 1042 1043 # maybe we've entered a code already and submitted it with the request 1044 if "option-security-code" in request.form and request.form.get("option-security-code").strip(): 1045 code_callback = lambda: request.form.get("option-security-code") # noqa: E731 1046 max_attempts = 1 1047 else: 1048 code_callback = lambda: -1 # noqa: E731 1049 # max_attempts = 0 because authing will always fail: we can't wait for 1050 # the code to be entered interactively, we'll need to do a new request 1051 # but we can't just immediately return, we still need to call start() 1052 # to get telegram to send us a code 1053 max_attempts = 0 1054 1055 # now try authenticating 1056 needs_code = False 1057 try: 1058 loop = asyncio.new_event_loop() 1059 asyncio.set_event_loop(loop) 1060 client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop) 1061 1062 try: 1063 client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback) 1064 1065 except ValueError as e: 1066 # this happens if 2FA is required 1067 raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time " 1068 f"does not support this authentication mode for Telegram. ({e})") 1069 except RuntimeError: 1070 # A code was sent to the given phone number 1071 needs_code = True 1072 except FloodWaitError as e: 1073 # uh oh, we got rate-limited 1074 raise QueryParametersException("You were rate-limited and should wait a while before trying again. " + 1075 str(e).split("(")[0] + ".") 1076 except ApiIdInvalidError: 1077 # wrong credentials 1078 raise QueryParametersException("Your API credentials are invalid.") 1079 except PhoneNumberInvalidError: 1080 # wrong phone number 1081 raise QueryParametersException( 1082 "The phone number provided is not a valid phone number for these credentials.") 1083 except RPCError as e: 1084 # only seen this with an 'UPDATE_APP_TO_LOGIN' status 1085 raise QueryParametersException(f"Could not verify your authentication. You may need to update your " 1086 f"Telegram app(s) to the latest version to proceed ({e}).") 1087 except Exception as e: 1088 # ? 1089 raise QueryParametersException( 1090 f"An unexpected error ({e}) occurred and your authentication could not be verified.") 1091 finally: 1092 if client: 1093 client.disconnect() 1094 1095 if needs_code: 1096 raise QueryNeedsFurtherInputException(config={ 1097 "code-info": { 1098 "type": UserInput.OPTION_INFO, 1099 "help": "Please enter the security code that was sent to your Telegram app to continue." 1100 }, 1101 "security-code": { 1102 "type": UserInput.OPTION_TEXT, 1103 "help": "Security code", 1104 "sensitive": True 1105 }}) 1106 1107 # simple! 1108 return { 1109 "items": num_items, 1110 "query": ",".join(sanitized_items), 1111 "api_id": query.get("api_id"), 1112 "api_hash": query.get("api_hash"), 1113 "api_phone": query.get("api_phone"), 1114 "save-session": query.get("save-session"), 1115 "resolve-entities": query.get("resolve-entities"), 1116 "min_date": min_date, 1117 "max_date": max_date, 1118 "crawl-depth": query.get("crawl-depth"), 1119 "crawl-threshold": query.get("crawl-threshold"), 1120 "crawl-via-links": query.get("crawl-via-links") 1121 }
Validate Telegram query
Parameters
- config:
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
- ConfigManager config: Configuration reader (context-aware)
Returns
Safe query parameters
1123 @staticmethod 1124 def create_session_id(api_phone, api_id, api_hash): 1125 """ 1126 Generate a filename for the session file 1127 1128 This is a combination of phone number and API credentials, but hashed 1129 so that one cannot actually derive someone's phone number from it. 1130 1131 :param str api_phone: Phone number for API ID 1132 :param int api_id: Telegram API ID 1133 :param str api_hash: Telegram API Hash 1134 :return str: A hash value derived from the input 1135 """ 1136 hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip() 1137 return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Generate a filename for the session file
This is a combination of phone number and API credentials, but hashed so that one cannot actually derive someone's phone number from it.
Parameters
- str api_phone: Phone number for API ID
- int api_id: Telegram API ID
- str api_hash: Telegram API Hash
Returns
A hash value derived from the input
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor