datasources.telegram.search_telegram
Search Telegram via API
1""" 2Search Telegram via API 3""" 4import traceback 5import datetime 6import hashlib 7import asyncio 8import json 9import ural 10import time 11import re 12 13from pathlib import Path 14 15from backend.lib.search import Search 16from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ProcessorException, \ 17 QueryNeedsFurtherInputException 18from common.lib.helpers import convert_to_int, UserInput 19from common.lib.item_mapping import MappedItem, MissingMappedField 20from common.config_manager import config 21 22from datetime import datetime 23from telethon import TelegramClient 24from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \ 25 FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError 26from telethon.tl.functions.channels import GetFullChannelRequest 27from telethon.tl.functions.users import GetFullUserRequest 28from telethon.tl.types import User, MessageEntityMention 29 30 31 32class SearchTelegram(Search): 33 """ 34 Search Telegram via API 35 """ 36 type = "telegram-search" # job ID 37 category = "Search" # category 38 title = "Telegram API search" # title displayed in UI 39 description = "Scrapes messages from open Telegram groups via its API." # description displayed in UI 40 extension = "ndjson" # extension of result file, used internally and in UI 41 is_local = False # Whether this datasource is locally scraped 42 is_static = False # Whether this datasource is still updated 43 44 # cache 45 details_cache = None 46 failures_cache = None 47 eventloop = None 48 import_issues = 0 49 end_if_rate_limited = 600 # break if Telegram requires wait time above number of seconds 50 51 max_workers = 1 52 max_retries = 3 53 flawless = 0 54 55 config = { 56 "telegram-search.can_query_all_messages": { 57 "type": UserInput.OPTION_TOGGLE, 58 "help": "Remove message amount limit", 59 "default": False, 60 "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!" 61 }, 62 "telegram-search.max_entities": { 63 "type": UserInput.OPTION_TEXT, 64 "help": "Max entities to query", 65 "coerce_type": int, 66 "min": 0, 67 "default": 25, 68 "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to " 69 "disable limit." 70 }, 71 "telegram-search.max_crawl_depth": { 72 "type": UserInput.OPTION_TEXT, 73 "help": "Max crawl depth", 74 "coerce_type": int, 75 "min": 0, 76 "default": 0, 77 "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded " 78 "messages. Recommended to leave at 0 for most users since this can exponentially increase " 79 "dataset sizes." 80 } 81 } 82 83 @classmethod 84 def get_options(cls, parent_dataset=None, user=None): 85 """ 86 Get processor options 87 88 Just updates the description of the entities field based on the 89 configured max entities. 90 91 :param DataSet parent_dataset: An object representing the dataset that 92 the processor would be run on 93 :param User user: Flask user the options will be displayed for, in 94 case they are requested for display in the 4CAT web interface. This can 95 be used to show some options only to privileges users. 96 """ 97 max_entities = config.get("telegram-search.max_entities", 25, user=user) 98 options = { 99 "intro": { 100 "type": UserInput.OPTION_INFO, 101 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 102 "(e.g. 
            "api_id": {
                "type": UserInput.OPTION_TEXT,
                "help": "API ID",
                "cache": True,
            },
            "api_hash": {
                "type": UserInput.OPTION_TEXT,
                "help": "API Hash",
                "cache": True,
            },
            "api_phone": {
                "type": UserInput.OPTION_TEXT,
                "help": "Phone number",
                "cache": True,
                "default": "+xxxxxxxxxx"
            },
            "divider": {
                "type": UserInput.OPTION_DIVIDER
            },
            "query-intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Separate with commas or line breaks."
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Entities to scrape",
                "tooltip": "Separate with commas or line breaks."
            },
            "max_posts": {
                "type": UserInput.OPTION_TEXT,
                "help": "Messages per group",
                "min": 1,
                "max": 50000,
                "default": 10
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range"
            },
            "divider-2": {
                "type": UserInput.OPTION_DIVIDER
            },
            "info-sensitive": {
                "type": UserInput.OPTION_INFO,
                "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored "
                        "there while data is fetched. After the dataset has been created your credentials will be "
                        "deleted from the server, unless you enable the option below. If you want to download images "
                        "attached to the messages in your collected data, you need to enable this option. Your "
                        "credentials will never be visible to other users and can be erased later via the result page."
            },
            "save-session": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Save session:",
                "default": False
            },
            "resolve-entities-intro": {
                "type": UserInput.OPTION_INFO,
                "help": "4CAT can resolve references to channels and users and replace the numeric ID with the full "
                        "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups "
                        "and figure out where or who a message was forwarded from.\n\nHowever, this increases query "
                        "time and, for large datasets, increases the chance you will be rate-limited and your "
                        "dataset cannot finish capturing. It will also dramatically increase the disk space needed "
                        "to store the data, so only enable this if you really need it!"
            },
            "resolve-entities": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Resolve references",
                "default": False,
            }
        }

        if max_entities:
            options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities "
                                              f"(channels or groups) at a time. Separate with line breaks or commas.")

        all_messages = config.get("telegram-search.can_query_all_messages", False, user=user)
        if all_messages:
            if "max" in options["max_posts"]:
                del options["max_posts"]["max"]
        else:
            options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to "
                                            f"{options['max_posts']['max']:,} messages per entity.")

        if config.get("telegram-search.max_crawl_depth", 0, user=user) > 0:
            options["crawl_intro"] = {
                "type": UserInput.OPTION_INFO,
                "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a "
                        "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can "
                        "collect messages from both channel X and Y. **Use this feature with caution**, as datasets "
                        "can rapidly grow when adding newly discovered entities to the query this way. Note that "
                        "dataset progress cannot be accurately tracked when you use this feature."
            }
            options["crawl-depth"] = {
                "type": UserInput.OPTION_TEXT,
                "coerce_type": int,
                "min": 0,
                "max": config.get("telegram-search.max_crawl_depth"),
                "default": 0,
                "help": "Crawl depth",
                "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial "
                           "query: an entity is only added if it can be reached from one of the starting entities "
                           "in at most this many hops."
            }
            options["crawl-threshold"] = {
                "type": UserInput.OPTION_TEXT,
                "coerce_type": int,
                "min": 0,
                "default": 5,
                "help": "Crawl threshold",
                "tooltip": "Entities need to be referenced at least this many times to be added to the query. Only "
                           "references discovered below the max crawl depth are taken into account."
            }
            options["crawl-via-links"] = {
                "type": UserInput.OPTION_TOGGLE,
                "default": False,
                "help": "Extract new groups from links",
                "tooltip": "Look for references to other groups in message content via t.me links and @references. "
                           "This is more error-prone than crawling only via forwards, but can be a way to discover "
                           "links that would otherwise remain undetected."
            }

        return options

    def get_items(self, query):
        """
        Execute a query; get messages for given parameters

        Basically a wrapper around execute_queries() to call it with asyncio.

        :param dict query: Query parameters, as part of the DataSet object
        :return list: Posts, sorted by thread and post ID, in ascending order
        """
        if "api_phone" not in query or "api_hash" not in query or "api_id" not in query:
            self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try "
                                       "creating it again from scratch.", is_final=True)
            return None

        self.details_cache = {}
        self.failures_cache = set()
        # TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this
        results = asyncio.run(self.execute_queries())

        if not query.get("save-session"):
            self.dataset.delete_parameter("api_hash", instant=True)
            self.dataset.delete_parameter("api_phone", instant=True)
            self.dataset.delete_parameter("api_id", instant=True)

        if self.flawless:
            self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable "
                                       "(they may have been private). View the log file for details.", is_final=True)

        return results

    async def execute_queries(self):
        """
        Get messages for queries

        This is basically what would be done in get_items(), except due to
        Telethon's architecture this needs to be called in an async method,
        which is this one.

        :return list: Collected messages
        """
        # session file has been created earlier, and we can re-use it here in
        # order to avoid having to re-enter the security code
        query = self.parameters

        session_id = SearchTelegram.create_session_id(query["api_phone"], query["api_id"], query["api_hash"])
        self.dataset.log(f"Telegram session id: {session_id}")
        session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session")

        client = None

        try:
            client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"),
                                    loop=self.eventloop)
            await client.start(phone=SearchTelegram.cancel_start)
        except RuntimeError:
            # session is no longer usable; delete the file so the user will be
            # asked for a security code again. The RuntimeError is raised by
            # `cancel_start()`
            self.dataset.update_status(
                "Session is not authenticated: login security code may have expired. You need to re-enter the security code.",
                is_final=True)

            if client and hasattr(client, "disconnect"):
                await client.disconnect()

            if session_path.exists():
                session_path.unlink()

            return []
        except Exception as e:
            # not sure what exception specifically is triggered here, but it
            # always means the connection failed
            self.log.error(f"Telegram: {e}\n{traceback.format_exc()}")
            self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True)
            if client and hasattr(client, "disconnect"):
                await client.disconnect()
            return []

        # ready our parameters
        parameters = self.dataset.get_parameters()
        queries = [query.strip() for query in parameters.get("query", "").split(",")]
        max_items = convert_to_int(parameters.get("items", 10), 10)

        # Telethon requires the offset date to be a datetime date
        max_date = parameters.get("max_date")
        if max_date:
            try:
                max_date = datetime.fromtimestamp(int(max_date))
            except ValueError:
                max_date = None

        # min_date can remain an integer
        min_date = parameters.get("min_date")
        if min_date:
            try:
                min_date = int(min_date)
            except ValueError:
                min_date = None

        posts = []
        try:
            async for post in self.gather_posts(client, queries, max_items, min_date, max_date):
                posts.append(post)
            return posts
        except ProcessorInterruptedException as e:
            raise e
        except Exception as e:
            # catch-all so we can disconnect properly
            # ...should we?
            self.dataset.update_status("Error scraping posts from Telegram; halting collection.")
            self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}")
            # May as well return what was captured, yes?
            return posts
        finally:
            await client.disconnect()
356 """ 357 resolve_refs = self.parameters.get("resolve-entities") 358 359 # Adding flag to stop; using for rate limits 360 no_additional_queries = False 361 362 # This is used for the 'crawl' feature so we know at which depth a 363 # given entity was discovered 364 depth_map = { 365 entity: 0 for entity in queries 366 } 367 368 crawl_max_depth = self.parameters.get("crawl-depth", 0) 369 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 370 crawl_via_links = self.parameters.get("crawl-via-links", False) 371 372 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 373 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 374 375 # this keeps track of how often an entity not in the original query 376 # has been mentioned. When crawling is enabled and this exceeds the 377 # given threshold, the entity is added to the query 378 crawl_references = {} 379 full_query = set(queries) 380 num_queries = len(queries) 381 382 # we may not always know the 'entity username' for an entity ID, so 383 # keep a reference map as we go 384 entity_id_map = {} 385 query_id_map= {} 386 387 # Collect queries 388 # Use while instead of for so we can change queries during iteration 389 # this is needed for the 'crawl' feature which can discover new 390 # entities during crawl 391 processed = 0 392 total_messages = 0 393 while queries: 394 query = queries.pop(0) 395 396 delay = 10 397 retries = 0 398 processed += 1 399 self.dataset.update_progress(processed / num_queries) 400 401 if no_additional_queries: 402 # Note that we are not completing this query 403 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 404 continue 405 406 while True: 407 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 408 entity_posts = 0 409 discovered = 0 410 try: 411 async for message in client.iter_messages(entity=query, offset_date=max_date): 412 entity_posts += 1 413 total_messages += 1 414 if self.interrupted: 415 raise ProcessorInterruptedException( 416 "Interrupted while fetching message data from the Telegram API") 417 418 if entity_posts % 100 == 0: 419 self.dataset.update_status( 420 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 421 422 if message.action is not None: 423 # e.g. someone joins the channel - not an actual message 424 continue 425 426 # todo: possibly enrich object with e.g. the name of 427 # the channel a message was forwarded from (but that 428 # needs extra API requests...) 

        # we may not always know the 'entity username' for an entity ID, so
        # keep a reference map as we go
        entity_id_map = {}

        # Collect queries
        # Use while instead of for so we can change queries during iteration
        # this is needed for the 'crawl' feature which can discover new
        # entities during crawl
        processed = 0
        total_messages = 0
        while queries:
            query = queries.pop(0)

            delay = 10
            retries = 0
            processed += 1
            self.dataset.update_progress(processed / num_queries)

            if no_additional_queries:
                # Note that we are not completing this query
                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
                continue

            while True:
                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
                entity_posts = 0
                discovered = 0
                try:
                    async for message in client.iter_messages(entity=query, offset_date=max_date):
                        entity_posts += 1
                        total_messages += 1
                        if self.interrupted:
                            raise ProcessorInterruptedException(
                                "Interrupted while fetching message data from the Telegram API")

                        if entity_posts % 100 == 0:
                            self.dataset.update_status(
                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")

                        if message.action is not None:
                            # e.g. someone joins the channel - not an actual message
                            continue

                        # todo: possibly enrich object with e.g. the name of
                        # the channel a message was forwarded from (but that
                        # needs extra API requests...)
                        serialized_message = SearchTelegram.serialize_obj(message)
                        if "_chat" in serialized_message:
                            # Add query ID to check if queries have been crawled previously
                            full_query.add(serialized_message["_chat"]["id"])
                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
                                # once we know what a channel ID resolves to, use the username instead so it is
                                # easier to understand for the user
                                entity_id_map[query] = serialized_message["_chat"]["username"]
                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")

                        if resolve_refs:
                            serialized_message = await self.resolve_groups(client, serialized_message)

                        # Stop if we're below the min date
                        if min_date and serialized_message.get("date") < min_date:
                            break

                        # if crawling is enabled, see if we found something to add to the query
                        linked_entities = set()
                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
                            message_fwd = serialized_message.get("fwd_from")
                            fwd_from = None
                            fwd_source_type = None
                            if message_fwd and message_fwd.get("from_id"):
                                if message_fwd["from_id"].get("_type") == "PeerChannel":
                                    # Legacy(?) data structure (pre 2024/7/22)
                                    # even if we haven't resolved the ID, we can feed the numeric ID
                                    # to Telethon! this is nice because it means we don't have to
                                    # resolve entities to crawl iteratively
                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
                                    fwd_source_type = "channel"
                                elif message_fwd.get("from_id", {}).get("full_chat", {}):
                                    # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me
                                    # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
                                    fwd_source_type = "channel"
                                elif message_fwd.get("from_id", {}).get("full_user", {}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser":
                                    # forwards can also come from users
                                    # these can never be followed, so don't add these to the crawl, but do document them
                                    fwd_source_type = "user"
                                else:
                                    print(json.dumps(message_fwd))
                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
                                    fwd_source_type = "unknown"

                                if fwd_from:
                                    linked_entities.add(fwd_from)

                            if crawl_via_links:
                                # t.me links
                                all_links = ural.urls_from_text(serialized_message["message"])
                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
                                for link in all_links:
                                    if link.startswith("+"):
                                        # invite links
                                        continue

                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
                                    linked_entities.add(entity_name)
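
                                # A hedged example of the splitting above, with
                                # a made-up link: "https://t.me/examplechannel/56?single"
                                # splits on "t.me/" to "examplechannel/56?single",
                                # and entity_name keeps only what precedes the
                                # first "/", "?" or "#": "examplechannel".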

                                # @references
                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
                                for reference in references:
                                    if reference.startswith("@"):
                                        reference = reference[1:]

                                    reference = reference.split("/")[0]

                                    linked_entities.add(reference)

                            # Check if fwd_from or the resolved entity ID is already queued or has been queried
                            for link in linked_entities:
                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
                                    # new entity discovered!
                                    # might be discovered (before collection) multiple times, so retain lowest depth
                                    # print(f"Potentially crawling {link}")
                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
                                    if link not in crawl_references:
                                        crawl_references[link] = 0
                                    crawl_references[link] += 1

                                    # Add to queries if it has been referenced enough times
                                    if crawl_references[link] >= crawl_msg_threshold:
                                        queries.append(link)
                                        full_query.add(link)
                                        num_queries += 1
                                        discovered += 1
                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")

                        serialized_message["4CAT_metadata"] = {
                            "collected_at": datetime.now().isoformat(),  # this is relevant for rather long crawls
                            "query": query,  # possibly redundant, but we are adding non-user defined queries by crawling and it may be useful to know exactly what query was used to collect an entity
                            "query_depth": depth_map.get(query, 0)
                        }
                        yield serialized_message

                        if entity_posts >= max_items:
                            break

                except ChannelPrivateError:
                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
                    self.flawless += 1

                except (UsernameInvalidError,):
                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', it does not seem to exist, skipping")
                    self.flawless += 1

                except FloodWaitError as e:
                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
                    if e.seconds < self.end_if_rate_limited:
                        time.sleep(e.seconds)
                        continue
                    else:
                        self.flawless += 1
                        no_additional_queries = True
                        self.dataset.update_status(
                            f"Telegram wait time of {int(e.seconds / 60)} minutes exceeds the limit, ending")
                        break

                except BadRequestError as e:
                    self.dataset.update_status(
                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except ValueError as e:
                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except TimeoutError:
                    if retries >= self.max_retries:
                        self.dataset.update_status(
                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
                        self.flawless += 1
                        break

                    self.dataset.update_status(
                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
                    retries += 1
                    time.sleep(delay)
                    delay *= 2
                    continue

                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
                break
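
    # A hedged sketch of what resolve_groups does, with made-up IDs: a nested
    # value like {"_type": "PeerChannel", "channel_id": 1234} in a serialized
    # message is replaced by the serialized result of GetFullChannelRequest(1234)
    # (with "channel_id" re-attached), and likewise for "PeerUser" via
    # GetFullUserRequest. Lookups are memoised in self.details_cache, and IDs
    # that failed to resolve are kept in self.failures_cache so they are only
    # tried once.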

    async def resolve_groups(self, client, message):
        """
        Recursively resolve references to groups and users

        :param client: Telethon client instance
        :param dict message: Message, as already mapped by serialize_obj
        :return: Resolved dictionary
        """
        resolved_message = message.copy()
        for key, value in message.items():
            try:
                if type(value) is not dict:
                    # if it's not a dict, we never have to resolve it, as it
                    # does not represent an entity
                    continue

                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
                    # forwarded from a channel!
                    if value["channel_id"] in self.failures_cache:
                        continue

                    if value["channel_id"] not in self.details_cache:
                        channel = await client(GetFullChannelRequest(value["channel_id"]))
                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)

                    resolved_message[key] = self.details_cache[value["channel_id"]]
                    resolved_message[key]["channel_id"] = value["channel_id"]

                elif "_type" in value and value["_type"] == "PeerUser":
                    # a user!
                    if value["user_id"] in self.failures_cache:
                        continue

                    if value["user_id"] not in self.details_cache:
                        user = await client(GetFullUserRequest(value["user_id"]))
                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)

                    resolved_message[key] = self.details_cache[value["user_id"]]
                    resolved_message[key]["user_id"] = value["user_id"]
                else:
                    resolved_message[key] = await self.resolve_groups(client, value)

            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
                if type(e) in (ChannelPrivateError, UsernameInvalidError):
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
                else:
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")

        return resolved_message

    @staticmethod
    def cancel_start():
        """
        Replace interactive phone number input in Telethon

        By default, if Telethon cannot use the given session file to
        authenticate, it will interactively prompt the user for a phone
        number on the command line. That is not useful here, so instead
        raise a RuntimeError. This will be caught below and the user will
        be told they need to re-authenticate via 4CAT.
        """
        raise RuntimeError("Connection cancelled")
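
    # A hedged example of how map_item derives the thread name below, with
    # made-up chats: a chat with username "examplegroup" yields thread
    # "examplegroup"; one with only the title "My Example Group" yields
    # "MyExampleGroup" (whitespace stripped); a chat with neither yields
    # "unknown".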

    @staticmethod
    def map_item(message):
        """
        Convert Message object to 4CAT-ready data object

        :param Message message: Message to parse
        :return dict: 4CAT-compatible item object
        """
        if message["_chat"]["username"]:
            # chats can apparently not have usernames???
            # truly telegram objects are way too lenient for their own good
            thread = message["_chat"]["username"]
        elif message["_chat"]["title"]:
            thread = re.sub(r"\s", "", message["_chat"]["title"])
        else:
            # just give up
            thread = "unknown"

        # determine username
        # API responses only include the user *ID*, not the username, and to
        # complicate things further not everyone is a user and not everyone
        # has a username. If no username is available, try the first and
        # last name someone has supplied
        fullname = ""
        username = ""
        user_id = message["_sender"]["id"] if message.get("_sender") else ""
        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""

        if message.get("_sender") and message["_sender"].get("username"):
            username = message["_sender"]["username"]

        if message.get("_sender") and message["_sender"].get("first_name"):
            fullname += message["_sender"]["first_name"]

        if message.get("_sender") and message["_sender"].get("last_name"):
            fullname += " " + message["_sender"]["last_name"]

        fullname = fullname.strip()

        # determine media type
        # these store some extra information of the attachment in
        # attachment_data. Since the final result will be serialised as a csv
        # file, we can only store text content. As such, some media data is
        # serialised as JSON.
        attachment_type = SearchTelegram.get_media_type(message["media"])
        attachment_filename = ""

        if attachment_type == "contact":
            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
            if message["media"].get("contact", False):
                # Old datastructure
                attachment = message["media"]["contact"]
            elif all([prop in message["media"].keys() for prop in contact_data]):
                # New datastructure 2022/7/25
                attachment = message["media"]
            else:
                raise ProcessorException("Cannot find contact data; Telegram datastructure may have changed")
            attachment_data = json.dumps({prop: attachment.get(prop) for prop in contact_data})

        elif attachment_type == "document":
            # videos, etc
            # This could add a separate routine for videos to make them a
            # separate type, which could then be scraped later, etc
            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
            if attachment_type == "video":
                attachment = message["media"]["document"]
                attachment_data = json.dumps({
                    "id": attachment["id"],
                    "dc_id": attachment["dc_id"],
                    "file_reference": attachment["file_reference"],
                })
            else:
                attachment_data = ""

        # elif attachment_type in ("geo", "geo_live"):
            # untested whether geo_live is significantly different from geo
            # attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])

        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
            # we don't actually store any metadata about the photo, since very
            # little of the metadata attached is of interest. Instead, the
            # actual photos may be downloaded via a processor that is run on
            # the search results
            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
            attachment_data = json.dumps({
                "id": attachment["id"],
                "dc_id": attachment["dc_id"],
                "file_reference": attachment["file_reference"],
            })
            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"

        elif attachment_type == "poll":
            # unfortunately poll results are only available when someone has
            # actually voted on the poll - that will usually not be the case,
            # so we store -1 as the vote count
            attachment = message["media"]
            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
            attachment_data = json.dumps({
                "question": attachment["poll"]["question"],
                "voters": attachment["results"]["total_voters"],
                "answers": [{
                    "answer": options[answer["option"]],
                    "votes": answer["voters"]
                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
                    "answer": options[option],
                    "votes": -1
                } for option in options]
            })

        else:
            attachment_data = ""
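
        # A hedged example of the serialised attachment metadata above, with
        # made-up values: a photo attachment yields attachment_data like
        # '{"id": 5551234, "dc_id": 4, "file_reference": "0a1b2c"}' and an
        # attachment_filename such as "examplegroup-123.jpeg", enough for a
        # later processor to download the actual file.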

        # was the message forwarded from somewhere and if so when?
        forwarded_timestamp = ""
        forwarded_name = ""
        forwarded_id = ""
        forwarded_username = ""
        if message.get("fwd_from") and "from_id" in message["fwd_from"] and not (
                type(message["fwd_from"]["from_id"]) is int):
            # forward information is spread out over a lot of places
            # we can identify, in order of usefulness: username, full name,
            # and ID. But not all of these are always available, and not
            # always in the same place either
            forwarded_timestamp = int(message["fwd_from"]["date"])
            from_data = message["fwd_from"]["from_id"]

            if from_data:
                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))

            if message["fwd_from"].get("from_name"):
                forwarded_name = message["fwd_from"].get("from_name")

            if from_data and from_data.get("from_name"):
                forwarded_name = message["fwd_from"]["from_name"]

            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
                from_data["user"] = from_data["users"][0]

            if from_data and ("user" in from_data or "chats" in from_data):
                # 'resolve entities' was enabled for this dataset
                if "user" in from_data:
                    if from_data["user"].get("username"):
                        forwarded_username = from_data["user"]["username"]

                    if from_data["user"].get("first_name"):
                        forwarded_name = from_data["user"]["first_name"]
                    if from_data["user"].get("last_name"):
                        forwarded_name += " " + from_data["user"]["last_name"]

                    forwarded_name = forwarded_name.strip()

                elif "chats" in from_data:
                    channel_id = from_data.get("channel_id")
                    for chat in from_data["chats"]:
                        if chat["id"] == channel_id or channel_id is None:
                            forwarded_username = chat["username"]

        elif message.get("_forward") and message["_forward"].get("_chat"):
            if message["_forward"]["_chat"].get("username"):
                forwarded_username = message["_forward"]["_chat"]["username"]

            if message["_forward"]["_chat"].get("title"):
                forwarded_name = message["_forward"]["_chat"]["title"]

        link_title = ""
        link_attached = ""
        link_description = ""
        reactions = ""

        if message.get("media") and message["media"].get("webpage"):
            link_title = message["media"]["webpage"].get("title")
            link_attached = message["media"]["webpage"].get("url")
            link_description = message["media"]["webpage"].get("description")
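
        # A hedged example of the reaction serialisation below: a result like
        # {"reaction": {"emoticon": "👍"}, "count": 3} (made-up) contributes
        # "👍👍👍" to the reactions string; older responses where "reaction"
        # is a plain emoji string are repeated the same way.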
message["media"]["webpage"].get("title") 816 link_attached = message["media"]["webpage"].get("url") 817 link_description = message["media"]["webpage"].get("description") 818 819 if message.get("reactions") and message["reactions"].get("results"): 820 for reaction in message["reactions"]["results"]: 821 if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]: 822 # Updated to support new reaction datastructure 823 reactions += reaction["reaction"]["emoticon"] * reaction["count"] 824 elif type(reaction["reaction"]) is str and "count" in reaction: 825 reactions += reaction["reaction"] * reaction["count"] 826 else: 827 # Failsafe; can be updated to support formatting of new datastructures in the future 828 reactions += f"{reaction}, " 829 830 # t.me links 831 linked_entities = set() 832 all_links = ural.urls_from_text(message["message"]) 833 all_links = [link.split("t.me/")[1] for link in all_links if 834 ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 835 836 for link in all_links: 837 if link.startswith("+"): 838 # invite links 839 continue 840 841 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 842 linked_entities.add(entity_name) 843 844 # @references 845 # in execute_queries we use MessageEntityMention to get these 846 # however, after serializing these objects we only have the offsets of 847 # the mentioned username, and telegram does weird unicode things to its 848 # offsets meaning we can't just substring the message. So use a regex 849 # as a 'good enough' solution 850 all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) 851 852 # make this case-insensitive since people may use different casing in 853 # messages than the 'official' username for example 854 all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) 855 all_ci_connections = set() 856 seen = set() 857 for connection in all_connections: 858 if connection.lower() not in seen: 859 all_ci_connections.add(connection) 860 seen.add(connection.lower()) 861 862 return MappedItem({ 863 "id": f"{message['_chat']['username']}-{message['id']}", 864 "thread_id": thread, 865 "chat": message["_chat"]["username"], 866 "author": user_id, 867 "author_username": username, 868 "author_name": fullname, 869 "author_is_bot": "yes" if user_is_bot else "no", 870 "body": message["message"], 871 "reply_to": message.get("reply_to_msg_id", ""), 872 "views": message["views"] if message["views"] else "", 873 # "forwards": message.get("forwards", MissingMappedField(0)), 874 "reactions": reactions, 875 "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), 876 "unix_timestamp": int(message["date"]), 877 "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message[ 878 "edit_date"] else "", 879 "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "", 880 "author_forwarded_from_name": forwarded_name, 881 "author_forwarded_from_username": forwarded_username, 882 "author_forwarded_from_id": forwarded_id, 883 "entities_linked": ",".join(linked_entities), 884 "entities_mentioned": ",".join(all_mentions), 885 "all_connections": ",".join(all_ci_connections), 886 "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime( 887 "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "", 888 "unix_timestamp_forwarded_from": forwarded_timestamp, 889 "link_title": link_title, 890 "link_description": link_description, 891 "link_attached": link_attached, 
892 "attachment_type": attachment_type, 893 "attachment_data": attachment_data, 894 "attachment_filename": attachment_filename 895 }) 896 897 @staticmethod 898 def get_media_type(media): 899 """ 900 Get media type for a Telegram attachment 901 902 :param media: Media object 903 :return str: Textual identifier of the media type 904 """ 905 try: 906 return { 907 "NoneType": "", 908 "MessageMediaContact": "contact", 909 "MessageMediaDocument": "document", 910 "MessageMediaEmpty": "", 911 "MessageMediaGame": "game", 912 "MessageMediaGeo": "geo", 913 "MessageMediaGeoLive": "geo_live", 914 "MessageMediaInvoice": "invoice", 915 "MessageMediaPhoto": "photo", 916 "MessageMediaPoll": "poll", 917 "MessageMediaUnsupported": "unsupported", 918 "MessageMediaVenue": "venue", 919 "MessageMediaWebPage": "url" 920 }[media.get("_type", None)] 921 except (AttributeError, KeyError): 922 return "" 923 924 @staticmethod 925 def serialize_obj(input_obj): 926 """ 927 Serialize an object as a dictionary 928 929 Telethon message objects are not serializable by themselves, but most 930 relevant attributes are simply struct classes. This function replaces 931 those that are not with placeholders and then returns a dictionary that 932 can be serialized as JSON. 933 934 :param obj: Object to serialize 935 :return: Serialized object 936 """ 937 scalars = (int, str, float, list, tuple, set, bool) 938 939 if type(input_obj) in scalars or input_obj is None: 940 return input_obj 941 942 if type(input_obj) is not dict: 943 obj = input_obj.__dict__ 944 else: 945 obj = input_obj.copy() 946 947 mapped_obj = {} 948 for item, value in obj.items(): 949 if type(value) is datetime: 950 mapped_obj[item] = value.timestamp() 951 elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 952 mapped_obj[item] = SearchTelegram.serialize_obj(value) 953 elif type(value) is list: 954 mapped_obj[item] = [SearchTelegram.serialize_obj(item) for item in value] 955 elif type(value) is bytes: 956 mapped_obj[item] = value.hex() 957 elif type(value) not in scalars and value is not None: 958 # type we can't make sense of here 959 continue 960 else: 961 mapped_obj[item] = value 962 963 # Add the _type if the original object was a telethon type 964 if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"): 965 mapped_obj["_type"] = type(input_obj).__name__ 966 return mapped_obj 967 968 @staticmethod 969 def validate_query(query, request, user): 970 """ 971 Validate Telegram query 972 973 :param dict query: Query parameters, from client-side. 

    @staticmethod
    def validate_query(query, request, user):
        """
        Validate Telegram query

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False, user=user)
        max_entities = config.get("telegram-search.max_entities", 25, user=user)

        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options()["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)
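
        # A hedged example of the sanitisation above, with a made-up value: a
        # submitted "https://t.me/s/examplechannel/" is reduced to
        # "examplechannel" by stripping the scheme and host, the "s/" preview
        # prefix and any trailing slashes in turn.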

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not user or not user.is_authenticated or user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        user.set_value("telegram.session", session_id)
        session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session")

        client = None

        # API ID is always a number; if it's not, we can immediately fail
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")
            max_attempts = 1
        else:
            code_callback = lambda: -1
            # max_attempts = 0 because authing will always fail: we can't wait for
            # the code to be entered interactively, we'll need to do a new request
            # but we can't just immediately return, we still need to call start()
            # to get telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError:
                # A code was sent to the given phone number
                needs_code = True
        except FloodWaitError as e:
            # uh oh, we got rate-limited
            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                           str(e).split("(")[0] + ".")
        except ApiIdInvalidError:
            # wrong credentials
            raise QueryParametersException("Your API credentials are invalid.")
        except PhoneNumberInvalidError:
            # wrong phone number
            raise QueryParametersException(
                "The phone number provided is not a valid phone number for these credentials.")
        except RPCError as e:
            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                           f"Telegram app(s) to the latest version to proceed ({e}).")
        except Exception as e:
            # ?
            raise QueryParametersException(
                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            if client:
                client.disconnect()

        if needs_code:
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }

    @staticmethod
    def create_session_id(api_phone, api_id, api_hash):
        """
        Generate a filename for the session file

        This is a combination of phone number and API credentials, but hashed
        so that one cannot actually derive someone's phone number from it.

        :param str api_phone: Phone number for API ID
        :param int api_id: Telegram API ID
        :param str api_hash: Telegram API Hash
        :return str: A hash value derived from the input
        """
        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
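
# A hedged usage example for create_session_id, with made-up credentials:
#
#   SearchTelegram.create_session_id("+31612345678", 1234567, "abcd1234")
#
# always returns the same blake2b hex digest for the same inputs, so a
# re-submitted query with the same credentials re-uses the session file that
# was authenticated earlier, without exposing the phone number in the filename.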
33class SearchTelegram(Search): 34 """ 35 Search Telegram via API 36 """ 37 type = "telegram-search" # job ID 38 category = "Search" # category 39 title = "Telegram API search" # title displayed in UI 40 description = "Scrapes messages from open Telegram groups via its API." # description displayed in UI 41 extension = "ndjson" # extension of result file, used internally and in UI 42 is_local = False # Whether this datasource is locally scraped 43 is_static = False # Whether this datasource is still updated 44 45 # cache 46 details_cache = None 47 failures_cache = None 48 eventloop = None 49 import_issues = 0 50 end_if_rate_limited = 600 # break if Telegram requires wait time above number of seconds 51 52 max_workers = 1 53 max_retries = 3 54 flawless = 0 55 56 config = { 57 "telegram-search.can_query_all_messages": { 58 "type": UserInput.OPTION_TOGGLE, 59 "help": "Remove message amount limit", 60 "default": False, 61 "tooltip": "Allows users to query unlimited messages from Telegram. This can lead to HUGE datasets!" 62 }, 63 "telegram-search.max_entities": { 64 "type": UserInput.OPTION_TEXT, 65 "help": "Max entities to query", 66 "coerce_type": int, 67 "min": 0, 68 "default": 25, 69 "tooltip": "Amount of entities that can be queried at a time. Entities are groups or channels. 0 to " 70 "disable limit." 71 }, 72 "telegram-search.max_crawl_depth": { 73 "type": UserInput.OPTION_TEXT, 74 "help": "Max crawl depth", 75 "coerce_type": int, 76 "min": 0, 77 "default": 0, 78 "tooltip": "If higher than 0, 4CAT can automatically add new entities to the query based on forwarded " 79 "messages. Recommended to leave at 0 for most users since this can exponentially increase " 80 "dataset sizes." 81 } 82 } 83 84 @classmethod 85 def get_options(cls, parent_dataset=None, user=None): 86 """ 87 Get processor options 88 89 Just updates the description of the entities field based on the 90 configured max entities. 91 92 :param DataSet parent_dataset: An object representing the dataset that 93 the processor would be run on 94 :param User user: Flask user the options will be displayed for, in 95 case they are requested for display in the 4CAT web interface. This can 96 be used to show some options only to privileges users. 97 """ 98 max_entities = config.get("telegram-search.max_entities", 25, user=user) 99 options = { 100 "intro": { 101 "type": UserInput.OPTION_INFO, 102 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 103 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 104 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 105 "authentication for Telegram." 106 }, 107 "api_id": { 108 "type": UserInput.OPTION_TEXT, 109 "help": "API ID", 110 "cache": True, 111 }, 112 "api_hash": { 113 "type": UserInput.OPTION_TEXT, 114 "help": "API Hash", 115 "cache": True, 116 }, 117 "api_phone": { 118 "type": UserInput.OPTION_TEXT, 119 "help": "Phone number", 120 "cache": True, 121 "default": "+xxxxxxxxxx" 122 }, 123 "divider": { 124 "type": UserInput.OPTION_DIVIDER 125 }, 126 "query-intro": { 127 "type": UserInput.OPTION_INFO, 128 "help": "Separate with commas or line breaks." 129 }, 130 "query": { 131 "type": UserInput.OPTION_TEXT_LARGE, 132 "help": "Entities to scrape", 133 "tooltip": "Separate with commas or line breaks." 
134 }, 135 "max_posts": { 136 "type": UserInput.OPTION_TEXT, 137 "help": "Messages per group", 138 "min": 1, 139 "max": 50000, 140 "default": 10 141 }, 142 "daterange": { 143 "type": UserInput.OPTION_DATERANGE, 144 "help": "Date range" 145 }, 146 "divider-2": { 147 "type": UserInput.OPTION_DIVIDER 148 }, 149 "info-sensitive": { 150 "type": UserInput.OPTION_INFO, 151 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 152 "there while data is fetched. After the dataset has been created your credentials will be " 153 "deleted from the server, unless you enable the option below. If you want to download images " 154 "attached to the messages in your collected data, you need to enable this option. Your " 155 "credentials will never be visible to other users and can be erased later via the result page." 156 }, 157 "save-session": { 158 "type": UserInput.OPTION_TOGGLE, 159 "help": "Save session:", 160 "default": False 161 }, 162 "resolve-entities-intro": { 163 "type": UserInput.OPTION_INFO, 164 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 165 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 166 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 167 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 168 "to finish capturing. It will also dramatically increase the disk space needed to store the " 169 "data, so only enable this if you really need it!" 170 }, 171 "resolve-entities": { 172 "type": UserInput.OPTION_TOGGLE, 173 "help": "Resolve references", 174 "default": False, 175 } 176 } 177 178 if max_entities: 179 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 180 f"(channels or groups) at a time. Separate with line breaks or commas.") 181 182 all_messages = config.get("telegram-search.can_query_all_messages", False, user=user) 183 if all_messages: 184 if "max" in options["max_posts"]: 185 del options["max_posts"]["max"] 186 else: 187 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 188 f"{options['max_posts']['max']:,} messages per entity.") 189 190 if config.get("telegram-search.max_crawl_depth", 0, user=user) > 0: 191 options["crawl_intro"] = { 192 "type": UserInput.OPTION_INFO, 193 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 194 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 195 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 196 "rapidly grow when adding newly discovered entities to the query this way. Note that dataset " 197 "progress cannot be accurately tracked when you use this feature." 198 } 199 options["crawl-depth"] = { 200 "type": UserInput.OPTION_TEXT, 201 "coerce_type": int, 202 "min": 0, 203 "max": config.get("telegram-search.max_crawl_depth"), 204 "default": 0, 205 "help": "Crawl depth", 206 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 207 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 208 "starting entities." 
209 } 210 options["crawl-threshold"] = { 211 "type": UserInput.OPTION_TEXT, 212 "coerce_type": int, 213 "min": 0, 214 "default": 5, 215 "help": "Crawl threshold", 216 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 217 "references discovered below the max crawl depth are taken into account." 218 } 219 options["crawl-via-links"] = { 220 "type": UserInput.OPTION_TOGGLE, 221 "default": False, 222 "help": "Extract new groups from links", 223 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 224 "This is more error-prone than crawling only via forwards, but can be a way to discover " 225 "links that would otherwise remain undetected." 226 } 227 228 return options 229 230 231 def get_items(self, query): 232 """ 233 Execute a query; get messages for given parameters 234 235 Basically a wrapper around execute_queries() to call it with asyncio. 236 237 :param dict query: Query parameters, as part of the DataSet object 238 :return list: Posts, sorted by thread and post ID, in ascending order 239 """ 240 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 241 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 242 "creating it again from scratch.", is_final=True) 243 return None 244 245 self.details_cache = {} 246 self.failures_cache = set() 247 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 248 results = asyncio.run(self.execute_queries()) 249 250 if not query.get("save-session"): 251 self.dataset.delete_parameter("api_hash", instant=True) 252 self.dataset.delete_parameter("api_phone", instant=True) 253 self.dataset.delete_parameter("api_id", instant=True) 254 255 if self.flawless: 256 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 257 "been private). View the log file for details.", is_final=True) 258 259 return results 260 261 async def execute_queries(self): 262 """ 263 Get messages for queries 264 265 This is basically what would be done in get_items(), except due to 266 Telethon's architecture this needs to be called in an async method, 267 which is this one. 268 269 :return list: Collected messages 270 """ 271 # session file has been created earlier, and we can re-use it here in 272 # order to avoid having to re-enter the security code 273 query = self.parameters 274 275 session_id = SearchTelegram.create_session_id(query["api_phone"], query["api_id"], query["api_hash"]) 276 self.dataset.log(f'Telegram session id: {session_id}') 277 session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session") 278 279 client = None 280 281 try: 282 client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"), 283 loop=self.eventloop) 284 await client.start(phone=SearchTelegram.cancel_start) 285 except RuntimeError: 286 # session is no longer useable, delete file so user will be asked 287 # for security code again. The RuntimeError is raised by 288 # `cancel_start()` 289 self.dataset.update_status( 290 "Session is not authenticated: login security code may have expired. 
You need to re-enter the security code.", 291 is_final=True) 292 293 if client and hasattr(client, "disconnect"): 294 await client.disconnect() 295 296 if session_path.exists(): 297 session_path.unlink() 298 299 return [] 300 except Exception as e: 301 # not sure what exception specifically is triggered here, but it 302 # always means the connection failed 303 self.log.error(f"Telegram: {e}\n{traceback.format_exc()}") 304 self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True) 305 if client and hasattr(client, "disconnect"): 306 await client.disconnect() 307 return [] 308 309 # ready our parameters 310 parameters = self.dataset.get_parameters() 311 queries = [query.strip() for query in parameters.get("query", "").split(",")] 312 max_items = convert_to_int(parameters.get("items", 10), 10) 313 314 # Telethon requires the offset date to be a datetime date 315 max_date = parameters.get("max_date") 316 if max_date: 317 try: 318 max_date = datetime.fromtimestamp(int(max_date)) 319 except ValueError: 320 max_date = None 321 322 # min_date can remain an integer 323 min_date = parameters.get("min_date") 324 if min_date: 325 try: 326 min_date = int(min_date) 327 except ValueError: 328 min_date = None 329 330 posts = [] 331 try: 332 async for post in self.gather_posts(client, queries, max_items, min_date, max_date): 333 posts.append(post) 334 return posts 335 except ProcessorInterruptedException as e: 336 raise e 337 except Exception as e: 338 # catch-all so we can disconnect properly 339 # ...should we? 340 self.dataset.update_status("Error scraping posts from Telegram; halting collection.") 341 self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") 342 # May as well return what was captured, yes? 343 return posts 344 finally: 345 await client.disconnect() 346 347 async def gather_posts(self, client, queries, max_items, min_date, max_date): 348 """ 349 Gather messages for each entity for which messages are requested 350 351 :param TelegramClient client: Telegram Client 352 :param list queries: List of entities to query (as string) 353 :param int max_items: Messages to scrape per entity 354 :param int min_date: Datetime date to get posts after 355 :param int max_date: Datetime date to get posts before 356 :return list: List of messages, each message a dictionary. 357 """ 358 resolve_refs = self.parameters.get("resolve-entities") 359 360 # Adding flag to stop; using for rate limits 361 no_additional_queries = False 362 363 # This is used for the 'crawl' feature so we know at which depth a 364 # given entity was discovered 365 depth_map = { 366 entity: 0 for entity in queries 367 } 368 369 crawl_max_depth = self.parameters.get("crawl-depth", 0) 370 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 371 crawl_via_links = self.parameters.get("crawl-via-links", False) 372 373 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 374 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 375 376 # this keeps track of how often an entity not in the original query 377 # has been mentioned. 
        # given threshold, the entity is added to the query
        crawl_references = {}
        full_query = set(queries)
        num_queries = len(queries)

        # we may not always know the 'entity username' for an entity ID, so
        # keep a reference map as we go
        entity_id_map = {}

        # collect queries
        # use while instead of for so we can change queries during iteration,
        # which is needed for the 'crawl' feature that can discover new
        # entities while crawling
        processed = 0
        total_messages = 0
        while queries:
            query = queries.pop(0)

            delay = 10
            retries = 0
            processed += 1
            self.dataset.update_progress(processed / num_queries)

            if no_additional_queries:
                # note that we are not completing this query
                self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}")
                continue

            while True:
                self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'")
                entity_posts = 0
                discovered = 0
                try:
                    async for message in client.iter_messages(entity=query, offset_date=max_date):
                        entity_posts += 1
                        total_messages += 1
                        if self.interrupted:
                            raise ProcessorInterruptedException(
                                "Interrupted while fetching message data from the Telegram API")

                        if entity_posts % 100 == 0:
                            self.dataset.update_status(
                                f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)")

                        if message.action is not None:
                            # e.g. someone joins the channel - not an actual message
                            continue

                        # TODO: possibly enrich the object with e.g. the name
                        # of the channel a message was forwarded from (but
                        # that needs extra API requests...)
                        serialized_message = SearchTelegram.serialize_obj(message)
                        if "_chat" in serialized_message:
                            # add the entity ID so we can check whether an entity has been crawled previously
                            full_query.add(serialized_message["_chat"]["id"])
                            if query not in entity_id_map and serialized_message["_chat"]["id"] == query:
                                # once we know what a channel ID resolves to, use the username instead so it is
                                # easier to understand for the user
                                entity_id_map[query] = serialized_message["_chat"]["username"]
                                self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})")

                        if resolve_refs:
                            serialized_message = await self.resolve_groups(client, serialized_message)

                        # stop if we're below the min date
                        if min_date and serialized_message.get("date") < min_date:
                            break

                        # if crawling is enabled, see if we found something to add to the query
                        linked_entities = set()
                        if crawl_max_depth and (depth_map.get(query) < crawl_max_depth):
                            message_fwd = serialized_message.get("fwd_from")
                            fwd_from = None
                            fwd_source_type = None
                            if message_fwd and message_fwd.get("from_id"):
                                if message_fwd["from_id"].get("_type") == "PeerChannel":
                                    # legacy(?) data structure (pre 2024/7/22)
                                    # even if we haven't resolved the ID, we can feed the numeric ID
                                    # to Telethon! this is nice because it means we don't have to
                                    # resolve entities to crawl iteratively
                                    fwd_from = int(message_fwd["from_id"]["channel_id"])
                                    fwd_source_type = "channel"
                                elif message_fwd.get("from_id", {}).get("full_chat", {}):
                                    # TODO: do we need a check here to only follow certain types of messages?
                                    # this is similar to resolving, but the types do not appear to be the same
                                    # note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far
                                    fwd_from = int(message_fwd["from_id"]["full_chat"]["id"])
                                    fwd_source_type = "channel"
                                elif message_fwd.get("from_id", {}).get("full_user", {}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser":
                                    # forwards can also come from users; these can never be followed,
                                    # so don't add them to the crawl, but do document them
                                    fwd_source_type = "user"
                                else:
                                    self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl")
                                    fwd_source_type = "unknown"

                                if fwd_from:
                                    linked_entities.add(fwd_from)

                            if crawl_via_links:
                                # t.me links
                                all_links = ural.urls_from_text(serialized_message["message"])
                                all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]
                                for link in all_links:
                                    if link.startswith("+"):
                                        # invite links
                                        continue

                                    entity_name = link.split("/")[0].split("?")[0].split("#")[0]
                                    linked_entities.add(entity_name)

                                # @references
                                references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention]
                                for reference in references:
                                    if reference.startswith("@"):
                                        reference = reference[1:]

                                    reference = reference.split("/")[0]

                                    linked_entities.add(reference)

                            # check whether fwd_from or the resolved entity ID is already queued or has been queried
                            for link in linked_entities:
                                if link not in full_query and link not in queries and fwd_source_type not in ("user",):
                                    # new entity discovered!
                                    # it might be discovered (before collection) multiple times, so retain the lowest depth
                                    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1)
                                    if link not in crawl_references:
                                        crawl_references[link] = 0
                                    crawl_references[link] += 1

                                    # add to queries if it has been referenced enough times
                                    if crawl_references[link] >= crawl_msg_threshold:
                                        queries.append(link)
                                        full_query.add(link)
                                        num_queries += 1
                                        discovered += 1
                                        self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query")

                        serialized_message["4CAT_metadata"] = {
                            "collected_at": datetime.now().isoformat(),  # relevant for rather long crawls
                            "query": query,  # possibly redundant, but crawling adds non-user-defined queries, so it is useful to know exactly which query was used to collect an entity
                            "query_depth": depth_map.get(query, 0)
                        }
                        yield serialized_message

                        if entity_posts >= max_items:
                            break

                except ChannelPrivateError:
                    self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping")
                    self.flawless += 1

                except UsernameInvalidError:
                    self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', it does not seem to exist, skipping")
                    self.flawless += 1

                except FloodWaitError as e:
                    self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting")
                    if e.seconds < self.end_if_rate_limited:
                        time.sleep(e.seconds)
                        continue
                    else:
                        self.flawless += 1
                        no_additional_queries = True
                        self.dataset.update_status(
                            f"Telegram-imposed wait grew larger than {int(e.seconds / 60)} minutes; ending collection")
                        break

                except BadRequestError as e:
                    self.dataset.update_status(
                        f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except ValueError as e:
                    self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping")
                    self.flawless += 1

                except TimeoutError:
                    if retries >= 3:
                        self.dataset.update_status(
                            f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.")
                        self.flawless += 1
                        break

                    self.dataset.update_status(
                        f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.")
                    time.sleep(delay)
                    delay *= 2
                    retries += 1
                    continue

                self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)")
                break

    async def resolve_groups(self, client, message):
        """
        Recursively resolve references to groups and users

        :param client: Telethon client instance
        :param dict message: Message, as already mapped by serialize_obj
        :return: Resolved dictionary
        """
        resolved_message = message.copy()
        for key, value in message.items():
            try:
                if type(value) is not dict:
                    # if it's not a dict, we never have to resolve it, as it
                    # does not represent an entity
                    continue

                elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"):
                    # forwarded from a channel!
                    if value["channel_id"] in self.failures_cache:
                        continue

                    if value["channel_id"] not in self.details_cache:
                        channel = await client(GetFullChannelRequest(value["channel_id"]))
                        self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel)

                    resolved_message[key] = self.details_cache[value["channel_id"]]
                    resolved_message[key]["channel_id"] = value["channel_id"]

                elif "_type" in value and value["_type"] == "PeerUser":
                    # a user!
                    if value["user_id"] in self.failures_cache:
                        continue

                    if value["user_id"] not in self.details_cache:
                        user = await client(GetFullUserRequest(value["user_id"]))
                        self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user)

                    resolved_message[key] = self.details_cache[value["user_id"]]
                    resolved_message[key]["user_id"] = value["user_id"]
                else:
                    resolved_message[key] = await self.resolve_groups(client, value)

            except (TypeError, ChannelPrivateError, UsernameInvalidError) as e:
                self.failures_cache.add(value.get("channel_id", value.get("user_id")))
                if type(e) in (ChannelPrivateError, UsernameInvalidError):
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is")
                else:
                    self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is")

        return resolved_message

    @staticmethod
    def cancel_start():
        """
        Replace interactive phone number input in Telethon

        By default, if Telethon cannot use the given session file to
        authenticate, it will interactively prompt the user for a phone
        number on the command line. That is not useful here, so instead
        raise a RuntimeError. This will be caught in execute_queries() and
        the user will be told they need to re-authenticate via 4CAT.
        """
        raise RuntimeError("Connection cancelled")

    @staticmethod
    def map_item(message):
        """
        Convert a Message object to a 4CAT-ready data object

        :param Message message: Message to parse
        :return dict: 4CAT-compatible item object
        """
        if message["_chat"]["username"]:
            # chats can apparently lack usernames???
            # truly, Telegram objects are way too lenient for their own good
            thread = message["_chat"]["username"]
        elif message["_chat"]["title"]:
            thread = re.sub(r"\s", "", message["_chat"]["title"])
        else:
            # just give up
            thread = "unknown"

        # determine username
        # API responses only include the user *ID*, not the username, and to
        # complicate things further not everyone is a user and not everyone
        # has a username. If no username is available, try the first and
        # last name someone has supplied
        fullname = ""
        username = ""
        user_id = message["_sender"]["id"] if message.get("_sender") else ""
        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""

        if message.get("_sender") and message["_sender"].get("username"):
            username = message["_sender"]["username"]

        if message.get("_sender") and message["_sender"].get("first_name"):
            fullname += message["_sender"]["first_name"]

        if message.get("_sender") and message["_sender"].get("last_name"):
            fullname += " " + message["_sender"]["last_name"]

        fullname = fullname.strip()

        # determine media type
        # these store some extra information about the attachment in
        # attachment_data. Since the final result will be serialised as a csv
        # file, we can only store text content. As such, some media data is
        # serialised as JSON.
        attachment_type = SearchTelegram.get_media_type(message["media"])
        attachment_filename = ""

        if attachment_type == "contact":
            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
            if message["media"].get("contact", False):
                # old data structure
                attachment = message["media"]["contact"]
            elif all([field in message["media"].keys() for field in contact_data]):
                # new data structure (2022/7/25)
                attachment = message["media"]
            else:
                raise ProcessorException("Cannot find contact data; Telegram data structure may have changed")
            attachment_data = json.dumps({field: attachment.get(field) for field in contact_data})

        elif attachment_type == "document":
            # videos, etc.
            # this could add a separate routine for videos to make them a
            # separate type, which could then be scraped later, etc.
            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
            if attachment_type == "video":
                attachment = message["media"]["document"]
                attachment_data = json.dumps({
                    "id": attachment["id"],
                    "dc_id": attachment["dc_id"],
                    "file_reference": attachment["file_reference"],
                })
            else:
                attachment_data = ""

        # elif attachment_type in ("geo", "geo_live"):
        #     untested whether geo_live is significantly different from geo
        #     attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])

        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
            # we don't actually store any metadata about the photo, since very
            # little of the attached metadata is of interest. Instead, the
            # actual photos may be downloaded via a processor that is run on
            # the search results
            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
            attachment_data = json.dumps({
                "id": attachment["id"],
                "dc_id": attachment["dc_id"],
                "file_reference": attachment["file_reference"],
            })
            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"

        elif attachment_type == "poll":
            # unfortunately, poll results are only available when someone has
            # actually voted on the poll - that will usually not be the case,
            # so we store -1 as the vote count
            attachment = message["media"]
            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
            attachment_data = json.dumps({
                "question": attachment["poll"]["question"],
                "voters": attachment["results"]["total_voters"],
                "answers": [{
                    "answer": options[answer["option"]],
                    "votes": answer["voters"]
                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
                    "answer": options[option],
                    "votes": -1
                } for option in options]
            })

        else:
            attachment_data = ""

        # was the message forwarded from somewhere, and if so, when?
        forwarded_timestamp = ""
        forwarded_name = ""
        forwarded_id = ""
        forwarded_username = ""
        if message.get("fwd_from") and "from_id" in message["fwd_from"] and not (
                type(message["fwd_from"]["from_id"]) is int):
            # forward information is spread out over a lot of places
            # we can identify, in order of usefulness: username, full name,
            # and ID. But not all of these are always available, and not
            # always in the same place either
            forwarded_timestamp = int(message["fwd_from"]["date"])
            from_data = message["fwd_from"]["from_id"]

            if from_data:
                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))

            if message["fwd_from"].get("from_name"):
                forwarded_name = message["fwd_from"]["from_name"]

            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
                from_data["user"] = from_data["users"][0]

            if from_data and ("user" in from_data or "chats" in from_data):
                # 'resolve entities' was enabled for this dataset
                if "user" in from_data:
                    if from_data["user"].get("username"):
                        forwarded_username = from_data["user"]["username"]

                    if from_data["user"].get("first_name"):
                        forwarded_name = from_data["user"]["first_name"]
                    if from_data["user"].get("last_name"):
                        forwarded_name += " " + from_data["user"]["last_name"]

                    forwarded_name = forwarded_name.strip()

                elif "chats" in from_data:
                    channel_id = from_data.get("channel_id")
                    for chat in from_data["chats"]:
                        if chat["id"] == channel_id or channel_id is None:
                            forwarded_username = chat["username"]

        elif message.get("_forward") and message["_forward"].get("_chat"):
            if message["_forward"]["_chat"].get("username"):
                forwarded_username = message["_forward"]["_chat"]["username"]

            if message["_forward"]["_chat"].get("title"):
                forwarded_name = message["_forward"]["_chat"]["title"]

        link_title = ""
        link_attached = ""
        link_description = ""
        reactions = ""

        if message.get("media") and message["media"].get("webpage"):
            link_title = message["media"]["webpage"].get("title")
            link_attached = message["media"]["webpage"].get("url")
            link_description = message["media"]["webpage"].get("description")

        if message.get("reactions") and message["reactions"].get("results"):
            for reaction in message["reactions"]["results"]:
                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
                    # newer reaction data structure
                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
                elif type(reaction["reaction"]) is str and "count" in reaction:
                    # older reaction data structure
                    reactions += reaction["reaction"] * reaction["count"]
                else:
                    # failsafe; can be updated to support formatting of new data structures in the future
                    reactions += f"{reaction}, "

        # t.me links
        linked_entities = set()
        all_links = ural.urls_from_text(message["message"])
        all_links = [link.split("t.me/")[1] for link in all_links if
                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]

        for link in all_links:
            if link.startswith("+"):
                # invite links
                continue

            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
            linked_entities.add(entity_name)

        # @references
        # in execute_queries we use MessageEntityMention to get these;
        # however, after serializing these objects we only have the offsets
        # of the mentioned username, and Telegram does weird unicode things
        # to its offsets, meaning we can't just substring the message. So use
        # a regex as a 'good enough' solution
        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))

        # make this case-insensitive, since people may use different casing
        # in messages than the 'official' username, for example
        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
        all_ci_connections = set()
        seen = set()
        for connection in all_connections:
            if connection.lower() not in seen:
                all_ci_connections.add(connection)
                seen.add(connection.lower())

        return MappedItem({
            "id": f"{message['_chat']['username']}-{message['id']}",
            "thread_id": thread,
            "chat": message["_chat"]["username"],
            "author": user_id,
            "author_username": username,
            "author_name": fullname,
            "author_is_bot": "yes" if user_is_bot else "no",
            "body": message["message"],
            "reply_to": message.get("reply_to_msg_id", ""),
            "views": message["views"] if message["views"] else "",
            # "forwards": message.get("forwards", MissingMappedField(0)),
            "reactions": reactions,
            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(message["date"]),
            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message["edit_date"] else "",
            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
            "author_forwarded_from_name": forwarded_name,
            "author_forwarded_from_username": forwarded_username,
            "author_forwarded_from_id": forwarded_id,
            "entities_linked": ",".join(linked_entities),
            "entities_mentioned": ",".join(all_mentions),
            "all_connections": ",".join(all_ci_connections),
            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime("%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
            "unix_timestamp_forwarded_from": forwarded_timestamp,
            "link_title": link_title,
            "link_description": link_description,
            "link_attached": link_attached,
            "attachment_type": attachment_type,
            "attachment_data": attachment_data,
            "attachment_filename": attachment_filename
        })

    @staticmethod
    def get_media_type(media):
        """
        Get media type for a Telegram attachment

        :param media: Media object
        :return str: Textual identifier of the media type
        """
        try:
            return {
                "NoneType": "",
                "MessageMediaContact": "contact",
                "MessageMediaDocument": "document",
                "MessageMediaEmpty": "",
                "MessageMediaGame": "game",
                "MessageMediaGeo": "geo",
                "MessageMediaGeoLive": "geo_live",
                "MessageMediaInvoice": "invoice",
                "MessageMediaPhoto": "photo",
                "MessageMediaPoll": "poll",
                "MessageMediaUnsupported": "unsupported",
                "MessageMediaVenue": "venue",
                "MessageMediaWebPage": "url"
            }[media.get("_type", None)]
        except (AttributeError, KeyError):
            return ""

    @staticmethod
    def serialize_obj(input_obj):
        """
        Serialize an object as a dictionary

        Telethon message objects are not serializable by themselves, but most
        relevant attributes are simply struct classes. This function replaces
        those that are not with placeholders and then returns a dictionary
        that can be serialized as JSON.

        :param input_obj: Object to serialize
        :return: Serialized object
        """
        scalars = (int, str, float, list, tuple, set, bool)

        if type(input_obj) in scalars or input_obj is None:
            return input_obj

        if type(input_obj) is not dict:
            obj = input_obj.__dict__
        else:
            obj = input_obj.copy()

        mapped_obj = {}
        for item, value in obj.items():
            if type(value) is datetime:
                mapped_obj[item] = value.timestamp()
            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
                mapped_obj[item] = SearchTelegram.serialize_obj(value)
            elif type(value) is list:
                mapped_obj[item] = [SearchTelegram.serialize_obj(list_item) for list_item in value]
            elif type(value) is bytes:
                mapped_obj[item] = value.hex()
            elif type(value) not in scalars and value is not None:
                # type we can't make sense of here
                continue
            else:
                mapped_obj[item] = value

        # add the _type if the original object was a Telethon type
        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
            mapped_obj["_type"] = type(input_obj).__name__
        return mapped_obj

    @staticmethod
    def validate_query(query, request, user):
        """
        Validate Telegram query

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False, user=user)
        max_entities = config.get("telegram-search.max_entities", 25, user=user)

        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"),
                                                                 SearchTelegram.get_options()["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle Telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not user or not user.is_authenticated or user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        user.set_value("telegram.session", session_id)
        session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session")

        client = None

        # the API ID is always a number; if it's not, we can fail immediately
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")
            max_attempts = 1
        else:
            code_callback = lambda: -1
            # max_attempts = 0 because authing will always fail: we can't wait
            # for the code to be entered interactively, we'll need to do a new
            # request - but we can't just immediately return, we still need to
            # call start() to get Telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError:
                # a code was sent to the given phone number
                needs_code = True
        except FloodWaitError as e:
            # uh oh, we got rate-limited
            raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                           str(e).split("(")[0] + ".")
        except ApiIdInvalidError:
            # wrong credentials
            raise QueryParametersException("Your API credentials are invalid.")
        except PhoneNumberInvalidError:
            # wrong phone number
            raise QueryParametersException(
                "The phone number provided is not a valid phone number for these credentials.")
        except RPCError as e:
            # only seen this with an 'UPDATE_APP_TO_LOGIN' status
            raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                           f"Telegram app(s) to the latest version to proceed ({e}).")
        except Exception as e:
            # ?
            raise QueryParametersException(
                f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            if client:
                client.disconnect()

        if needs_code:
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }

    @staticmethod
    def create_session_id(api_phone, api_id, api_hash):
        """
        Generate a filename for the session file

        This is a combination of phone number and API credentials, but hashed
        so that one cannot actually derive someone's phone number from it.

        :param str api_phone: Phone number for API ID
        :param int api_id: Telegram API ID
        :param str api_hash: Telegram API hash
        :return str: A hash value derived from the input
        """
        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
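The snippets below are small, self-contained sketches of techniques used in the module above. Names and values in them are illustrative stand-ins, not part of the module itself.

The get_items()/execute_queries() split follows a common pattern for driving Telethon from synchronous code: the synchronous entry point simply runs a coroutine to completion with asyncio.run(). A minimal sketch, with a stand-in coroutine instead of the real Telethon calls:

import asyncio

async def collect_messages():
    # stand-in for execute_queries(): any Telethon work has to happen
    # inside a coroutine running on an event loop
    return ["message 1", "message 2"]

def get_items():
    # synchronous entry point: asyncio.run() creates an event loop,
    # runs the coroutine to completion and returns its result
    return asyncio.run(collect_messages())

print(get_items())  # ['message 1', 'message 2']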
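create_session_id() derives a deterministic session filename by hashing the credentials together, so the same credentials always map to the same .session file while the phone number cannot be read back from the filename. A sketch of the same hashing, with made-up credentials:

import hashlib

# made-up credentials, for illustration only
api_phone = "+31600000000"
api_id = 123456
api_hash = "0123456789abcdef0123456789abcdef"

hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
session_id = hashlib.blake2b(hash_base.encode("ascii")).hexdigest()

# deterministic: running this again with the same input yields the same ID
print(session_id)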
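The TimeoutError handler in gather_posts() retries with an exponentially growing delay (10, 20, 40 seconds) before giving up on an entity. The same idea, extracted into a hypothetical standalone helper that is not part of the module:

import time

def fetch_with_backoff(fetch, max_retries=3, initial_delay=10):
    # retry `fetch` on timeouts, doubling the delay each time,
    # in the way gather_posts() handles Telegram timeouts
    delay = initial_delay
    retries = 0
    while True:
        try:
            return fetch()
        except TimeoutError:
            if retries >= max_retries:
                raise  # give up after too many attempts
            time.sleep(delay)
            delay *= 2
            retries += 1

print(fetch_with_backoff(lambda: "ok"))  # succeeds immediately: 'ok'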
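FloodWaitError is handled differently from timeouts: Telegram says exactly how long to wait, so the code sleeps for that long unless the wait exceeds end_if_rate_limited (600 seconds by default), in which case collection stops entirely. A sketch of that decision, with a hypothetical wait_seconds value standing in for the exception's `seconds` attribute:

import time

END_IF_RATE_LIMITED = 600  # mirrors the class attribute: give up beyond ten minutes

def should_continue_after_flood_wait(wait_seconds):
    # short waits are slept through; long ones abort collection entirely
    if wait_seconds < END_IF_RATE_LIMITED:
        time.sleep(wait_seconds)
        return True
    return False

print(should_continue_after_flood_wait(5))    # True, after sleeping five seconds
print(should_continue_after_flood_wait(900))  # False: longer than the cut-off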
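The crawl bookkeeping in gather_posts() combines two dictionaries: depth_map records the shallowest depth at which an entity was seen, and crawl_references counts how often it was referenced; an entity only joins the query queue once its reference count reaches the threshold. A self-contained sketch with hypothetical channel names:

# hypothetical discovery log: (entity seen in a message, entity it was seen in)
discoveries = [
    ("channel_b", "channel_a"),
    ("channel_b", "channel_a"),
    ("channel_c", "channel_a"),
]

crawl_max_depth = 1
crawl_msg_threshold = 2

queries = ["channel_a"]
full_query = set(queries)
depth_map = {"channel_a": 0}
crawl_references = {}

for link, source in discoveries:
    if link in full_query or depth_map[source] >= crawl_max_depth:
        continue
    # an entity may be discovered multiple times; retain the lowest depth
    depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[source] + 1)
    crawl_references[link] = crawl_references.get(link, 0) + 1
    # only queue the entity once it has been referenced often enough
    if crawl_references[link] >= crawl_msg_threshold:
        queries.append(link)
        full_query.add(link)

print(queries)  # ['channel_a', 'channel_b']: only channel_b met the threshold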
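resolve_groups() is expensive because every unresolved channel or user costs an API request, so it memoizes results in details_cache and remembers permanent failures in failures_cache so they are never retried. The same caching shape, sketched with an in-memory lookup table standing in for the Telegram API calls:

details_cache = {}
failures_cache = set()

def resolve(entity_id, fetch):
    # memoized lookup in the style of resolve_groups(): successful lookups
    # are cached, failures are remembered so they are never retried
    if entity_id in failures_cache:
        return None
    if entity_id not in details_cache:
        try:
            details_cache[entity_id] = fetch(entity_id)
        except KeyError:
            failures_cache.add(entity_id)
            return None
    return details_cache[entity_id]

# hypothetical lookup table standing in for GetFullChannelRequest calls
known_channels = {1001: {"title": "Example Channel"}}

print(resolve(1001, known_channels.__getitem__))  # fetched once, then cached
print(resolve(9999, known_channels.__getitem__))  # None; failure cached, not retried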
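serialize_obj() makes Telethon objects JSON-safe mainly by converting two awkward value types: datetimes become Unix timestamps and raw bytes become hex strings. A minimal sketch of those conversions on a plain, hypothetical dictionary:

import json
from datetime import datetime

# a hypothetical structure containing the value types serialize_obj converts
raw = {
    "date": datetime(2024, 1, 1, 12, 0),
    "file_reference": b"\x00\x01\x02",
    "views": 42,
}

mapped = {}
for key, value in raw.items():
    if type(value) is datetime:
        mapped[key] = value.timestamp()  # datetimes become Unix timestamps
    elif type(value) is bytes:
        mapped[key] = value.hex()        # bytes become hex strings
    else:
        mapped[key] = value              # scalars pass through unchanged

print(json.dumps(mapped))  # now serializable as JSON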
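map_item() flattens reactions to a single string by repeating each emoji once per vote, handling both the older layout (the reaction is a plain string) and the newer one (the reaction is a dict with an "emoticon" key). A sketch with hypothetical serialized reaction data in both layouts:

# hypothetical serialized reaction data, in both known layouts
reactions_results = [
    {"reaction": {"emoticon": "👍"}, "count": 3},  # newer structure
    {"reaction": "❤", "count": 2},                # older structure
]

reactions = ""
for reaction in reactions_results:
    if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
        reactions += reaction["reaction"]["emoticon"] * reaction["count"]
    elif type(reaction["reaction"]) is str and "count" in reaction:
        reactions += reaction["reaction"] * reaction["count"]

print(reactions)  # 👍👍👍❤❤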
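Both gather_posts() and map_item() extract entity names from t.me links with ural, then strip the path, query string and fragment, and skip invite links (which start with "+" and cannot be queried). A runnable sketch with a made-up message:

import ural

# hypothetical message text, for illustration only
message = "join https://t.me/examplechannel/123 or https://t.me/+privateinvite"

all_links = ural.urls_from_text(message)
all_links = [link.split("t.me/")[1] for link in all_links
             if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]

entities = set()
for link in all_links:
    if link.startswith("+"):
        # invite links cannot be queried, so skip them
        continue
    entities.add(link.split("/")[0].split("?")[0].split("#")[0])

print(sorted(entities))  # ['examplechannel']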
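For @mentions, map_item() cannot reuse Telethon's MessageEntityMention offsets after serialization (Telegram's offsets do not map cleanly onto Python string indices), so it falls back to a 'good enough' regex. The same regex on a made-up message:

import re

# hypothetical message text, for illustration only
message = "forwarded by @ExampleUser, discussed in @another_channel"

# [^\s\W] matches word characters only, so punctuation ends a mention
mentions = set(re.findall(r"@([^\s\W]+)", message))
print(sorted(mentions))  # ['ExampleUser', 'another_channel']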
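Finally, the "all_connections" field deduplicates usernames case-insensitively, since people rarely match the official casing when mentioning an entity. A sketch of that dedupe; note that in map_item() the input is a set, so which casing variant survives is arbitrary, while the list below keeps the first one seen:

# hypothetical connection names, for illustration only
connections = ["ExampleChannel", "examplechannel", "OtherGroup"]

deduplicated = set()
seen = set()
for connection in connections:
    if connection.lower() not in seen:
        deduplicated.add(connection)
        seen.add(connection.lower())

print(sorted(deduplicated))  # ['ExampleChannel', 'OtherGroup']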
Search Telegram via API
84 @classmethod 85 def get_options(cls, parent_dataset=None, user=None): 86 """ 87 Get processor options 88 89 Just updates the description of the entities field based on the 90 configured max entities. 91 92 :param DataSet parent_dataset: An object representing the dataset that 93 the processor would be run on 94 :param User user: Flask user the options will be displayed for, in 95 case they are requested for display in the 4CAT web interface. This can 96 be used to show some options only to privileges users. 97 """ 98 max_entities = config.get("telegram-search.max_entities", 25, user=user) 99 options = { 100 "intro": { 101 "type": UserInput.OPTION_INFO, 102 "help": "Messages are scraped in reverse chronological order: the most recent message for a given entity " 103 "(e.g. a group) will be scraped first.\n\nTo query the Telegram API, you need to supply your [API " 104 "credentials](https://my.telegram.org/apps). 4CAT at this time does not support two-factor " 105 "authentication for Telegram." 106 }, 107 "api_id": { 108 "type": UserInput.OPTION_TEXT, 109 "help": "API ID", 110 "cache": True, 111 }, 112 "api_hash": { 113 "type": UserInput.OPTION_TEXT, 114 "help": "API Hash", 115 "cache": True, 116 }, 117 "api_phone": { 118 "type": UserInput.OPTION_TEXT, 119 "help": "Phone number", 120 "cache": True, 121 "default": "+xxxxxxxxxx" 122 }, 123 "divider": { 124 "type": UserInput.OPTION_DIVIDER 125 }, 126 "query-intro": { 127 "type": UserInput.OPTION_INFO, 128 "help": "Separate with commas or line breaks." 129 }, 130 "query": { 131 "type": UserInput.OPTION_TEXT_LARGE, 132 "help": "Entities to scrape", 133 "tooltip": "Separate with commas or line breaks." 134 }, 135 "max_posts": { 136 "type": UserInput.OPTION_TEXT, 137 "help": "Messages per group", 138 "min": 1, 139 "max": 50000, 140 "default": 10 141 }, 142 "daterange": { 143 "type": UserInput.OPTION_DATERANGE, 144 "help": "Date range" 145 }, 146 "divider-2": { 147 "type": UserInput.OPTION_DIVIDER 148 }, 149 "info-sensitive": { 150 "type": UserInput.OPTION_INFO, 151 "help": "Your API credentials and phone number **will be sent to the 4CAT server** and will be stored " 152 "there while data is fetched. After the dataset has been created your credentials will be " 153 "deleted from the server, unless you enable the option below. If you want to download images " 154 "attached to the messages in your collected data, you need to enable this option. Your " 155 "credentials will never be visible to other users and can be erased later via the result page." 156 }, 157 "save-session": { 158 "type": UserInput.OPTION_TOGGLE, 159 "help": "Save session:", 160 "default": False 161 }, 162 "resolve-entities-intro": { 163 "type": UserInput.OPTION_INFO, 164 "help": "4CAT can resolve the references to channels and user and replace the numeric ID with the full " 165 "user, channel or group metadata. Doing so allows one to discover e.g. new relevant groups and " 166 "figure out where or who a message was forwarded from.\n\nHowever, this increases query time and " 167 "for large datasets, increases the chance you will be rate-limited and your dataset isn't able " 168 "to finish capturing. It will also dramatically increase the disk space needed to store the " 169 "data, so only enable this if you really need it!" 
170 }, 171 "resolve-entities": { 172 "type": UserInput.OPTION_TOGGLE, 173 "help": "Resolve references", 174 "default": False, 175 } 176 } 177 178 if max_entities: 179 options["query-intro"]["help"] = (f"You can collect messages from up to **{max_entities:,}** entities " 180 f"(channels or groups) at a time. Separate with line breaks or commas.") 181 182 all_messages = config.get("telegram-search.can_query_all_messages", False, user=user) 183 if all_messages: 184 if "max" in options["max_posts"]: 185 del options["max_posts"]["max"] 186 else: 187 options["max_posts"]["help"] = (f"Messages to collect per entity. You can query up to " 188 f"{options['max_posts']['max']:,} messages per entity.") 189 190 if config.get("telegram-search.max_crawl_depth", 0, user=user) > 0: 191 options["crawl_intro"] = { 192 "type": UserInput.OPTION_INFO, 193 "help": "Optionally, 4CAT can 'discover' new entities via forwarded messages; for example, if a " 194 "channel X you are collecting data for contains a message forwarded from channel Y, 4CAT can " 195 "collect messages from both channel X and Y. **Use this feature with caution**, as datasets can " 196 "rapidly grow when adding newly discovered entities to the query this way. Note that dataset " 197 "progress cannot be accurately tracked when you use this feature." 198 } 199 options["crawl-depth"] = { 200 "type": UserInput.OPTION_TEXT, 201 "coerce_type": int, 202 "min": 0, 203 "max": config.get("telegram-search.max_crawl_depth"), 204 "default": 0, 205 "help": "Crawl depth", 206 "tooltip": "How many 'hops' to make when crawling messages. This is the distance from an initial " 207 "query, i.e. at most this many hops can be needed to reach the entity from one of the " 208 "starting entities." 209 } 210 options["crawl-threshold"] = { 211 "type": UserInput.OPTION_TEXT, 212 "coerce_type": int, 213 "min": 0, 214 "default": 5, 215 "help": "Crawl threshold", 216 "tooltip": "Entities need to be references at least this many times to be added to the query. Only " 217 "references discovered below the max crawl depth are taken into account." 218 } 219 options["crawl-via-links"] = { 220 "type": UserInput.OPTION_TOGGLE, 221 "default": False, 222 "help": "Extract new groups from links", 223 "tooltip": "Look for references to other groups in message content via t.me links and @references. " 224 "This is more error-prone than crawling only via forwards, but can be a way to discover " 225 "links that would otherwise remain undetected." 226 } 227 228 return options
Get processor options
Just updates the description of the entities field based on the configured max entities.
Parameters
- DataSet parent_dataset: An object representing the dataset that the processor would be run on
- User user: Flask user the options will be displayed for, in case they are requested for display in the 4CAT web interface. This can be used to show some options only to privileges users.
231 def get_items(self, query): 232 """ 233 Execute a query; get messages for given parameters 234 235 Basically a wrapper around execute_queries() to call it with asyncio. 236 237 :param dict query: Query parameters, as part of the DataSet object 238 :return list: Posts, sorted by thread and post ID, in ascending order 239 """ 240 if "api_phone" not in query or "api_hash" not in query or "api_id" not in query: 241 self.dataset.update_status("Could not create dataset since the Telegram API Hash and ID are missing. Try " 242 "creating it again from scratch.", is_final=True) 243 return None 244 245 self.details_cache = {} 246 self.failures_cache = set() 247 #TODO: This ought to yield as we're holding everything in memory; async generator? execute_queries() also needs to be modified for this 248 results = asyncio.run(self.execute_queries()) 249 250 if not query.get("save-session"): 251 self.dataset.delete_parameter("api_hash", instant=True) 252 self.dataset.delete_parameter("api_phone", instant=True) 253 self.dataset.delete_parameter("api_id", instant=True) 254 255 if self.flawless: 256 self.dataset.update_status(f"Dataset completed, but {self.flawless} requested entities were unavailable (they may have " 257 "been private). View the log file for details.", is_final=True) 258 259 return results
Execute a query; get messages for given parameters
Basically a wrapper around execute_queries() to call it with asyncio.
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Posts, sorted by thread and post ID, in ascending order
261 async def execute_queries(self): 262 """ 263 Get messages for queries 264 265 This is basically what would be done in get_items(), except due to 266 Telethon's architecture this needs to be called in an async method, 267 which is this one. 268 269 :return list: Collected messages 270 """ 271 # session file has been created earlier, and we can re-use it here in 272 # order to avoid having to re-enter the security code 273 query = self.parameters 274 275 session_id = SearchTelegram.create_session_id(query["api_phone"], query["api_id"], query["api_hash"]) 276 self.dataset.log(f'Telegram session id: {session_id}') 277 session_path = Path(config.get("PATH_ROOT")).joinpath(config.get("PATH_SESSIONS"), session_id + ".session") 278 279 client = None 280 281 try: 282 client = TelegramClient(str(session_path), int(query.get("api_id")), query.get("api_hash"), 283 loop=self.eventloop) 284 await client.start(phone=SearchTelegram.cancel_start) 285 except RuntimeError: 286 # session is no longer useable, delete file so user will be asked 287 # for security code again. The RuntimeError is raised by 288 # `cancel_start()` 289 self.dataset.update_status( 290 "Session is not authenticated: login security code may have expired. You need to re-enter the security code.", 291 is_final=True) 292 293 if client and hasattr(client, "disconnect"): 294 await client.disconnect() 295 296 if session_path.exists(): 297 session_path.unlink() 298 299 return [] 300 except Exception as e: 301 # not sure what exception specifically is triggered here, but it 302 # always means the connection failed 303 self.log.error(f"Telegram: {e}\n{traceback.format_exc()}") 304 self.dataset.update_status("Error connecting to the Telegram API with provided credentials.", is_final=True) 305 if client and hasattr(client, "disconnect"): 306 await client.disconnect() 307 return [] 308 309 # ready our parameters 310 parameters = self.dataset.get_parameters() 311 queries = [query.strip() for query in parameters.get("query", "").split(",")] 312 max_items = convert_to_int(parameters.get("items", 10), 10) 313 314 # Telethon requires the offset date to be a datetime date 315 max_date = parameters.get("max_date") 316 if max_date: 317 try: 318 max_date = datetime.fromtimestamp(int(max_date)) 319 except ValueError: 320 max_date = None 321 322 # min_date can remain an integer 323 min_date = parameters.get("min_date") 324 if min_date: 325 try: 326 min_date = int(min_date) 327 except ValueError: 328 min_date = None 329 330 posts = [] 331 try: 332 async for post in self.gather_posts(client, queries, max_items, min_date, max_date): 333 posts.append(post) 334 return posts 335 except ProcessorInterruptedException as e: 336 raise e 337 except Exception as e: 338 # catch-all so we can disconnect properly 339 # ...should we? 340 self.dataset.update_status("Error scraping posts from Telegram; halting collection.") 341 self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") 342 # May as well return what was captured, yes? 343 return posts 344 finally: 345 await client.disconnect()
Get messages for queries
This is basically what would be done in get_items(), except due to Telethon's architecture this needs to be called in an async method, which is this one.
Returns
Collected messages
347 async def gather_posts(self, client, queries, max_items, min_date, max_date): 348 """ 349 Gather messages for each entity for which messages are requested 350 351 :param TelegramClient client: Telegram Client 352 :param list queries: List of entities to query (as string) 353 :param int max_items: Messages to scrape per entity 354 :param int min_date: Datetime date to get posts after 355 :param int max_date: Datetime date to get posts before 356 :return list: List of messages, each message a dictionary. 357 """ 358 resolve_refs = self.parameters.get("resolve-entities") 359 360 # Adding flag to stop; using for rate limits 361 no_additional_queries = False 362 363 # This is used for the 'crawl' feature so we know at which depth a 364 # given entity was discovered 365 depth_map = { 366 entity: 0 for entity in queries 367 } 368 369 crawl_max_depth = self.parameters.get("crawl-depth", 0) 370 crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) 371 crawl_via_links = self.parameters.get("crawl-via-links", False) 372 373 self.dataset.log(f"Max crawl depth: {crawl_max_depth}") 374 self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") 375 376 # this keeps track of how often an entity not in the original query 377 # has been mentioned. When crawling is enabled and this exceeds the 378 # given threshold, the entity is added to the query 379 crawl_references = {} 380 full_query = set(queries) 381 num_queries = len(queries) 382 383 # we may not always know the 'entity username' for an entity ID, so 384 # keep a reference map as we go 385 entity_id_map = {} 386 query_id_map= {} 387 388 # Collect queries 389 # Use while instead of for so we can change queries during iteration 390 # this is needed for the 'crawl' feature which can discover new 391 # entities during crawl 392 processed = 0 393 total_messages = 0 394 while queries: 395 query = queries.pop(0) 396 397 delay = 10 398 retries = 0 399 processed += 1 400 self.dataset.update_progress(processed / num_queries) 401 402 if no_additional_queries: 403 # Note that we are not completing this query 404 self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") 405 continue 406 407 while True: 408 self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") 409 entity_posts = 0 410 discovered = 0 411 try: 412 async for message in client.iter_messages(entity=query, offset_date=max_date): 413 entity_posts += 1 414 total_messages += 1 415 if self.interrupted: 416 raise ProcessorInterruptedException( 417 "Interrupted while fetching message data from the Telegram API") 418 419 if entity_posts % 100 == 0: 420 self.dataset.update_status( 421 f"Retrieved {entity_posts:,} posts for entity '{entity_id_map.get(query, query)}' ({total_messages:,} total)") 422 423 if message.action is not None: 424 # e.g. someone joins the channel - not an actual message 425 continue 426 427 # todo: possibly enrich object with e.g. the name of 428 # the channel a message was forwarded from (but that 429 # needs extra API requests...) 
430 serialized_message = SearchTelegram.serialize_obj(message) 431 if "_chat" in serialized_message: 432 # Add query ID to check if queries have been crawled previously 433 full_query.add(serialized_message["_chat"]["id"]) 434 if query not in entity_id_map and serialized_message["_chat"]["id"] == query: 435 # once we know what a channel ID resolves to, use the username instead so it is easier to 436 # understand for the user 437 entity_id_map[query] = serialized_message["_chat"]["username"] 438 self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") 439 440 if resolve_refs: 441 serialized_message = await self.resolve_groups(client, serialized_message) 442 443 # Stop if we're below the min date 444 if min_date and serialized_message.get("date") < min_date: 445 break 446 447 # if crawling is enabled, see if we found something to add to the query 448 linked_entities = set() 449 if crawl_max_depth and (depth_map.get(query) < crawl_max_depth): 450 message_fwd = serialized_message.get("fwd_from") 451 fwd_from = None 452 fwd_source_type = None 453 if message_fwd and message_fwd.get("from_id"): 454 if message_fwd["from_id"].get("_type") == "PeerChannel": 455 # Legacy(?) data structure (pre 2024/7/22) 456 # even if we haven't resolved the ID, we can feed the numeric ID 457 # to Telethon! this is nice because it means we don't have to 458 # resolve entities to crawl iteratively 459 fwd_from = int(message_fwd["from_id"]["channel_id"]) 460 fwd_source_type = "channel" 461 elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}): 462 # TODO: do we need a check here to only follow certain types of messages? this is similar to resolving, but the types do not appear the same to me 463 # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far 464 fwd_from = int(message_fwd["from_id"]["full_chat"]["id"]) 465 fwd_source_type = "channel" 466 elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"): 467 # forwards can also come from users 468 # these can never be followed, so don't add these to the crawl, but do document them 469 fwd_source_type = "user" 470 else: 471 print(json.dumps(message_fwd)) 472 self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl") 473 fwd_source_type = "unknown" 474 475 if fwd_from: 476 linked_entities.add(fwd_from) 477 478 479 if crawl_via_links: 480 # t.me links 481 all_links = ural.urls_from_text(serialized_message["message"]) 482 all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] 483 for link in all_links: 484 if link.startswith("+"): 485 # invite links 486 continue 487 488 entity_name = link.split("/")[0].split("?")[0].split("#")[0] 489 linked_entities.add(entity_name) 490 491 # @references 492 references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention] 493 for reference in references: 494 if reference.startswith("@"): 495 reference = reference[1:] 496 497 reference = reference.split("/")[0] 498 499 linked_entities.add(reference) 500 501 # Check if fwd_from or the resolved entity ID is already queued or has been queried 502 for link in linked_entities: 503 if link not in full_query and link not in queries and fwd_source_type not in ("user",): 504 # new entity discovered! 
505 # might be discovered (before collection) multiple times, so retain lowest depth 506 # print(f"Potentially crawling {link}") 507 depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1) 508 if link not in crawl_references: 509 crawl_references[link] = 0 510 crawl_references[link] += 1 511 512 # Add to queries if it has been referenced enough times 513 if crawl_references[link] >= crawl_msg_threshold: 514 queries.append(link) 515 full_query.add(link) 516 num_queries += 1 517 discovered += 1 518 self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") 519 520 521 522 serialized_message["4CAT_metadata"] = { 523 "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls 524 "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity 525 "query_depth": depth_map.get(query, 0) 526 } 527 yield serialized_message 528 529 if entity_posts >= max_items: 530 break 531 532 except ChannelPrivateError: 533 self.dataset.update_status(f"Entity {entity_id_map.get(query, query)} is private, skipping") 534 self.flawless += 1 535 536 except (UsernameInvalidError,): 537 self.dataset.update_status(f"Could not scrape entity '{entity_id_map.get(query, query)}', does not seem to exist, skipping") 538 self.flawless += 1 539 540 except FloodWaitError as e: 541 self.dataset.update_status(f"Rate-limited by Telegram: {e}; waiting") 542 if e.seconds < self.end_if_rate_limited: 543 time.sleep(e.seconds) 544 continue 545 else: 546 self.flawless += 1 547 no_additional_queries = True 548 self.dataset.update_status( 549 f"Telegram wait grown larger than {int(e.seconds / 60)} minutes, ending") 550 break 551 552 except BadRequestError as e: 553 self.dataset.update_status( 554 f"Error '{e.__class__.__name__}' while collecting entity {entity_id_map.get(query, query)}, skipping") 555 self.flawless += 1 556 557 except ValueError as e: 558 self.dataset.update_status(f"Error '{e}' while collecting entity {entity_id_map.get(query, query)}, skipping") 559 self.flawless += 1 560 561 except ChannelPrivateError as e: 562 self.dataset.update_status( 563 f"QUERY '{entity_id_map.get(query, query)}' unable to complete due to error {e}. Skipping.") 564 break 565 566 except TimeoutError: 567 if retries < 3: 568 self.dataset.update_status( 569 f"Tried to fetch messages for entity '{entity_id_map.get(query, query)}' but timed out {retries:,} times. Skipping.") 570 self.flawless += 1 571 break 572 573 self.dataset.update_status( 574 f"Got a timeout from Telegram while fetching messages for entity '{entity_id_map.get(query, query)}'. Trying again in {delay:,} seconds.") 575 time.sleep(delay) 576 delay *= 2 577 continue 578 579 self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)") 580 break
Gather messages for each entity for which messages are requested
Parameters
- TelegramClient client: Telegram Client
- list queries: List of entities to query (as string)
- int max_items: Messages to scrape per entity
- int min_date: Datetime date to get posts after
- int max_date: Datetime date to get posts before
Returns
List of messages, each message a dictionary.
582 async def resolve_groups(self, client, message): 583 """ 584 Recursively resolve references to groups and users 585 586 :param client: Telethon client instance 587 :param dict message: Message, as already mapped by serialize_obj 588 :return: Resolved dictionary 589 """ 590 resolved_message = message.copy() 591 for key, value in message.items(): 592 try: 593 if type(value) is not dict: 594 # if it's not a dict, we never have to resolve it, as it 595 # does not represent an entity 596 continue 597 598 elif "_type" in value and value["_type"] in ("InputPeerChannel", "PeerChannel"): 599 # forwarded from a channel! 600 if value["channel_id"] in self.failures_cache: 601 continue 602 603 if value["channel_id"] not in self.details_cache: 604 channel = await client(GetFullChannelRequest(value["channel_id"])) 605 self.details_cache[value["channel_id"]] = SearchTelegram.serialize_obj(channel) 606 607 resolved_message[key] = self.details_cache[value["channel_id"]] 608 resolved_message[key]["channel_id"] = value["channel_id"] 609 610 elif "_type" in value and value["_type"] == "PeerUser": 611 # a user! 612 if value["user_id"] in self.failures_cache: 613 continue 614 615 if value["user_id"] not in self.details_cache: 616 user = await client(GetFullUserRequest(value["user_id"])) 617 self.details_cache[value["user_id"]] = SearchTelegram.serialize_obj(user) 618 619 resolved_message[key] = self.details_cache[value["user_id"]] 620 resolved_message[key]["user_id"] = value["user_id"] 621 else: 622 resolved_message[key] = await self.resolve_groups(client, value) 623 624 except (TypeError, ChannelPrivateError, UsernameInvalidError) as e: 625 self.failures_cache.add(value.get("channel_id", value.get("user_id"))) 626 if type(e) in (ChannelPrivateError, UsernameInvalidError): 627 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']} ({e.__class__.__name__}), leaving as-is") 628 else: 629 self.dataset.log(f"Cannot resolve entity with ID {value.get('channel_id', value.get('user_id'))} of type {value['_type']}, leaving as-is") 630 631 return resolved_message
Recursively resolve references to groups and users
Parameters
- client: Telethon client instance
- dict message: Message, as already mapped by serialize_obj
Returns
Resolved dictionary
633 @staticmethod 634 def cancel_start(): 635 """ 636 Replace interactive phone number input in Telethon 637 638 By default, if Telethon cannot use the given session file to 639 authenticate, it will interactively prompt the user for a phone 640 number on the command line. That is not useful here, so instead 641 raise a RuntimeError. This will be caught below and the user will 642 be told they need to re-authenticate via 4CAT. 643 """ 644 raise RuntimeError("Connection cancelled")
Replace interactive phone number input in Telethon
By default, if Telethon cannot use the given session file to authenticate, it will interactively prompt the user for a phone number on the command line. That is not useful here, so instead raise a RuntimeError. This will be caught below and the user will be told they need to re-authenticate via 4CAT.
    @staticmethod
    def map_item(message):
        """
        Convert Message object to 4CAT-ready data object

        :param Message message: Message to parse
        :return dict: 4CAT-compatible item object
        """
        if message["_chat"]["username"]:
            # chats can apparently not have usernames???
            # truly telegram objects are way too lenient for their own good
            thread = message["_chat"]["username"]
        elif message["_chat"]["title"]:
            thread = re.sub(r"\s", "", message["_chat"]["title"])
        else:
            # just give up
            thread = "unknown"

        # determine username
        # API responses only include the user *ID*, not the username, and to
        # complicate things further not everyone is a user and not everyone
        # has a username. If no username is available, try the first and
        # last name someone has supplied
        fullname = ""
        username = ""
        user_id = message["_sender"]["id"] if message.get("_sender") else ""
        user_is_bot = message["_sender"].get("bot", False) if message.get("_sender") else ""

        if message.get("_sender") and message["_sender"].get("username"):
            username = message["_sender"]["username"]

        if message.get("_sender") and message["_sender"].get("first_name"):
            fullname += message["_sender"]["first_name"]

        if message.get("_sender") and message["_sender"].get("last_name"):
            fullname += " " + message["_sender"]["last_name"]

        fullname = fullname.strip()

        # determine media type
        # these store some extra information of the attachment in
        # attachment_data. Since the final result will be serialised as a csv
        # file, we can only store text content. As such, some media data is
        # serialised as JSON.
        attachment_type = SearchTelegram.get_media_type(message["media"])
        attachment_filename = ""

        if attachment_type == "contact":
            contact_data = ["phone_number", "first_name", "last_name", "vcard", "user_id"]
            if message["media"].get("contact", False):
                # old datastructure
                attachment = message["media"]["contact"]
            elif all(field in message["media"] for field in contact_data):
                # new datastructure (2022/7/25)
                attachment = message["media"]
            else:
                raise ProcessorException("Cannot find contact data; Telegram datastructure may have changed")
            attachment_data = json.dumps({field: attachment.get(field) for field in contact_data})

        elif attachment_type == "document":
            # videos, etc
            # this could add a separate routine for videos to make them a
            # separate type, which could then be scraped later, etc
            attachment_type = message["media"]["document"]["mime_type"].split("/")[0]
            if attachment_type == "video":
                attachment = message["media"]["document"]
                attachment_data = json.dumps({
                    "id": attachment["id"],
                    "dc_id": attachment["dc_id"],
                    "file_reference": attachment["file_reference"],
                })
            else:
                attachment_data = ""

        # elif attachment_type in ("geo", "geo_live"):
        #     # untested whether geo_live is significantly different from geo
        #     attachment_data = "%s %s" % (message["geo"]["lat"], message["geo"]["long"])

        elif attachment_type == "photo" or (attachment_type == "url" and message["media"]["webpage"].get("photo")):
            # we don't actually store any metadata about the photo, since very
            # little of the metadata attached is of interest. Instead, the
            # actual photos may be downloaded via a processor that is run on
            # the search results
            attachment = message["media"]["photo"] if attachment_type == "photo" else message["media"]["webpage"]["photo"]
            attachment_data = json.dumps({
                "id": attachment["id"],
                "dc_id": attachment["dc_id"],
                "file_reference": attachment["file_reference"],
            })
            attachment_filename = thread + "-" + str(message["id"]) + ".jpeg"

        elif attachment_type == "poll":
            # unfortunately poll results are only available when someone has
            # actually voted on the poll - that will usually not be the case,
            # so we store -1 as the vote count
            attachment = message["media"]
            options = {option["option"]: option["text"] for option in attachment["poll"]["answers"]}
            attachment_data = json.dumps({
                "question": attachment["poll"]["question"],
                "voters": attachment["results"]["total_voters"],
                "answers": [{
                    "answer": options[answer["option"]],
                    "votes": answer["voters"]
                } for answer in attachment["results"]["results"]] if attachment["results"]["results"] else [{
                    "answer": options[option],
                    "votes": -1
                } for option in options]
            })

        else:
            attachment_data = ""

        # was the message forwarded from somewhere and if so when?
        forwarded_timestamp = ""
        forwarded_name = ""
        forwarded_id = ""
        forwarded_username = ""
        if message.get("fwd_from") and "from_id" in message["fwd_from"] and not (
                type(message["fwd_from"]["from_id"]) is int):
            # forward information is spread out over a lot of places
            # we can identify, in order of usefulness: username, full name,
            # and ID. But not all of these are always available, and not
            # always in the same place either
            forwarded_timestamp = int(message["fwd_from"]["date"])
            from_data = message["fwd_from"]["from_id"]

            if from_data:
                forwarded_id = from_data.get("channel_id", from_data.get("user_id", ""))

            if message["fwd_from"].get("from_name"):
                forwarded_name = message["fwd_from"].get("from_name")

            if from_data and from_data.get("from_name"):
                # read from the dict that was just checked
                forwarded_name = from_data["from_name"]

            if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data:
                from_data["user"] = from_data["users"][0]

            if from_data and ("user" in from_data or "chats" in from_data):
                # 'resolve entities' was enabled for this dataset
                if "user" in from_data:
                    if from_data["user"].get("username"):
                        forwarded_username = from_data["user"]["username"]

                    if from_data["user"].get("first_name"):
                        forwarded_name = from_data["user"]["first_name"]
                    # guard on the same dict the value is read from
                    if from_data["user"].get("last_name"):
                        forwarded_name += " " + from_data["user"]["last_name"]

                    forwarded_name = forwarded_name.strip()

                elif "chats" in from_data:
                    channel_id = from_data.get("channel_id")
                    for chat in from_data["chats"]:
                        if chat["id"] == channel_id or channel_id is None:
                            forwarded_username = chat["username"]

        elif message.get("_forward") and message["_forward"].get("_chat"):
            if message["_forward"]["_chat"].get("username"):
                forwarded_username = message["_forward"]["_chat"]["username"]

            if message["_forward"]["_chat"].get("title"):
                forwarded_name = message["_forward"]["_chat"]["title"]

        link_title = ""
        link_attached = ""
        link_description = ""
        reactions = ""

        if message.get("media") and message["media"].get("webpage"):
            link_title = message["media"]["webpage"].get("title")
            link_attached = message["media"]["webpage"].get("url")
            link_description = message["media"]["webpage"].get("description")

        if message.get("reactions") and message["reactions"].get("results"):
            for reaction in message["reactions"]["results"]:
                if type(reaction["reaction"]) is dict and "emoticon" in reaction["reaction"]:
                    # updated to support new reaction datastructure
                    reactions += reaction["reaction"]["emoticon"] * reaction["count"]
                elif type(reaction["reaction"]) is str and "count" in reaction:
                    reactions += reaction["reaction"] * reaction["count"]
                else:
                    # failsafe; can be updated to support formatting of new datastructures in the future
                    reactions += f"{reaction}, "

        # t.me links
        linked_entities = set()
        all_links = ural.urls_from_text(message["message"])
        all_links = [link.split("t.me/")[1] for link in all_links if
                     ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1]

        for link in all_links:
            if link.startswith("+"):
                # invite links
                continue

            entity_name = link.split("/")[0].split("?")[0].split("#")[0]
            linked_entities.add(entity_name)

        # @references
        # in execute_queries we use MessageEntityMention to get these;
        # however, after serializing these objects we only have the offsets of
        # the mentioned username, and telegram does weird unicode things to its
        # offsets meaning we can't just substring the message. So use a regex
        # as a 'good enough' solution
        all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"]))

        # make this case-insensitive since people may use different casing in
        # messages than the 'official' username, for example
        all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v])
        all_ci_connections = set()
        seen = set()
        for connection in all_connections:
            if connection.lower() not in seen:
                all_ci_connections.add(connection)
                seen.add(connection.lower())

        return MappedItem({
            "id": f"{message['_chat']['username']}-{message['id']}",
            "thread_id": thread,
            "chat": message["_chat"]["username"],
            "author": user_id,
            "author_username": username,
            "author_name": fullname,
            "author_is_bot": "yes" if user_is_bot else "no",
            "body": message["message"],
            "reply_to": message.get("reply_to_msg_id", ""),
            "views": message["views"] if message["views"] else "",
            # "forwards": message.get("forwards", MissingMappedField(0)),
            "reactions": reactions,
            "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(message["date"]),
            "timestamp_edited": datetime.fromtimestamp(message["edit_date"]).strftime("%Y-%m-%d %H:%M:%S") if message["edit_date"] else "",
            "unix_timestamp_edited": int(message["edit_date"]) if message["edit_date"] else "",
            "author_forwarded_from_name": forwarded_name,
            "author_forwarded_from_username": forwarded_username,
            "author_forwarded_from_id": forwarded_id,
            "entities_linked": ",".join(linked_entities),
            "entities_mentioned": ",".join(all_mentions),
            "all_connections": ",".join(all_ci_connections),
            "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime("%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "",
            "unix_timestamp_forwarded_from": forwarded_timestamp,
            "link_title": link_title,
            "link_description": link_description,
            "link_attached": link_attached,
            "attachment_type": attachment_type,
            "attachment_data": attachment_data,
            "attachment_filename": attachment_filename
        })
Convert Message object to 4CAT-ready data object
Parameters
- Message message: Message to parse
Returns
4CAT-compatible item object
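To make the expected input concrete, below is a hedged sketch of a minimal serialized message and the mapping call; all field values are hypothetical and only the keys map_item reads are included:

# hypothetical serialized message, for illustration only
message = {
    "_chat": {"username": "examplechannel", "title": "Example Channel"},
    "_sender": {"id": 12345, "username": "alice", "first_name": "Alice", "bot": False},
    "id": 42,
    "message": "hello @bob, see https://t.me/examplechannel/1",
    "media": None,       # no attachment; attachment fields will be empty
    "views": 100,
    "date": 1650000000,  # unix timestamp
    "edit_date": None,   # never edited
}

mapped = SearchTelegram.map_item(message)
# the returned MappedItem wraps a flat dictionary containing, among others,
# "id": "examplechannel-42", "entities_mentioned": "bob" and
# "entities_linked": "examplechannel"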
    @staticmethod
    def get_media_type(media):
        """
        Get media type for a Telegram attachment

        :param media: Media object
        :return str: Textual identifier of the media type
        """
        try:
            return {
                "NoneType": "",
                "MessageMediaContact": "contact",
                "MessageMediaDocument": "document",
                "MessageMediaEmpty": "",
                "MessageMediaGame": "game",
                "MessageMediaGeo": "geo",
                "MessageMediaGeoLive": "geo_live",
                "MessageMediaInvoice": "invoice",
                "MessageMediaPhoto": "photo",
                "MessageMediaPoll": "poll",
                "MessageMediaUnsupported": "unsupported",
                "MessageMediaVenue": "venue",
                "MessageMediaWebPage": "url"
            }[media.get("_type", None)]
        except (AttributeError, KeyError):
            return ""
Get media type for a Telegram attachment
Parameters
- media: Media object
Returns
Textual identifier of the media type
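For example, given serialized media objects, the lookup behaves as follows (the unknown type name is made up):

SearchTelegram.get_media_type({"_type": "MessageMediaPhoto"})       # "photo"
SearchTelegram.get_media_type({"_type": "MessageMediaFutureType"})  # "" (unknown type, KeyError caught)
SearchTelegram.get_media_type(None)                                 # "" (AttributeError caught)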
    @staticmethod
    def serialize_obj(input_obj):
        """
        Serialize an object as a dictionary

        Telethon message objects are not serializable by themselves, but most
        relevant attributes are simply struct classes. This function replaces
        those that are not with placeholders and then returns a dictionary that
        can be serialized as JSON.

        :param input_obj: Object to serialize
        :return: Serialized object
        """
        scalars = (int, str, float, list, tuple, set, bool)

        if type(input_obj) in scalars or input_obj is None:
            return input_obj

        if type(input_obj) is not dict:
            obj = input_obj.__dict__
        else:
            obj = input_obj.copy()

        mapped_obj = {}
        for item, value in obj.items():
            if type(value) is datetime:
                mapped_obj[item] = value.timestamp()
            elif type(value).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
                mapped_obj[item] = SearchTelegram.serialize_obj(value)
            elif type(value) is list:
                mapped_obj[item] = [SearchTelegram.serialize_obj(list_item) for list_item in value]
            elif type(value) is bytes:
                mapped_obj[item] = value.hex()
            elif type(value) not in scalars and value is not None:
                # type we can't make sense of here
                continue
            else:
                mapped_obj[item] = value

        # add the _type if the original object was a telethon type
        if type(input_obj).__module__ in ("telethon.tl.types", "telethon.tl.custom.forward"):
            mapped_obj["_type"] = type(input_obj).__name__
        return mapped_obj
Serialize an object as a dictionary
Telethon message objects are not serializable by themselves, but most relevant attributes are simply struct classes. This function replaces those that are not with placeholders and then returns a dictionary that can be serialized as JSON.
Parameters
- input_obj: Object to serialize
Returns
Serialized object
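A short illustration on a plain dictionary (the values are hypothetical): datetimes become timestamps, bytes become hex strings, and values of unknown types are silently dropped:

from datetime import datetime

raw = {
    "date": datetime(2022, 4, 15),  # becomes a float timestamp (exact value depends on local timezone)
    "file_reference": b"\x00\xff",  # becomes "00ff"
    "views": 100,                   # scalar, kept as-is
    "_client": object(),            # not serializable, dropped
}
SearchTelegram.serialize_obj(raw)
# e.g. {'date': 1649973600.0, 'file_reference': '00ff', 'views': 100}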
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate Telegram query

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        if not query.get("api_id", None) or not query.get("api_hash", None) or not query.get("api_phone", None):
            raise QueryParametersException("You need to provide valid Telegram API credentials first.")

        all_posts = config.get("telegram-search.can_query_all_messages", False, user=user)
        max_entities = config.get("telegram-search.max_entities", 25, user=user)

        num_items = query.get("max_posts") if all_posts else min(query.get("max_posts"), SearchTelegram.get_options()["max_posts"]["max"])

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("query").replace("\n", ","))
        if max_entities > 0 and len(items.split(",")) > max_entities:
            raise QueryParametersException(f"You cannot query more than {max_entities:,} items at a time.")

        sanitized_items = []
        # handle telegram URLs
        for item in items.split(","):
            if not item.strip():
                continue
            item = re.sub(r"^https?://t\.me/", "", item)
            item = re.sub(r"^/?s/", "", item)
            item = re.sub(r"[/]*$", "", item)
            sanitized_items.append(item)

        # the dates need to make sense as a range to search within
        min_date, max_date = query.get("daterange")

        # now check if there is an active API session
        if not user or not user.is_authenticated or user.is_anonymous:
            raise QueryParametersException("Telegram scraping is only available to logged-in users with personal "
                                           "accounts.")

        # check for the information we need
        session_id = SearchTelegram.create_session_id(query.get("api_phone"), query.get("api_id"),
                                                      query.get("api_hash"))
        user.set_value("telegram.session", session_id)
        session_path = Path(config.get('PATH_ROOT')).joinpath(config.get('PATH_SESSIONS'), session_id + ".session")

        client = None

        # API ID is always a number; if it's not, we can immediately fail
        try:
            api_id = int(query.get("api_id"))
        except ValueError:
            raise QueryParametersException("Invalid API ID.")

        # maybe we've entered a code already and submitted it with the request
        if "option-security-code" in request.form and request.form.get("option-security-code").strip():
            code_callback = lambda: request.form.get("option-security-code")
            max_attempts = 1
        else:
            code_callback = lambda: -1
            # max_attempts = 0 because authing will always fail: we can't wait
            # for the code to be entered interactively, we'll need to do a new
            # request - but we can't just immediately return, we still need to
            # call start() to get telegram to send us a code
            max_attempts = 0

        # now try authenticating
        needs_code = False
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            client = TelegramClient(str(session_path), api_id, query.get("api_hash"), loop=loop)

            try:
                client.start(max_attempts=max_attempts, phone=query.get("api_phone"), code_callback=code_callback)

            except ValueError as e:
                # this happens if 2FA is required
                raise QueryParametersException("Your account requires two-factor authentication. 4CAT at this time "
                                               f"does not support this authentication mode for Telegram. ({e})")
            except RuntimeError as e:
                # a code was sent to the given phone number
                needs_code = True
            except FloodWaitError as e:
                # uh oh, we got rate-limited
                raise QueryParametersException("You were rate-limited and should wait a while before trying again. " +
                                               str(e).split("(")[0] + ".")
            except ApiIdInvalidError as e:
                # wrong credentials
                raise QueryParametersException("Your API credentials are invalid.")
            except PhoneNumberInvalidError as e:
                # wrong phone number
                raise QueryParametersException(
                    "The phone number provided is not a valid phone number for these credentials.")
            except RPCError as e:
                # only seen this with an 'UPDATE_APP_TO_LOGIN' status
                raise QueryParametersException(f"Could not verify your authentication. You may need to update your "
                                               f"Telegram app(s) to the latest version to proceed ({e}).")
            except Exception as e:
                # ?
                raise QueryParametersException(
                    f"An unexpected error ({e}) occurred and your authentication could not be verified.")
        finally:
            if client:
                client.disconnect()

        if needs_code:
            raise QueryNeedsFurtherInputException(config={
                "code-info": {
                    "type": UserInput.OPTION_INFO,
                    "help": "Please enter the security code that was sent to your Telegram app to continue."
                },
                "security-code": {
                    "type": UserInput.OPTION_TEXT,
                    "help": "Security code",
                    "sensitive": True
                }})

        # simple!
        return {
            "items": num_items,
            "query": ",".join(sanitized_items),
            "api_id": query.get("api_id"),
            "api_hash": query.get("api_hash"),
            "api_phone": query.get("api_phone"),
            "save-session": query.get("save-session"),
            "resolve-entities": query.get("resolve-entities"),
            "min_date": min_date,
            "max_date": max_date,
            "crawl-depth": query.get("crawl-depth"),
            "crawl-threshold": query.get("crawl-threshold"),
            "crawl-via-links": query.get("crawl-via-links")
        }
Validate Telegram query
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
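For example, the three URL-sanitizing substitutions in this method reduce the various t.me link forms to a bare entity name:

import re

item = "https://t.me/s/examplechannel/"
item = re.sub(r"^https?://t\.me/", "", item)  # "s/examplechannel/"
item = re.sub(r"^/?s/", "", item)             # "examplechannel/"
item = re.sub(r"[/]*$", "", item)             # "examplechannel"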
    @staticmethod
    def create_session_id(api_phone, api_id, api_hash):
        """
        Generate a filename for the session file

        This is a combination of phone number and API credentials, but hashed
        so that one cannot actually derive someone's phone number from it.

        :param str api_phone: Phone number for API ID
        :param int api_id: Telegram API ID
        :param str api_hash: Telegram API Hash
        :return str: A hash value derived from the input
        """
        hash_base = api_phone.strip().replace("+", "") + str(api_id).strip() + api_hash.strip()
        return hashlib.blake2b(hash_base.encode("ascii")).hexdigest()
Generate a filename for the session file
This is a combination of phone number and API credentials, but hashed so that one cannot actually derive someone's phone number from it.
Parameters
- str api_phone: Phone number for API ID
- int api_id: Telegram API ID
- str api_hash: Telegram API Hash
Returns
A hash value derived from the input
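Usage sketch (the credentials below are made up): the same inputs always yield the same file name, so an authenticated session can be reused across requests:

session_id = SearchTelegram.create_session_id(" +31612345678 ", 12345678, "0123456789abcdef")
# blake2b produces a 128-character hexadecimal digest by default
session_file = session_id + ".session"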
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor