datasources.twitterv2.search_twitter
X/Twitter keyword search via the X API v2
1""" 2X/Twitter keyword search via the X API v2 3""" 4import requests 5import datetime 6import copy 7import time 8import json 9import re 10 11from backend.lib.search import Search 12from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, QueryNeedsExplicitConfirmationException 13from common.lib.helpers import convert_to_int, UserInput, timify_long 14from common.config_manager import config 15from common.lib.item_mapping import MappedItem, MissingMappedField 16 17 18class SearchWithTwitterAPIv2(Search): 19 """ 20 Get Tweets via the X API 21 """ 22 type = "twitterv2-search" # job ID 23 title = "X/Twitter API (v2)" 24 extension = "ndjson" 25 is_local = False # Whether this datasource is locally scraped 26 is_static = False # Whether this datasource is still updated 27 28 previous_request = 0 29 import_issues = True 30 31 references = [ 32 "[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)" 33 ] 34 35 config = { 36 "twitterv2-search.academic_api_key": { 37 "type": UserInput.OPTION_TEXT, 38 "default": "", 39 "help": "Research API Key", 40 "tooltip": "An API key for the X/Twitter v2 Research API. If " 41 "provided, the user will not need to enter their own " 42 "key to retrieve tweets. Note that this API key should " 43 "have access to the Full Archive Search endpoint." 44 }, 45 "twitterv2-search.max_tweets": { 46 "type": UserInput.OPTION_TEXT, 47 "default": 0, 48 "min": 0, 49 "max": 10_000_000, 50 "help": "Max posts per dataset", 51 "tooltip": "4CAT will never retrieve more than this amount of " 52 "posts per dataset. Enter '0' for unlimited posts." 53 }, 54 "twitterv2-search.id_lookup": { 55 "type": UserInput.OPTION_TOGGLE, 56 "default": False, 57 "help": "Allow lookup by ID", 58 "tooltip": "If enabled, allow users to enter a list of post IDs " 59 "to retrieve. This is disabled by default because it " 60 "can be confusing to novice users." 61 } 62 } 63 64 def get_items(self, query): 65 """ 66 Use the Twitter v2 API historical search to get tweets 67 68 :param query: 69 :return: 70 """ 71 # Compile any errors to highlight at end of log 72 error_report = [] 73 # this is pretty sensitive so delete it immediately after storing in 74 # memory 75 have_api_key = self.config.get("twitterv2-search.academic_api_key") 76 bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key 77 api_type = query.get("api_type", "all") if not have_api_key else "all" 78 auth = {"Authorization": "Bearer %s" % bearer_token} 79 expected_tweets = query.get("expected-tweets", "unknown") 80 81 # these are all expansions and fields available at the time of writing 82 # since it does not cost anything extra in terms of rate limiting, go 83 # for as much data per tweet as possible... 
        tweet_fields = (
            "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id",
            "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings",
            "source", "text", "withheld")
        user_fields = (
            "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url",
            "protected", "public_metrics", "url", "username", "verified", "withheld")
        place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type")
        poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status")
        expansions = (
            "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id",
            "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id")
        media_fields = (
            "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants",
            "alt_text")

        params = {
            "expansions": ",".join(expansions),
            "tweet.fields": ",".join(tweet_fields),
            "user.fields": ",".join(user_fields),
            "poll.fields": ",".join(poll_fields),
            "place.fields": ",".join(place_fields),
            "media.fields": ",".join(media_fields),
        }
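
        # For illustration only: for a keyword search, the request built below
        # ends up carrying query parameters roughly like the following (the
        # values are invented examples, not output of this code):
        #
        #   {"query": "#example lang:en", "max_results": 100,
        #    "start_time": "2023-01-01T00:00:00Z",
        #    "expansions": "attachments.poll_ids,attachments.media_keys,...",
        #    "tweet.fields": "attachments,author_id,...", ...}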

        if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
            endpoint = "https://api.x.com/2/tweets"

            tweet_ids = self.parameters.get("query", []).split(',')

            # The API only allows looking up 100 tweets per request
            chunk_size = 100
            queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)]
            expected_tweets = len(tweet_ids)

            amount = len(tweet_ids)

            # Initiate collection of any IDs that are unavailable
            collected_errors = []

        else:
            # Full-archive ('all') or recent search
            endpoint = "https://api.x.com/2/tweets/search/" + api_type

            queries = [self.parameters.get("query", "")]

            amount = convert_to_int(self.parameters.get("amount"), 10)

            params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100  # 100 = upper limit, 10 = lower

            if self.parameters.get("min_date"):
                params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime(
                    "%Y-%m-%dT%H:%M:%SZ")

            if self.parameters.get("max_date"):
                params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime(
                    "%Y-%m-%dT%H:%M:%SZ")

        if type(expected_tweets) is int:
            num_expected_tweets = expected_tweets
            expected_tweets = "{:,}".format(expected_tweets)
        else:
            num_expected_tweets = None

        tweets = 0
        for query in queries:
            if self.parameters.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
                params['ids'] = query
            else:
                params['query'] = query
            self.dataset.log("Search parameters: %s" % repr(params))
            while True:

                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API")

                # there is a limit of one request per second, so stay on the safe side of this
                while self.previous_request == int(time.time()):
                    time.sleep(0.1)
                time.sleep(0.05)
                self.previous_request = int(time.time())

                # now send the request, allowing for at least 5 retries if the connection seems unstable
                retries = 5
                api_response = None
                while retries > 0:
                    try:
                        api_response = requests.get(endpoint, headers=auth, params=params, timeout=30)
                        break
                    except (ConnectionError, requests.exceptions.RequestException) as e:
                        retries -= 1
                        wait_time = (5 - retries) * 10
                        self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time))
                        time.sleep(wait_time)

                # all retries failed and no response was received - give up
                if api_response is None:
                    self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True)
                    return

                # rate limited - the limit at time of writing is 300 reqs per 15
                # minutes
                # usually you don't hit this when requesting batches of 500 at
                # 1/second, but this is also returned when the user reaches the
                # monthly tweet cap, albeit with different content in that case
                if api_response.status_code == 429:
                    try:
                        structured_response = api_response.json()
                        if structured_response.get("title") == "UsageCapExceeded":
                            self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts "
                                                       "until your API quota resets. Dataset completed with posts "
                                                       "collected so far.", is_final=True)
                            return
                    except (json.JSONDecodeError, ValueError):
                        self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting "
                                                   "post collection.", is_final=True)
                        return

                    resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1
                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
                    self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str)
                    while time.time() <= resume_at:
                        if self.interrupted:
                            raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset")
                        time.sleep(0.5)
                    continue

                # API keys that are valid but don't have access or haven't been
                # activated properly get a 403
                elif api_response.status_code == 403:
                    try:
                        structured_response = api_response.json()
                        self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API "
                                                   "with this API key. %s" % structured_response.get("detail", ""), is_final=True)
                    except (json.JSONDecodeError, ValueError):
                        self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to "
                                                   "the full-archive search endpoint.", is_final=True)
                    finally:
                        return

                # sometimes twitter says '503 service unavailable' for unclear
                # reasons - in that case just wait a while and try again
                elif api_response.status_code in (502, 503, 504):
                    resume_at = time.time() + 60
                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
                    self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % (
                        api_response.status_code, resume_at_str))
                    while time.time() <= resume_at:
                        time.sleep(0.5)
                    continue

                # this usually means the query is too long or otherwise contains
                # a syntax error
                elif api_response.status_code == 400:
                    msg = "Response %i from the X API; " % api_response.status_code
                    try:
                        api_response = api_response.json()
                        msg += api_response.get("title", "")
                        if "detail" in api_response:
                            msg += ": " + api_response.get("detail", "")
                    except (json.JSONDecodeError, TypeError):
                        msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long."

                    self.dataset.update_status(msg, is_final=True)
                    return

                # invalid API key
                elif api_response.status_code == 401:
                    self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True)
                    return

                # haven't seen one yet, but they probably exist
                elif api_response.status_code != 200:
                    self.dataset.update_status(
                        "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True)
                    self.log.warning("X API v2 responded with status code %i. Response body: %s" % (
                        api_response.status_code, api_response.text))
                    return

                api_response = api_response.json()

                # The API response contains tweets (of course) and 'includes',
                # objects that can be referenced in tweets. Later we will splice
                # this data into the tweets themselves to make them easier to
                # process. So extract them first...
                included_users = api_response.get("includes", {}).get("users", {})
                included_media = api_response.get("includes", {}).get("media", {})
                included_polls = api_response.get("includes", {}).get("polls", {})
                included_tweets = api_response.get("includes", {}).get("tweets", {})
                included_places = api_response.get("includes", {}).get("places", {})
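
                # For reference, a decoded response page has roughly this shape
                # (sketch only; field names as read in this method, values invented):
                #
                #   {"data": [{...tweet...}, ...],
                #    "includes": {"users": [...], "media": [...], "polls": [...],
                #                 "tweets": [...], "places": [...]},
                #    "errors": [{"resource_type": "tweet", "resource_id": "123", ...}],
                #    "meta": {"next_token": "..."}}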

                # Collect missing objects from Twitter API response by type
                missing_objects = {}
                for missing_object in api_response.get("errors", {}):
                    parameter_type = missing_object.get('resource_type', 'unknown')
                    if parameter_type in missing_objects:
                        missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object
                    else:
                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
                num_missing_objects = sum([len(v) for v in missing_objects.values()])

                # Record any missing objects in log
                if num_missing_objects > 0:
                    # Log amount
                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
                    if num_missing_objects > 50:
                        # Large amount of missing objects; possible error with Twitter API
                        self.import_issues = False
                        error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
                        error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))

                    # Warn if new missing object is recorded (for developers to handle)
                    expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
                    if any(key not in expected_error_types for key in missing_objects.keys()):
                        self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))

                # Loop through and collect tweets
                for tweet in api_response.get("data", []):

                    if 0 < amount <= tweets:
                        break

                    # splice referenced data back in
                    # we use copy.deepcopy here because else we run into a
                    # pass-by-reference quagmire
                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)

                    tweets += 1
                    if tweets % 500 == 0:
                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
                        if num_expected_tweets is not None:
                            self.dataset.update_progress(tweets / num_expected_tweets)

                    yield tweet

                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
                    # If id_lookup, return errors in collecting tweets
                    for tweet_error in api_response.get("errors", []):
                        tweet_id = str(tweet_error.get('resource_id'))
                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
                            tweet_error = self.fix_tweet_error(tweet_error)
                            collected_errors.append(tweet_id)
                            yield tweet_error

                # paginate
                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
                    params["next_token"] = api_response["meta"]["next_token"]
                else:
                    break

        if not self.import_issues:
            self.dataset.log('Error Report:\n' + '\n'.join(error_report))
            self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)

    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
        """
        Enrich tweet with user and attachment metadata

        Twitter API returns some of the tweet's metadata separately, as
        'includes' that can be cross-referenced with a user ID or media key.
        This makes sense to conserve bandwidth, but also means tweets are not
        'standalone' objects as originally returned.

        However, for processing, making them standalone greatly reduces
        complexity, as we can simply read one single tweet object and process
        that data without worrying about having to get other data from
        elsewhere. So this method takes the metadata and the original tweet,
        splices the metadata into it where appropriate, and returns the
        enriched object.

        /!\ This is not an efficient way to store things /!\ but it is more
        convenient.

        :param dict tweet: The tweet object
        :param list users: User metadata, as a list of user objects
        :param list media: Media metadata, as a list of media objects
        :param list polls: Poll metadata, as a list of poll objects
        :param list places: Place metadata, as a list of place objects
        :param list referenced_tweets: Tweets referenced in the tweet, as a
        list of tweet objects. These will be enriched in turn.
        :param dict missing_objects: Dictionary with data on missing objects
        from the API by type.

        :return dict: Enriched tweet object
        """
        # Copy the tweet so that updating this tweet has no effect on others
        tweet = copy.deepcopy(tweet)
        # first create temporary mappings so we can easily find the relevant
        # object later
        users_by_id = {user["id"]: user for user in users}
        users_by_name = {user["username"]: user for user in users}
        media_by_key = {item["media_key"]: item for item in media}
        polls_by_id = {poll["id"]: poll for poll in polls}
        places_by_id = {place["id"]: place for place in places}
        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}

        # add tweet author metadata
        tweet["author_user"] = users_by_id.get(tweet["author_id"])

        # add place to geo metadata
        # referenced_tweets also contain place_id, but these places may not be included in the place objects
        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})

        # add user metadata for mentioned users
        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
            if mention["username"] in users_by_name:
                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
            # missing users can be stored by either user ID or username in Twitter API's error data; we check both
            elif mention["username"] in missing_objects.get('user', {}):
                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
            elif mention["id"] in missing_objects.get('user', {}):
                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}

        # add poll metadata
        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
            if poll_id in polls_by_id:
                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
            elif poll_id in missing_objects.get('poll', {}):
                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}

        # add media metadata - seems to be just the media type, the media URL
        # etc is stored in the 'entities' attribute instead
        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
            if media_key in media_by_key:
                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
            elif media_key in missing_objects.get('media', {}):
                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}

        # replied-to user metadata
        if "in_reply_to_user_id" in tweet:
            if tweet["in_reply_to_user_id"] in users_by_id:
                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}

        # enrich referenced tweets. Even though there should be no recursion -
        # since tweets cannot be edited - we do not recursively enrich
        # referenced tweets (should we?)
        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
            if reference["id"] in tweets_by_id:
                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
            elif reference["id"] in missing_objects.get('tweet', {}):
                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}

        return tweet
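
    # For illustration: enrichment roughly turns an input like
    #   {"id": "1", "author_id": "12", "entities": {"mentions": [...]}, ...}
    # into the same object with e.g. tweet["author_user"] set to the matching
    # user object from the 'includes' (and mentions, polls, media, places and
    # referenced tweets expanded in the same way). Values here are invented.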

    def fix_tweet_error(self, tweet_error):
        """
        Add fields needed by map_item and other functions to error objects, as
        they do not conform to normal tweet fields. Specifically for ID lookup,
        as the complete tweet could be missing.

        :param dict tweet_error: Tweet error object from the Twitter API
        :return dict: A tweet object with the relevant fields sanitised
        """
        modified_tweet = tweet_error
        modified_tweet['id'] = tweet_error.get('resource_id')
        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        modified_tweet['text'] = ''
        modified_tweet['author_user'] = {}
        modified_tweet['author_user']['name'] = 'UNKNOWN'
        modified_tweet['author_user']['username'] = 'UNKNOWN'
        modified_tweet['author_id'] = 'UNKNOWN'
        modified_tweet['public_metrics'] = {}

        # putting detail info in 'subject' field which is normally blank for tweets
        modified_tweet['subject'] = tweet_error.get('detail')

        return modified_tweet

    @classmethod
    def get_options(cls, parent_dataset=None, user=None):
        """
        Get Twitter data source options

        These are somewhat dynamic, because depending on settings users may or
        may not need to provide their own API key, and may or may not be able
        to enter a list of tweet IDs as their query. Hence the method.

        :param parent_dataset: Should always be None
        :param user: User to provide options for
        :return dict: Data source options
        """
        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
        max_tweets = config.get("twitterv2-search.max_tweets", user=user)

        if have_api_key:
            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
                          "historic tweets that match a given query.")

        else:
            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use "
                          "it, you must have access to the Research track of the X API. You will need to provide a "
                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
                          "monthly post retrieval cap.")

        intro_text += ("\n\nPlease refer to the [X API documentation]("
                       "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
                       "for more information about this API endpoint and the syntax you can use in your "
                       "search query. Retweets are included by default; add `-is:retweet` to exclude them.")

        options = {
            "intro-1": {
                "type": UserInput.OPTION_INFO,
                "help": intro_text
            },
        }

        if not have_api_key:
            # options.update({
            #     "api_type": {
            #         "type": UserInput.OPTION_CHOICE,
            #         "help": "API track",
            #         "options": {
            #             "all": "Research API: Full-archive search",
            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
            #         },
            #         "default": "all"
            #     }
            # })
            options.update({
                "api_bearer_token": {
                    "type": UserInput.OPTION_TEXT,
                    "sensitive": True,
                    "cache": True,
                    "help": "API Bearer Token"
                },
            })

        if config.get("twitterv2-search.id_lookup", user=user):
            options.update({
                "query_type": {
                    "type": UserInput.OPTION_CHOICE,
                    "help": "Query type",
                    "tooltip": "Note: number of posts and date fields are ignored with 'Posts by ID' lookup",
                    "options": {
                        "query": "Search query",
                        "id_lookup": "Posts by ID (list IDs separated by commas or one per line)",
                    },
                    "default": "query"
                }
            })

        options.update({
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Query"
            },
            "amount": {
                "type": UserInput.OPTION_TEXT,
                "help": "Posts to retrieve",
                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
                "min": 0,
                "max": max_tweets if max_tweets else 10_000_000,
                "default": 10
            },
            "divider-2": {
                "type": UserInput.OPTION_DIVIDER
            },
            "daterange-info": {
                "type": UserInput.OPTION_INFO,
                "help": "By default, X returns posts up to 30 days ago. If you want to go back further, you "
                        "need to explicitly set a date range."
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range"
            },
        })

        return options

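    # For reference, the "Query" option above takes X's search query syntax; an
    # invented example of the kind of query this accepts would be something like
    #   ("climate change" OR #climate) lang:en -is:retweet
    # (see the build-a-query documentation linked in the intro text).
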
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate input for a dataset query on the Twitter data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        Will also raise a QueryNeedsExplicitConfirmationException if the 'counts'
        endpoint of the Twitter API indicates that it will take more than
        30 minutes to collect the dataset. In the front-end, this will
        trigger a warning and confirmation request.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000, user=user)

        # this is the bare minimum, else we can't narrow down the full data set
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        if not have_api_key:
            if not query.get("api_bearer_token", None):
                raise QueryParametersException("Please provide a valid bearer token.")

        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")

        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup", user=user):
            # reformat queries to be a comma-separated list with no wrapping
            # whitespace
            whitespace = re.compile(r"\s+")
            items = whitespace.sub("", query.get("query").replace("\n", ","))
            # eliminate empty queries
            twitter_query = ','.join([item for item in items.split(",") if item])
        else:
            twitter_query = query.get("query")

        # the dates need to make sense as a range to search within
        # but, on Twitter, you can also specify before *or* after only
        after, before = query.get("daterange")
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # if we made it this far, the query can be executed
        params = {
            "query": twitter_query,
            "api_bearer_token": query.get("api_bearer_token"),
            "api_type": query.get("api_type", "all"),
            "query_type": query.get("query_type", "query"),
            "min_date": after,
            "max_date": before
        }

        # never query more tweets than allowed
        tweets_to_collect = convert_to_int(query.get("amount"), 10)

        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
            tweets_to_collect = max_tweets
        params["amount"] = tweets_to_collect

        # figure out how many tweets we expect to get back - we can use this
        # to dissuade users from running huge queries that will take forever
        # to process
        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
            count_url = "https://api.x.com/2/tweets/counts/all"
            count_params = {
                "granularity": "day",
                "query": params["query"],
            }

            # if we're doing a date range, pass this on to the counts endpoint in
            # the right format
            if after:
                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")

            if before:
                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")

            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key

            expected_tweets = 0
            while True:
                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
                                        timeout=15)
                if response.status_code == 200:
                    try:
                        # figure out how many tweets there are and estimate how much
                        # time it will take to process them. if it's going to take
                        # longer than half an hour, warn the user
                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
                    except KeyError:
                        # no harm done, we just don't know how many tweets will be
                        # returned (but they will still be returned)
                        break

                    if "next_token" not in response.json().get("meta", {}):
                        break
                    else:
                        count_params["next_token"] = response.json()["meta"]["next_token"]

                elif response.status_code == 401:
                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
                                                   "for the Research track of the X API.")

                elif response.status_code == 400:
                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
                                                   "extend into the future, or to before Twitter's founding, and that "
                                                   "your query is shorter than 1024 characters. Using AND in the query "
                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
                                                   "search for the literal word.")

                else:
                    # we can still continue without the expected tweets
                    break

            warning = ""
            if expected_tweets:
                collectible_tweets = min(max_tweets, params["amount"])
                if collectible_tweets == 0:
                    collectible_tweets = max_tweets

                if collectible_tweets > 0:
                    if collectible_tweets < expected_tweets:
                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
                    real_expected_tweets = min(expected_tweets, collectible_tweets)
                else:
                    real_expected_tweets = expected_tweets

                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
                expected_time = timify_long(expected_seconds)
                params["expected-tweets"] = expected_tweets

                if expected_seconds > 900:
                    warning += ". Collection will take approximately %s." % expected_time

            if warning and not query.get("frontend-confirm"):
                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
                warning += " Do you want to continue?"
                raise QueryNeedsExplicitConfirmationException(warning)

            params["amount"] = min(params["amount"], expected_tweets)
            if max_tweets:
                params["amount"] = min(max_tweets, params["amount"])

        return params

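    # For reference, the counts endpoint used above responds with pages shaped
    # roughly like {"meta": {"total_tweet_count": 1234, "next_token": "..."}};
    # this sketch only shows the fields validate_query actually reads, and the
    # values are invented.
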
    @staticmethod
    def map_item(item):
        """
        Map a nested Tweet object to a flat dictionary

        Tweet objects are quite rich but 4CAT expects flat dictionaries per
        item in many cases. Since it would be a shame to not store the data
        retrieved from Twitter that cannot be stored in a flat file, we store
        the full objects and only map them to a flat dictionary when needed.
        This has a speed (and disk space) penalty, but makes sure we don't
        throw away valuable data and allows for later changes that e.g. store
        the tweets more efficiently as a MongoDB collection.

        :param item: Tweet object as originally returned by the Twitter API
        :return dict: Dictionary in the format expected by 4CAT
        """
        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")

        # For backward compatibility
        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""

        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
        urls = [tag.get("expanded_url", tag["display_url"]) for tag in item.get("entities", {}).get("urls", [])]
        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]

        # by default, the text of retweets is returned as "RT [excerpt of
        # retweeted tweet]". Since we have the full tweet text, we can complete
        # the excerpt:
        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
        if is_retweet:
            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
            if retweeted_tweet.get("text", False):
                retweeted_body = retweeted_tweet.get("text")
                # Get the username of the user that was retweeted
                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
                elif item.get('entities', {}).get('mentions', []):
                    # The username may be missing from retweeted_tweet["author_user"]["username"] when the user was removed/deleted
                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
                    if retweeting_users:
                        # should only ever be one, but this verifies that there IS one and not NONE
                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body

            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "")  # Reference tweets were not always enriched

            # Retweet entities are only included in the retweet if they occur in the first 140 characters
            # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense
            [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])]
            [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])]
            [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", [])]
            # Images appear to be inherited by retweets, but just in case
            [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 769 770 is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])]) 771 is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])]) 772 773 videos = [] 774 for video in video_items: 775 variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True) 776 if variants: 777 videos.append(variants[0].get('url')) 778 779 expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"} 780 public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics} 781 missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]] 782 warning = "" 783 if missing_metrics: 784 warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports." 785 786 return MappedItem({ 787 "id": item["id"], 788 "thread_id": item.get("conversation_id", item["id"]), 789 "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"), 790 "unix_timestamp": int(tweet_time.timestamp()), 791 'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')), 792 "subject": "", 793 "body": item["text"], 794 "author": author_username, 795 "author_fullname": author_fullname, 796 "author_id": item["author_id"], 797 "author_followers": author_followers, 798 "source": item.get("source"), 799 "language_guess": item.get("lang"), 800 "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no", 801 **public_metrics, 802 "is_retweet": "yes" if is_retweet else "no", 803 "retweeted_user": "" if not is_retweet else retweeted_user, 804 "is_quote_tweet": "yes" if is_quoted else "no", 805 "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""), 806 "is_reply": "yes" if is_reply else "no", 807 "replied_user": item.get("in_reply_to_user", {}).get("username", ""), 808 "hashtags": ','.join(set(hashtags)), 809 "urls": ','.join(set(urls)), 810 "images": ','.join(set(images)), 811 "videos": ','.join(set(videos)), 812 "mentions": ','.join(set(mentions)), 813 "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]), 814 'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''), 815 }, message=warning)
if it's going to take 655 # longer than half an hour, warn the user 656 expected_tweets += int(response.json()["meta"]["total_tweet_count"]) 657 except KeyError: 658 # no harm done, we just don't know how many tweets will be 659 # returned (but they will still be returned) 660 break 661 662 if "next_token" not in response.json().get("meta", {}): 663 break 664 else: 665 count_params["next_token"] = response.json()["meta"]["next_token"] 666 667 elif response.status_code == 401: 668 raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid " 669 "for the Research track of the X API.") 670 671 elif response.status_code == 400: 672 raise QueryParametersException("Your query is invalid. Please make sure the date range does not " 673 "extend into the future, or to before Twitter's founding, and that " 674 "your query is shorter than 1024 characters. Using AND in the query " 675 "is not possible (AND is implied; OR can be used). Use \"and\" to " 676 "search for the literal word.") 677 678 else: 679 # we can still continue without the expected tweets 680 break 681 682 warning = "" 683 if expected_tweets: 684 collectible_tweets = min(max_tweets, params["amount"]) 685 if collectible_tweets == 0: 686 collectible_tweets = max_tweets 687 688 if collectible_tweets > 0: 689 if collectible_tweets < expected_tweets: 690 warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets) 691 real_expected_tweets = min(expected_tweets, collectible_tweets) 692 else: 693 real_expected_tweets = expected_tweets 694 695 expected_seconds = int(real_expected_tweets / 30) # seems to be about this 696 expected_time = timify_long(expected_seconds) 697 params["expected-tweets"] = expected_tweets 698 699 if expected_seconds > 900: 700 warning += ". Collection will take approximately %s." % expected_time 701 702 if warning and not query.get("frontend-confirm"): 703 warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning) 704 warning += " Do you want to continue?" 705 raise QueryNeedsExplicitConfirmationException(warning) 706 707 params["amount"] = min(params["amount"], expected_tweets) 708 if max_tweets: 709 params["amount"] = min(max_tweets, params["amount"]) 710 711 return params 712 713 @staticmethod 714 def map_item(item): 715 """ 716 Map a nested Tweet object to a flat dictionary 717 718 Tweet objects are quite rich but 4CAT expects flat dictionaries per 719 item in many cases. Since it would be a shame to not store the data 720 retrieved from Twitter that cannot be stored in a flat file, we store 721 the full objects and only map them to a flat dictionary when needed. 722 This has a speed (and disk space) penalty, but makes sure we don't 723 throw away valuable data and allows for later changes that e.g. store 724 the tweets more efficiently as a MongoDB collection. 
725 726 :param item: Tweet object as originally returned by the Twitter API 727 :return dict: Dictionary in the format expected by 4CAT 728 """ 729 tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z") 730 731 # For backward compatibility 732 author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"] 733 author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"] 734 author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else "" 735 736 hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])] 737 mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])] 738 urls = [tag.get("expanded_url", tag["display_url"]) for tag in item.get("entities", {}).get("urls", [])] 739 images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 740 video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 741 742 # by default, the text of retweets is returned as "RT [excerpt of 743 # retweeted tweet]". Since we have the full tweet text, we can complete 744 # the excerpt: 745 is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])]) 746 if is_retweet: 747 retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0] 748 if retweeted_tweet.get("text", False): 749 retweeted_body = retweeted_tweet.get("text") 750 # Get user's username that was retweeted 751 if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'): 752 item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body 753 elif item.get('entities', {}).get('mentions', []): 754 # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted 755 retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')] 756 if retweeting_users: 757 # should only ever be one, but this verifies that there IS one and not NONE 758 item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body 759 760 retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched 761 762 # Retweet entities are only included in the retweet if they occur in the first 140 characters 763 # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense 764 [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])] 765 [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])] 766 [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", [])] 767 # Images appear to be inheritted by retweets, but just in case 768 [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 769 [video_items.append(attachment) for attachment in 
retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 770 771 is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])]) 772 is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])]) 773 774 videos = [] 775 for video in video_items: 776 variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True) 777 if variants: 778 videos.append(variants[0].get('url')) 779 780 expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"} 781 public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics} 782 missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]] 783 warning = "" 784 if missing_metrics: 785 warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports." 786 787 return MappedItem({ 788 "id": item["id"], 789 "thread_id": item.get("conversation_id", item["id"]), 790 "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"), 791 "unix_timestamp": int(tweet_time.timestamp()), 792 'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')), 793 "subject": "", 794 "body": item["text"], 795 "author": author_username, 796 "author_fullname": author_fullname, 797 "author_id": item["author_id"], 798 "author_followers": author_followers, 799 "source": item.get("source"), 800 "language_guess": item.get("lang"), 801 "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no", 802 **public_metrics, 803 "is_retweet": "yes" if is_retweet else "no", 804 "retweeted_user": "" if not is_retweet else retweeted_user, 805 "is_quote_tweet": "yes" if is_quoted else "no", 806 "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""), 807 "is_reply": "yes" if is_reply else "no", 808 "replied_user": item.get("in_reply_to_user", {}).get("username", ""), 809 "hashtags": ','.join(set(hashtags)), 810 "urls": ','.join(set(urls)), 811 "images": ','.join(set(images)), 812 "videos": ','.join(set(videos)), 813 "mentions": ','.join(set(mentions)), 814 "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]), 815 'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''), 816 }, message=warning)
Get Tweets via the X API
65 def get_items(self, query): 66 """ 67 Use the Twitter v2 API historical search to get tweets 68 69 :param query: 70 :return: 71 """ 72 # Compile any errors to highlight at end of log 73 error_report = [] 74 # this is pretty sensitive so delete it immediately after storing in 75 # memory 76 have_api_key = self.config.get("twitterv2-search.academic_api_key") 77 bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key 78 api_type = query.get("api_type", "all") if not have_api_key else "all" 79 auth = {"Authorization": "Bearer %s" % bearer_token} 80 expected_tweets = query.get("expected-tweets", "unknown") 81 82 # these are all expansions and fields available at the time of writing 83 # since it does not cost anything extra in terms of rate limiting, go 84 # for as much data per tweet as possible... 85 tweet_fields = ( 86 "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id", 87 "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings", 88 "source", "text", "withheld") 89 user_fields = ( 90 "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url", 91 "protected", "public_metrics", "url", "username", "verified", "withheld") 92 place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type") 93 poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status") 94 expansions = ( 95 "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id", 96 "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id") 97 media_fields = ( 98 "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants", 99 "alt_text") 100 101 params = { 102 "expansions": ",".join(expansions), 103 "tweet.fields": ",".join(tweet_fields), 104 "user.fields": ",".join(user_fields), 105 "poll.fields": ",".join(poll_fields), 106 "place.fields": ",".join(place_fields), 107 "media.fields": ",".join(media_fields), 108 } 109 110 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 111 endpoint = "https://api.x.com/2/tweets" 112 113 tweet_ids = self.parameters.get("query", []).split(',') 114 115 # Only can lookup 100 tweets in each query per Twitter API 116 chunk_size = 100 117 queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)] 118 expected_tweets = len(tweet_ids) 119 120 amount = len(tweet_ids) 121 122 # Initiate collection of any IDs that are unavailable 123 collected_errors = [] 124 125 else: 126 # Query to all or search 127 endpoint = "https://api.x.com/2/tweets/search/" + api_type 128 129 queries = [self.parameters.get("query", "")] 130 131 amount = convert_to_int(self.parameters.get("amount"), 10) 132 133 params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100 # 100 = upper limit, 10 = lower 134 135 if self.parameters.get("min_date"): 136 params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime( 137 "%Y-%m-%dT%H:%M:%SZ") 138 139 if self.parameters.get("max_date"): 140 params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime( 141 "%Y-%m-%dT%H:%M:%SZ") 142 143 if type(expected_tweets) is int: 144 num_expected_tweets = expected_tweets 145 expected_tweets = 
"{:,}".format(expected_tweets) 146 else: 147 num_expected_tweets = None 148 149 tweets = 0 150 for query in queries: 151 if self.parameters.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"): 152 params['ids'] = query 153 else: 154 params['query'] = query 155 self.dataset.log("Search parameters: %s" % repr(params)) 156 while True: 157 158 if self.interrupted: 159 raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API") 160 161 # there is a limit of one request per second, so stay on the safe side of this 162 while self.previous_request == int(time.time()): 163 time.sleep(0.1) 164 time.sleep(0.05) 165 self.previous_request = int(time.time()) 166 167 # now send the request, allowing for at least 5 retries if the connection seems unstable 168 retries = 5 169 api_response = None 170 while retries > 0: 171 try: 172 api_response = requests.get(endpoint, headers=auth, params=params, timeout=30) 173 break 174 except (ConnectionError, requests.exceptions.RequestException) as e: 175 retries -= 1 176 wait_time = (5 - retries) * 10 177 self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time)) 178 time.sleep(wait_time) 179 180 # rate limited - the limit at time of writing is 300 reqs per 15 181 # minutes 182 # usually you don't hit this when requesting batches of 500 at 183 # 1/second, but this is also returned when the user reaches the 184 # monthly tweet cap, albeit with different content in that case 185 if api_response.status_code == 429: 186 try: 187 structured_response = api_response.json() 188 if structured_response.get("title") == "UsageCapExceeded": 189 self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts " 190 "until your API quota resets. Dataset completed with posts " 191 "collected so far.", is_final=True) 192 return 193 except (json.JSONDecodeError, ValueError): 194 self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting " 195 "post collection.", is_final=True) 196 return 197 198 resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1 199 resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c") 200 self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str) 201 while time.time() <= resume_at: 202 if self.interrupted: 203 raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset") 204 time.sleep(0.5) 205 continue 206 207 # API keys that are valid but don't have access or haven't been 208 # activated properly get a 403 209 elif api_response.status_code == 403: 210 try: 211 structured_response = api_response.json() 212 self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API " 213 "with this API key. %s" % structured_response.get("detail", ""), is_final=True) 214 except (json.JSONDecodeError, ValueError): 215 self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to " 216 "the full-archive search endpoint.", is_final=True) 217 finally: 218 return 219 220 # sometimes twitter says '503 service unavailable' for unclear 221 # reasons - in that case just wait a while and try again 222 elif api_response.status_code in (502, 503, 504): 223 resume_at = time.time() + 60 224 resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c") 225 self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." 
% ( 226 api_response.status_code, resume_at_str)) 227 while time.time() <= resume_at: 228 time.sleep(0.5) 229 continue 230 231 # this usually means the query is too long or otherwise contains 232 # a syntax error 233 elif api_response.status_code == 400: 234 msg = "Response %i from the X API; " % api_response.status_code 235 try: 236 api_response = api_response.json() 237 msg += api_response.get("title", "") 238 if "detail" in api_response: 239 msg += ": " + api_response.get("detail", "") 240 except (json.JSONDecodeError, TypeError): 241 msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long." 242 243 self.dataset.update_status(msg, is_final=True) 244 return 245 246 # invalid API key 247 elif api_response.status_code == 401: 248 self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True) 249 return 250 251 # haven't seen one yet, but they probably exist 252 elif api_response.status_code != 200: 253 self.dataset.update_status( 254 "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True) 255 self.log.warning("X API v2 responded with status code %i. Response body: %s" % ( 256 api_response.status_code, api_response.text)) 257 return 258 259 elif not api_response: 260 self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True) 261 return 262 263 api_response = api_response.json() 264 265 # The API response contains tweets (of course) and 'includes', 266 # objects that can be referenced in tweets. Later we will splice 267 # this data into the tweets themselves to make them easier to 268 # process. So extract them first... 269 included_users = api_response.get("includes", {}).get("users", {}) 270 included_media = api_response.get("includes", {}).get("media", {}) 271 included_polls = api_response.get("includes", {}).get("polls", {}) 272 included_tweets = api_response.get("includes", {}).get("tweets", {}) 273 included_places = api_response.get("includes", {}).get("places", {}) 274 275 # Collect missing objects from Twitter API response by type 276 missing_objects = {} 277 for missing_object in api_response.get("errors", {}): 278 parameter_type = missing_object.get('resource_type', 'unknown') 279 if parameter_type in missing_objects: 280 missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object 281 else: 282 missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object} 283 num_missing_objects = sum([len(v) for v in missing_objects.values()]) 284 285 # Record any missing objects in log 286 if num_missing_objects > 0: 287 # Log amount 288 self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()])) 289 if num_missing_objects > 50: 290 # Large amount of missing objects; possible error with Twitter API 291 self.import_issues = False 292 error_report.append('%i missing objects received following post number %i. Possible issue with X API.' 
% (num_missing_objects, tweets)) 293 error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()])) 294 295 # Warn if new missing object is recorded (for developers to handle) 296 expected_error_types = ['user', 'media', 'poll', 'tweet', 'place'] 297 if any(key not in expected_error_types for key in missing_objects.keys()): 298 self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types])) 299 300 # Loop through and collect tweets 301 for tweet in api_response.get("data", []): 302 303 if 0 < amount <= tweets: 304 break 305 306 # splice referenced data back in 307 # we use copy.deepcopy here because else we run into a 308 # pass-by-reference quagmire 309 tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects) 310 311 tweets += 1 312 if tweets % 500 == 0: 313 self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets)) 314 if num_expected_tweets is not None: 315 self.dataset.update_progress(tweets / num_expected_tweets) 316 317 yield tweet 318 319 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 320 # If id_lookup return errors in collecting tweets 321 for tweet_error in api_response.get("errors", []): 322 tweet_id = str(tweet_error.get('resource_id')) 323 if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors: 324 tweet_error = self.fix_tweet_error(tweet_error) 325 collected_errors.append(tweet_id) 326 yield tweet_error 327 328 # paginate 329 if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]: 330 params["next_token"] = api_response["meta"]["next_token"] 331 else: 332 break 333 334 if not self.import_issues: 335 self.dataset.log('Error Report:\n' + '\n'.join(error_report)) 336 self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)
Use the Twitter v2 API historical search to get tweets
Parameters
- query: Dataset query parameters, as produced by validate_query
Returns
A generator that yields tweet objects retrieved from the X API
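To make the collection flow more concrete, below is a minimal sketch of the request loop this endpoint expects: a full-archive search with next_token pagination, paced at roughly one request per second. The bearer token, query and field selection are placeholders rather than values taken from 4CAT, and the real get_items additionally handles rate limits, expansions, includes and error objects.

import time
import requests

BEARER_TOKEN = "..."  # placeholder: a valid Research-track bearer token is assumed
ENDPOINT = "https://api.x.com/2/tweets/search/all"

def search_all(query, max_tweets=500):
    """Yield tweets matching a query, following next_token pagination."""
    headers = {"Authorization": "Bearer %s" % BEARER_TOKEN}
    params = {
        "query": query,
        "max_results": 100,  # this endpoint accepts 10-100 results per request
        "tweet.fields": "created_at,public_metrics",
        "expansions": "author_id",
    }
    collected = 0
    while True:
        response = requests.get(ENDPOINT, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        page = response.json()
        for tweet in page.get("data", []):
            yield tweet
            collected += 1
            if collected >= max_tweets:
                return
        next_token = page.get("meta", {}).get("next_token")
        if not next_token:
            return
        params["next_token"] = next_token
        time.sleep(1)  # stay on the safe side of the one-request-per-second limit

for tweet in search_all("4cat -is:retweet"):
    print(tweet["id"], tweet["text"][:60])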
338 def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects): 339 """ 340 Enrich tweet with user and attachment metadata 341 342 Twitter API returns some of the tweet's metadata separately, as 343 'includes' that can be cross-referenced with a user ID or media key. 344 This makes sense to conserve bandwidth, but also means tweets are not 345 'standalone' objects as originally returned. 346 347 However, for processing, making them standalone greatly reduces 348 complexity, as we can simply read one single tweet object and process 349 that data without worrying about having to get other data from 350 elsewhere. So this method takes the metadata and the original tweet, 351 splices the metadata into it where appropriate, and returns the 352 enriched object. 353 354 /!\ This is not an efficient way to store things /!\ but it is more 355 convenient. 356 357 :param dict tweet: The tweet object 358 :param list users: User metadata, as a list of user objects 359 :param list media: Media metadata, as a list of media objects 360 :param list polls: Poll metadata, as a list of poll objects 361 :param list places: Place metadata, as a list of place objects 362 :param list referenced_tweets: Tweets referenced in the tweet, as a 363 list of tweet objects. These will be enriched in turn. 364 :param dict missing_objects: Dictionary with data on missing objects 365 from the API by type. 366 367 :return dict: Enriched tweet object 368 """ 369 # Copy the tweet so that updating this tweet has no effect on others 370 tweet = copy.deepcopy(tweet) 371 # first create temporary mappings so we can easily find the relevant 372 # object later 373 users_by_id = {user["id"]: user for user in users} 374 users_by_name = {user["username"]: user for user in users} 375 media_by_key = {item["media_key"]: item for item in media} 376 polls_by_id = {poll["id"]: poll for poll in polls} 377 places_by_id = {place["id"]: place for place in places} 378 tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets} 379 380 # add tweet author metadata 381 tweet["author_user"] = users_by_id.get(tweet["author_id"]) 382 383 # add place to geo metadata 384 # referenced_tweets also contain place_id, but these places may not included in the place objects 385 if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id: 386 tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id")) 387 elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}): 388 tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {}) 389 390 # add user metadata for mentioned users 391 for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])): 392 if mention["username"] in users_by_name: 393 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])} 394 # missing users can be stored by either user ID or Username in Twitter API's error data; we check both 395 elif mention["username"] in missing_objects.get('user', {}): 396 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}} 397 elif mention["id"] in missing_objects.get('user', {}): 398 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}} 399 400 401 # add poll metadata 402 for index, poll_id in 
enumerate(tweet.get("attachments", {}).get("poll_ids", [])): 403 if poll_id in polls_by_id: 404 tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id] 405 elif poll_id in missing_objects.get('poll', {}): 406 tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]} 407 408 # add media metadata - seems to be just the media type, the media URL 409 # etc is stored in the 'entities' attribute instead 410 for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])): 411 if media_key in media_by_key: 412 tweet["attachments"]["media_keys"][index] = media_by_key[media_key] 413 elif media_key in missing_objects.get('media', {}): 414 tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]} 415 416 # replied-to user metadata 417 if "in_reply_to_user_id" in tweet: 418 if tweet["in_reply_to_user_id"] in users_by_id: 419 tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]] 420 elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}): 421 tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]} 422 423 # enrich referenced tweets. Even though there should be no recursion - 424 # since tweets cannot be edited - we do not recursively enrich 425 # referenced tweets (should we?) 426 for index, reference in enumerate(tweet.get("referenced_tweets", [])): 427 if reference["id"] in tweets_by_id: 428 tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)} 429 elif reference["id"] in missing_objects.get('tweet', {}): 430 tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}} 431 432 return tweet
Enrich tweet with user and attachment metadata
Twitter API returns some of the tweet's metadata separately, as 'includes' that can be cross-referenced with a user ID or media key. This makes sense to conserve bandwidth, but also means tweets are not 'standalone' objects as originally returned.
However, for processing, making them standalone greatly reduces complexity, as we can simply read one single tweet object and process that data without worrying about having to get other data from elsewhere. So this method takes the metadata and the original tweet, splices the metadata into it where appropriate, and returns the enriched object.
/!\ This is not an efficient way to store things /!\ but it is more convenient.
Parameters
- dict tweet: The tweet object
- list users: User metadata, as a list of user objects
- list media: Media metadata, as a list of media objects
- list polls: Poll metadata, as a list of poll objects
- list places: Place metadata, as a list of place objects
- list referenced_tweets: Tweets referenced in the tweet, as a list of tweet objects. These will be enriched in turn.
- dict missing_objects: Dictionary with data on missing objects from the API by type.
Returns
Enriched tweet object
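As a small illustration of the splicing described above, the sketch below merges user objects from an 'includes' block into a tweet, in the same spirit as enrich_tweet. The payload values are invented, and the real method also splices media, polls, places, referenced tweets and the API's error objects.

import copy

# Invented payload in the shape the v2 API returns: a tweet plus an 'includes' block
tweet = {"id": "1", "author_id": "u1", "text": "hello @someone",
         "entities": {"mentions": [{"username": "someone", "id": "u2"}]}}
includes = {"users": [{"id": "u1", "username": "author", "name": "Author"},
                      {"id": "u2", "username": "someone", "name": "Someone"}]}

def splice_users(tweet, users):
    """Return a copy of the tweet with user objects spliced in."""
    tweet = copy.deepcopy(tweet)  # avoid mutating objects shared between tweets
    by_id = {user["id"]: user for user in users}
    by_name = {user["username"]: user for user in users}
    tweet["author_user"] = by_id.get(tweet["author_id"])
    for i, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
        if mention["username"] in by_name:
            tweet["entities"]["mentions"][i] = {**mention, **by_name[mention["username"]]}
    return tweet

enriched = splice_users(tweet, includes["users"])
print(enriched["author_user"]["name"])  # -> Author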
434 def fix_tweet_error(self, tweet_error): 435 """ 436 Add the fields that map_item and other functions expect, since error 437 objects do not conform to normal tweet fields. Used specifically for ID 438 lookup, where the complete tweet may be missing. 439 440 :param dict tweet_error: Tweet error object from the Twitter API 441 :return dict: A tweet object with the relevant fields sanitised 442 """ 443 modified_tweet = tweet_error 444 modified_tweet['id'] = tweet_error.get('resource_id') 445 modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z") 446 modified_tweet['text'] = '' 447 modified_tweet['author_user'] = {} 448 modified_tweet['author_user']['name'] = 'UNKNOWN' 449 modified_tweet['author_user']['username'] = 'UNKNOWN' 450 modified_tweet['author_id'] = 'UNKNOWN' 451 modified_tweet['public_metrics'] = {} 452 453 # putting detail info in 'subject' field which is normally blank for tweets 454 modified_tweet['subject'] = tweet_error.get('detail') 455 456 return modified_tweet
Add the fields that map_item and other functions expect, since error objects do not conform to normal tweet fields. Used specifically for ID lookup, where the complete tweet may be missing.
Parameters
- dict tweet_error: Tweet error object from the Twitter API
Returns
A tweet object with the relevant fields sanitised
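For reference, an 'errors' entry for an ID lookup looks roughly like the invented example below; fix_tweet_error pads such an object with placeholder tweet fields so that map_item can process it.

# Invented example of an error entry returned by an ID lookup
tweet_error = {
    "resource_id": "1234567890",
    "resource_type": "tweet",
    "title": "Not Found Error",
    "detail": "Could not find tweet with ids: [1234567890].",
}

# After fix_tweet_error(), the object carries placeholder tweet fields, roughly:
# {'id': '1234567890', 'text': '', 'author_id': 'UNKNOWN',
#  'author_user': {'name': 'UNKNOWN', 'username': 'UNKNOWN'},
#  'public_metrics': {}, 'created_at': <time of processing>,
#  'subject': 'Could not find tweet with ids: [1234567890].'}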
458 @classmethod 459 def get_options(cls, parent_dataset=None, user=None): 460 """ 461 Get Twitter data source options 462 463 These are somewhat dynamic, because depending on settings users may or 464 may not need to provide their own API key, and may or may not be able 465 to enter a list of tweet IDs as their query. Hence the method. 466 467 :param parent_dataset: Should always be None 468 :param user: User to provide options for 469 :return dict: Data source options 470 """ 471 have_api_key = config.get("twitterv2-search.academic_api_key", user=user) 472 max_tweets = config.get("twitterv2-search.max_tweets", user=user) 473 474 if have_api_key: 475 intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve " 476 "historic tweets that match a given query.") 477 478 else: 479 intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use the " 480 "it, you must have access to the Research track of the X API. You will need to provide a " 481 "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The " 482 "bearer token **will be sent to the 4CAT server**, where it will be deleted after data " 483 "collection has started. Note that any posts retrieved with 4CAT will count towards your " 484 "monthly post retrieval cap.") 485 486 intro_text += ("\n\nPlease refer to the [X API documentation](" 487 "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) " 488 "documentation for more information about this API endpoint and the syntax you can use in your " 489 "search query. Retweets are included by default; add `-is:retweet` to exclude them.") 490 491 options = { 492 "intro-1": { 493 "type": UserInput.OPTION_INFO, 494 "help": intro_text 495 }, 496 } 497 498 if not have_api_key: 499 # options.update({ 500 # "api_type": { 501 # "type": UserInput.OPTION_CHOICE, 502 # "help": "API track", 503 # "options": { 504 # "all": "Research API: Full-archive search", 505 # "recent": "Standard: Recent search (Tweets published in last 7 days)", 506 # }, 507 # "default": "all" 508 # } 509 # }) 510 options.update({ 511 "api_bearer_token": { 512 "type": UserInput.OPTION_TEXT, 513 "sensitive": True, 514 "cache": True, 515 "help": "API Bearer Token" 516 }, 517 }) 518 519 if config.get("twitterv2.id_lookup", user=user): 520 options.update({ 521 "query_type": { 522 "type": UserInput.OPTION_CHOICE, 523 "help": "Query type", 524 "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup", 525 "options": { 526 "query": "Search query", 527 "id_lookup": "Posts by ID (list IDs seperated by commas or one per line)", 528 }, 529 "default": "query" 530 } 531 }) 532 533 options.update({ 534 "query": { 535 "type": UserInput.OPTION_TEXT_LARGE, 536 "help": "Query" 537 }, 538 "amount": { 539 "type": UserInput.OPTION_TEXT, 540 "help": "Posts to retrieve", 541 "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)), 542 "min": 0, 543 "max": max_tweets if max_tweets else 10_000_000, 544 "default": 10 545 }, 546 "divider-2": { 547 "type": UserInput.OPTION_DIVIDER 548 }, 549 "daterange-info": { 550 "type": UserInput.OPTION_INFO, 551 "help": "By default, X returns posts up til 30 days ago. If you want to go back further, you " 552 "need to explicitly set a date range." 553 }, 554 "daterange": { 555 "type": UserInput.OPTION_DATERANGE, 556 "help": "Date range" 557 }, 558 }) 559 560 return options
Get Twitter data source options
These are somewhat dynamic, because depending on settings users may or may not need to provide their own API key, and may or may not be able to enter a list of tweet IDs as their query. Hence the method.
Parameters
- parent_dataset: Should always be None
- user: User to provide options for
Returns
Data source options
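As a rough usage sketch (inside a configured 4CAT install, since the class reads its settings from the 4CAT configuration), the option keys the method exposes can be inspected directly; which keys appear depends on whether a server-wide API key is configured and whether ID lookup is enabled.

from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2

options = SearchWithTwitterAPIv2.get_options(parent_dataset=None, user=None)
print(sorted(options.keys()))
# With a server-wide academic_api_key configured, roughly:
#   ['amount', 'daterange', 'daterange-info', 'divider-2', 'intro-1', 'query']
# Without one, 'api_bearer_token' is added; if ID lookup is enabled,
# 'query_type' appears as well.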
562 @staticmethod 563 def validate_query(query, request, user): 564 """ 565 Validate input for a dataset query on the Twitter data source. 566 567 Will raise a QueryParametersException if invalid parameters are 568 encountered. Parameters are additionally sanitised. 569 570 Will also raise a QueryNeedsExplicitConfirmation if the 'counts' 571 endpoint of the Twitter API indicates that it will take more than 572 30 minutes to collect the dataset. In the front-end, this will 573 trigger a warning and confirmation request. 574 575 :param dict query: Query parameters, from client-side. 576 :param request: Flask request 577 :param User user: User object of user who has submitted the query 578 :return dict: Safe query parameters 579 """ 580 have_api_key = config.get("twitterv2-search.academic_api_key", user=user) 581 max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000, user=user) 582 583 # this is the bare minimum, else we can't narrow down the full data set 584 if not query.get("query", None): 585 raise QueryParametersException("Please provide a query.") 586 587 if not have_api_key: 588 if not query.get("api_bearer_token", None): 589 raise QueryParametersException("Please provide a valid bearer token.") 590 591 if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup": 592 raise QueryParametersException("X API queries cannot be longer than 1024 characters.") 593 594 if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup", user=user): 595 # reformat queries to be a comma-separated list with no wrapping 596 # whitespace 597 whitespace = re.compile(r"\s+") 598 items = whitespace.sub("", query.get("query").replace("\n", ",")) 599 # eliminate empty queries 600 twitter_query = ','.join([item for item in items.split(",") if item]) 601 else: 602 twitter_query = query.get("query") 603 604 # the dates need to make sense as a range to search within 605 # but, on Twitter, you can also specify before *or* after only 606 after, before = query.get("daterange") 607 if before and after and before < after: 608 raise QueryParametersException("Date range must start before it ends") 609 610 # if we made it this far, the query can be executed 611 params = { 612 "query": twitter_query, 613 "api_bearer_token": query.get("api_bearer_token"), 614 "api_type": query.get("api_type", "all"), 615 "query_type": query.get("query_type", "query"), 616 "min_date": after, 617 "max_date": before 618 } 619 620 # never query more tweets than allowed 621 tweets_to_collect = convert_to_int(query.get("amount"), 10) 622 623 if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0): 624 tweets_to_collect = max_tweets 625 params["amount"] = tweets_to_collect 626 627 # figure out how many tweets we expect to get back - we can use this 628 # to dissuade users from running huge queries that will take forever 629 # to process 630 if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key): 631 count_url = "https://api.x.com/2/tweets/counts/all" 632 count_params = { 633 "granularity": "day", 634 "query": params["query"], 635 } 636 637 # if we're doing a date range, pass this on to the counts endpoint in 638 # the right format 639 if after: 640 count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ") 641 642 if before: 643 count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ") 644 645 bearer_token = params.get("api_bearer_token") if not 
have_api_key else have_api_key 646 647 expected_tweets = 0 648 while True: 649 response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token}, 650 timeout=15) 651 if response.status_code == 200: 652 try: 653 # figure out how many tweets there are and estimate how much 654 # time it will take to process them. if it's going to take 655 # longer than half an hour, warn the user 656 expected_tweets += int(response.json()["meta"]["total_tweet_count"]) 657 except KeyError: 658 # no harm done, we just don't know how many tweets will be 659 # returned (but they will still be returned) 660 break 661 662 if "next_token" not in response.json().get("meta", {}): 663 break 664 else: 665 count_params["next_token"] = response.json()["meta"]["next_token"] 666 667 elif response.status_code == 401: 668 raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid " 669 "for the Research track of the X API.") 670 671 elif response.status_code == 400: 672 raise QueryParametersException("Your query is invalid. Please make sure the date range does not " 673 "extend into the future, or to before Twitter's founding, and that " 674 "your query is shorter than 1024 characters. Using AND in the query " 675 "is not possible (AND is implied; OR can be used). Use \"and\" to " 676 "search for the literal word.") 677 678 else: 679 # we can still continue without the expected tweets 680 break 681 682 warning = "" 683 if expected_tweets: 684 collectible_tweets = min(max_tweets, params["amount"]) 685 if collectible_tweets == 0: 686 collectible_tweets = max_tweets 687 688 if collectible_tweets > 0: 689 if collectible_tweets < expected_tweets: 690 warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets) 691 real_expected_tweets = min(expected_tweets, collectible_tweets) 692 else: 693 real_expected_tweets = expected_tweets 694 695 expected_seconds = int(real_expected_tweets / 30) # seems to be about this 696 expected_time = timify_long(expected_seconds) 697 params["expected-tweets"] = expected_tweets 698 699 if expected_seconds > 900: 700 warning += ". Collection will take approximately %s." % expected_time 701 702 if warning and not query.get("frontend-confirm"): 703 warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning) 704 warning += " Do you want to continue?" 705 raise QueryNeedsExplicitConfirmationException(warning) 706 707 params["amount"] = min(params["amount"], expected_tweets) 708 if max_tweets: 709 params["amount"] = min(max_tweets, params["amount"]) 710 711 return params
Validate input for a dataset query on the Twitter data source.
Will raise a QueryParametersException if invalid parameters are encountered. Parameters are additionally sanitised.
Will also raise a QueryNeedsExplicitConfirmationException if the 'counts' endpoint of the Twitter API indicates that it will take more than 30 minutes to collect the dataset. In the front-end, this will trigger a warning and confirmation request.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
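The size estimate described above comes from the counts endpoint. The sketch below shows the general shape of that request, assuming a Research-track bearer token (placeholder value): it sums the paginated daily counts and converts the total into an estimated runtime at roughly 30 tweets per second.

import datetime
import requests

BEARER_TOKEN = "..."  # placeholder: a Research-track bearer token is assumed
COUNT_URL = "https://api.x.com/2/tweets/counts/all"

def estimate_tweets(query, start_time=None):
    """Sum the daily tweet counts for a query, following pagination."""
    headers = {"Authorization": "Bearer %s" % BEARER_TOKEN}
    params = {"query": query, "granularity": "day"}
    if start_time:
        params["start_time"] = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    total = 0
    while True:
        response = requests.get(COUNT_URL, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        meta = response.json().get("meta", {})
        total += meta.get("total_tweet_count", 0)
        if "next_token" not in meta:
            return total
        params["next_token"] = meta["next_token"]

expected = estimate_tweets("4cat lang:en", datetime.datetime(2023, 1, 1))
print("~{:,} tweets, roughly {:,} seconds at 30 tweets/second".format(expected, int(expected / 30)))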
713 @staticmethod 714 def map_item(item): 715 """ 716 Map a nested Tweet object to a flat dictionary 717 718 Tweet objects are quite rich but 4CAT expects flat dictionaries per 719 item in many cases. Since it would be a shame to not store the data 720 retrieved from Twitter that cannot be stored in a flat file, we store 721 the full objects and only map them to a flat dictionary when needed. 722 This has a speed (and disk space) penalty, but makes sure we don't 723 throw away valuable data and allows for later changes that e.g. store 724 the tweets more efficiently as a MongoDB collection. 725 726 :param item: Tweet object as originally returned by the Twitter API 727 :return dict: Dictionary in the format expected by 4CAT 728 """ 729 tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z") 730 731 # For backward compatibility 732 author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"] 733 author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"] 734 author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else "" 735 736 hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])] 737 mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])] 738 urls = [tag.get("expanded_url", tag["display_url"]) for tag in item.get("entities", {}).get("urls", [])] 739 images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 740 video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 741 742 # by default, the text of retweets is returned as "RT [excerpt of 743 # retweeted tweet]". 
Since we have the full tweet text, we can complete 744 # the excerpt: 745 is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])]) 746 if is_retweet: 747 retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0] 748 if retweeted_tweet.get("text", False): 749 retweeted_body = retweeted_tweet.get("text") 750 # Get user's username that was retweeted 751 if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'): 752 item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body 753 elif item.get('entities', {}).get('mentions', []): 754 # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted 755 retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')] 756 if retweeting_users: 757 # should only ever be one, but this verifies that there IS one and not NONE 758 item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body 759 760 retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched 761 762 # Retweet entities are only included in the retweet if they occur in the first 140 characters 763 # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense 764 [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])] 765 [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])] 766 [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", [])] 767 # Images appear to be inheritted by retweets, but just in case 768 [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 769 [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 770 771 is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])]) 772 is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])]) 773 774 videos = [] 775 for video in video_items: 776 variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True) 777 if variants: 778 videos.append(variants[0].get('url')) 779 780 expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"} 781 public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics} 782 missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]] 783 warning = "" 784 if missing_metrics: 785 warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports." 
786 787 return MappedItem({ 788 "id": item["id"], 789 "thread_id": item.get("conversation_id", item["id"]), 790 "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"), 791 "unix_timestamp": int(tweet_time.timestamp()), 792 'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')), 793 "subject": "", 794 "body": item["text"], 795 "author": author_username, 796 "author_fullname": author_fullname, 797 "author_id": item["author_id"], 798 "author_followers": author_followers, 799 "source": item.get("source"), 800 "language_guess": item.get("lang"), 801 "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no", 802 **public_metrics, 803 "is_retweet": "yes" if is_retweet else "no", 804 "retweeted_user": "" if not is_retweet else retweeted_user, 805 "is_quote_tweet": "yes" if is_quoted else "no", 806 "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""), 807 "is_reply": "yes" if is_reply else "no", 808 "replied_user": item.get("in_reply_to_user", {}).get("username", ""), 809 "hashtags": ','.join(set(hashtags)), 810 "urls": ','.join(set(urls)), 811 "images": ','.join(set(images)), 812 "videos": ','.join(set(videos)), 813 "mentions": ','.join(set(mentions)), 814 "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]), 815 'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''), 816 }, message=warning)
Map a nested Tweet object to a flat dictionary
Tweet objects are quite rich, but 4CAT expects flat dictionaries per item in many cases. Rather than discard the data retrieved from Twitter that does not fit in a flat file, we store the full objects and only map them to a flat dictionary when needed. This has a speed (and disk space) penalty, but makes sure we don't throw away valuable data and allows for later changes that e.g. store the tweets more efficiently as a MongoDB collection.
Parameters
- item: Tweet object as originally returned by the Twitter API
Returns
Dictionary in the format expected by 4CAT
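To illustrate the flattening, the snippet below builds a minimal enriched tweet (invented values) and the kind of flat row derived from it; only a subset of the real columns is shown, and within a 4CAT checkout the same item could be passed to SearchWithTwitterAPIv2.map_item() directly.

import json

# Minimal enriched tweet (invented values) and the flat row derived from it;
# only a subset of the real columns is shown here.
item = {
    "id": "1", "conversation_id": "1", "created_at": "2023-01-01T12:00:00.000Z",
    "text": "hello world", "author_id": "u1",
    "author_user": {"username": "author", "name": "Author",
                    "public_metrics": {"followers_count": 42}},
    "public_metrics": {"retweet_count": 0, "like_count": 3, "reply_count": 1,
                       "quote_count": 0, "impression_count": 90, "bookmark_count": 0},
}
flat = {
    "id": item["id"],
    "thread_id": item["conversation_id"],
    "timestamp": "2023-01-01 12:00:00",
    "body": item["text"],
    "author": item["author_user"]["username"],
    "author_followers": item["author_user"]["public_metrics"]["followers_count"],
    "like_count": item["public_metrics"]["like_count"],
    "is_retweet": "no",
    "hashtags": "",
}
print(json.dumps(flat, indent=2))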
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor