datasources.twitterv2.search_twitter
X/Twitter keyword search via the X API v2
1""" 2X/Twitter keyword search via the X API v2 3""" 4import requests 5import datetime 6import copy 7import time 8import json 9import re 10 11from backend.lib.search import Search 12from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, QueryNeedsExplicitConfirmationException 13from common.lib.helpers import convert_to_int, UserInput, timify 14from common.lib.item_mapping import MappedItem, MissingMappedField 15 16 17class SearchWithTwitterAPIv2(Search): 18 """ 19 Get Tweets via the X API 20 """ 21 type = "twitterv2-search" # job ID 22 title = "X/Twitter API (v2)" 23 extension = "ndjson" 24 is_local = False # Whether this datasource is locally scraped 25 is_static = False # Whether this datasource is still updated 26 27 previous_request = 0 28 import_issues = True 29 30 references = [ 31 "[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)" 32 ] 33 34 config = { 35 "twitterv2-search.academic_api_key": { 36 "type": UserInput.OPTION_TEXT, 37 "default": "", 38 "help": "Research API Key", 39 "tooltip": "An API key for the X/Twitter v2 Research API. If " 40 "provided, the user will not need to enter their own " 41 "key to retrieve tweets. Note that this API key should " 42 "have access to the Full Archive Search endpoint." 43 }, 44 "twitterv2-search.max_tweets": { 45 "type": UserInput.OPTION_TEXT, 46 "default": 0, 47 "min": 0, 48 "max": 10_000_000, 49 "help": "Max posts per dataset", 50 "tooltip": "4CAT will never retrieve more than this amount of " 51 "posts per dataset. Enter '0' for unlimited posts." 52 }, 53 "twitterv2-search.id_lookup": { 54 "type": UserInput.OPTION_TOGGLE, 55 "default": False, 56 "help": "Allow lookup by ID", 57 "tooltip": "If enabled, allow users to enter a list of post IDs " 58 "to retrieve. This is disabled by default because it " 59 "can be confusing to novice users." 60 } 61 } 62 63 def get_items(self, query): 64 """ 65 Use the Twitter v2 API historical search to get tweets 66 67 :param query: 68 :return: 69 """ 70 # Compile any errors to highlight at end of log 71 error_report = [] 72 # this is pretty sensitive so delete it immediately after storing in 73 # memory 74 have_api_key = self.config.get("twitterv2-search.academic_api_key") 75 bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key 76 api_type = query.get("api_type", "all") if not have_api_key else "all" 77 auth = {"Authorization": "Bearer %s" % bearer_token} 78 expected_tweets = query.get("expected-tweets", "unknown") 79 80 # these are all expansions and fields available at the time of writing 81 # since it does not cost anything extra in terms of rate limiting, go 82 # for as much data per tweet as possible... 
        tweet_fields = (
            "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id",
            "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings",
            "source", "text", "withheld")
        user_fields = (
            "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url",
            "protected", "public_metrics", "url", "username", "verified", "withheld")
        place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type")
        poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status")
        expansions = (
            "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id",
            "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id")
        media_fields = (
            "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants",
            "alt_text")

        params = {
            "expansions": ",".join(expansions),
            "tweet.fields": ",".join(tweet_fields),
            "user.fields": ",".join(user_fields),
            "poll.fields": ",".join(poll_fields),
            "place.fields": ",".join(place_fields),
            "media.fields": ",".join(media_fields),
        }

        if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
            endpoint = "https://api.x.com/2/tweets"

            tweet_ids = self.parameters.get("query", []).split(',')

            # Can only look up 100 tweets per query, per the Twitter API
            chunk_size = 100
            queries = [','.join(tweet_ids[i:i + chunk_size]) for i in range(0, len(tweet_ids), chunk_size)]
            expected_tweets = len(tweet_ids)

            amount = len(tweet_ids)

            # Initiate collection of any IDs that are unavailable
            collected_errors = []

        else:
            # Query the full-archive ('all') or recent search endpoint
            endpoint = "https://api.x.com/2/tweets/search/" + api_type

            queries = [self.parameters.get("query", "")]

            amount = convert_to_int(self.parameters.get("amount"), 10)

            params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100  # 100 = upper limit, 10 = lower

            if self.parameters.get("min_date"):
                params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime(
                    "%Y-%m-%dT%H:%M:%SZ")

            if self.parameters.get("max_date"):
                params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime(
                    "%Y-%m-%dT%H:%M:%SZ")

        if type(expected_tweets) is int:
            num_expected_tweets = expected_tweets
            expected_tweets = "{:,}".format(expected_tweets)
        else:
            num_expected_tweets = None

        tweets = 0
        for query in queries:
            if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
                params['ids'] = query
            else:
                params['query'] = query
            self.dataset.log("Search parameters: %s" % repr(params))
            while True:

                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API")

                # there is a limit of one request per second, so stay on the safe side of this
                while self.previous_request == int(time.time()):
                    time.sleep(0.1)
                time.sleep(0.05)
                self.previous_request = int(time.time())

                # now send the request, allowing for at least 5 retries if the connection seems unstable
                retries = 5
                api_response = None
                while retries > 0:
                    try:
                        api_response = requests.get(endpoint, headers=auth, params=params, timeout=30)
                        break
                    except (ConnectionError, requests.exceptions.RequestException) as e:
                        retries -= 1
                        wait_time = (5 - retries) * 10
                        self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time))
                        time.sleep(wait_time)

                # all retries exhausted without getting a response
                if api_response is None:
                    self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True)
                    return

                # rate limited - the limit at time of writing is 300 reqs per 15
                # minutes
                # usually you don't hit this when requesting batches of 500 at
                # 1/second, but this is also returned when the user reaches the
                # monthly tweet cap, albeit with different content in that case
                if api_response.status_code == 429:
                    try:
                        structured_response = api_response.json()
                        if structured_response.get("title") == "UsageCapExceeded":
                            self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts "
                                                       "until your API quota resets. Dataset completed with posts "
                                                       "collected so far.", is_final=True)
                            return
                    except (json.JSONDecodeError, ValueError):
                        self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting "
                                                   "post collection.", is_final=True)
                        return

                    resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1
                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
                    self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str)
                    while time.time() <= resume_at:
                        if self.interrupted:
                            raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset")
                        time.sleep(0.5)
                    continue

                # API keys that are valid but don't have access or haven't been
                # activated properly get a 403
                elif api_response.status_code == 403:
                    try:
                        structured_response = api_response.json()
                        self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API "
                                                   "with this API key. %s" % structured_response.get("detail", ""), is_final=True)
                    except (json.JSONDecodeError, ValueError):
                        self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to "
                                                   "the full-archive search endpoint.", is_final=True)
                    finally:
                        return

                # sometimes twitter says '503 service unavailable' for unclear
                # reasons - in that case just wait a while and try again
                elif api_response.status_code in (502, 503, 504):
                    resume_at = time.time() + 60
                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
                    self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % (
                        api_response.status_code, resume_at_str))
                    while time.time() <= resume_at:
                        time.sleep(0.5)
                    continue

                # this usually means the query is too long or otherwise contains
                # a syntax error
                elif api_response.status_code == 400:
                    msg = "Response %i from the X API; " % api_response.status_code
                    try:
                        api_response = api_response.json()
                        msg += api_response.get("title", "")
                        if "detail" in api_response:
                            msg += ": " + api_response.get("detail", "")
                    except (json.JSONDecodeError, TypeError):
                        msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long."
                    self.dataset.update_status(msg, is_final=True)
                    return

                # invalid API key
                elif api_response.status_code == 401:
                    self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True)
                    return

                # haven't seen one yet, but they probably exist
                elif api_response.status_code != 200:
                    self.dataset.update_status(
                        "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True)
                    self.log.warning("X API v2 responded with status code %i. Response body: %s" % (
                        api_response.status_code, api_response.text))
                    return

                api_response = api_response.json()

                # The API response contains tweets (of course) and 'includes',
                # objects that can be referenced in tweets. Later we will splice
                # this data into the tweets themselves to make them easier to
                # process. So extract them first...
                included_users = api_response.get("includes", {}).get("users", {})
                included_media = api_response.get("includes", {}).get("media", {})
                included_polls = api_response.get("includes", {}).get("polls", {})
                included_tweets = api_response.get("includes", {}).get("tweets", {})
                included_places = api_response.get("includes", {}).get("places", {})

                # Collect missing objects from Twitter API response by type
                missing_objects = {}
                for missing_object in api_response.get("errors", {}):
                    parameter_type = missing_object.get('resource_type', 'unknown')
                    if parameter_type in missing_objects:
                        missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object
                    else:
                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
                num_missing_objects = sum([len(v) for v in missing_objects.values()])

                # Record any missing objects in log
                if num_missing_objects > 0:
                    # Log amount
                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
                    if num_missing_objects > 50:
                        # Large amount of missing objects; possible error with Twitter API
                        self.import_issues = False
                        error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
                        error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))

                    # Warn if a new missing object type is recorded (for developers to handle)
                    expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
                    if any(key not in expected_error_types for key in missing_objects.keys()):
                        self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))

                # Loop through and collect tweets
                for tweet in api_response.get("data", []):

                    if 0 < amount <= tweets:
                        break

                    # splice referenced data back in
                    # we use copy.deepcopy here because else we run into a
                    # pass-by-reference quagmire
                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)

                    tweets += 1
                    if tweets % 500 == 0:
                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
                        if num_expected_tweets is not None:
                            self.dataset.update_progress(tweets / num_expected_tweets)

                    yield tweet

                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
                    # If id_lookup, also yield errors for tweets that could not be collected
                    for tweet_error in api_response.get("errors", []):
                        tweet_id = str(tweet_error.get('resource_id'))
                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
                            tweet_error = self.fix_tweet_error(tweet_error)
                            collected_errors.append(tweet_id)
                            yield tweet_error

                # paginate
                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
                    params["next_token"] = api_response["meta"]["next_token"]
                else:
                    break

        if not self.import_issues:
            self.dataset.log('Error report:\n' + '\n'.join(error_report))
            self.dataset.update_status("Completed with errors; check the log for the error report.", is_final=True)

    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
        """
        Enrich tweet with user and attachment metadata

        Twitter API returns some of the tweet's metadata separately, as
        'includes' that can be cross-referenced with a user ID or media key.
        This makes sense to conserve bandwidth, but also means tweets are not
        'standalone' objects as originally returned.

        However, for processing, making them standalone greatly reduces
        complexity, as we can simply read one single tweet object and process
        that data without worrying about having to get other data from
        elsewhere. So this method takes the metadata and the original tweet,
        splices the metadata into it where appropriate, and returns the
        enriched object.

        **This is not an efficient way to store things** but it is more
        convenient.

        :param dict tweet: The tweet object
        :param list users: User metadata, as a list of user objects
        :param list media: Media metadata, as a list of media objects
        :param list polls: Poll metadata, as a list of poll objects
        :param list places: Place metadata, as a list of place objects
        :param list referenced_tweets: Tweets referenced in the tweet, as a
            list of tweet objects. These will be enriched in turn.
        :param dict missing_objects: Dictionary with data on missing objects
            from the API, by type.

        :return dict: Enriched tweet object
        """
        # Copy the tweet so that updating this tweet has no effect on others
        tweet = copy.deepcopy(tweet)
        # first create temporary mappings so we can easily find the relevant
        # object later
        users_by_id = {user["id"]: user for user in users}
        users_by_name = {user["username"]: user for user in users}
        media_by_key = {item["media_key"]: item for item in media}
        polls_by_id = {poll["id"]: poll for poll in polls}
        places_by_id = {place["id"]: place for place in places}
        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}

        # add tweet author metadata
        tweet["author_user"] = users_by_id.get(tweet["author_id"])

        # add place to geo metadata
        # referenced_tweets also contain place_id, but these places may not be included in the place objects
        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})

        # add user metadata for mentioned users
        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
            if mention["username"] in users_by_name:
                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
            # missing users can be stored by either user ID or username in the Twitter API's error data; we check both
            elif mention["username"] in missing_objects.get('user', {}):
                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
            elif mention["id"] in missing_objects.get('user', {}):
                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}

        # add poll metadata
        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
            if poll_id in polls_by_id:
                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
            elif poll_id in missing_objects.get('poll', {}):
                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}

        # add media metadata - seems to be just the media type; the media URL
        # etc. is stored in the 'entities' attribute instead
        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
            if media_key in media_by_key:
                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
            elif media_key in missing_objects.get('media', {}):
                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}

        # replied-to user metadata
        if "in_reply_to_user_id" in tweet:
            if tweet["in_reply_to_user_id"] in users_by_id:
                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}

        # enrich referenced tweets. Even though there should be no recursion -
        # since tweets cannot be edited - we do not recursively enrich
        # referenced tweets (should we?)
        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
            if reference["id"] in tweets_by_id:
                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
            elif reference["id"] in missing_objects.get('tweet', {}):
                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}

        return tweet

    def fix_tweet_error(self, tweet_error):
        """
        Add fields as needed by map_tweet and other functions for errors, as they
        do not conform to normal tweet fields. Specifically for ID lookup, as the
        complete tweet could be missing.

        :param dict tweet_error: Tweet error object from the Twitter API
        :return dict: A tweet object with the relevant fields sanitised
        """
        modified_tweet = tweet_error
        modified_tweet['id'] = tweet_error.get('resource_id')
        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        modified_tweet['text'] = ''
        modified_tweet['author_user'] = {}
        modified_tweet['author_user']['name'] = 'UNKNOWN'
        modified_tweet['author_user']['username'] = 'UNKNOWN'
        modified_tweet['author_id'] = 'UNKNOWN'
        modified_tweet['public_metrics'] = {}

        # putting detail info in the 'subject' field, which is normally blank for tweets
        modified_tweet['subject'] = tweet_error.get('detail')

        return modified_tweet

    @classmethod
    def get_options(cls, parent_dataset=None, config=None):
        """
        Get Twitter data source options

        These are somewhat dynamic, because depending on settings users may or
        may not need to provide their own API key, and may or may not be able
        to enter a list of tweet IDs as their query. Hence the method.

        :param config: Configuration reader (context-aware)
        :param parent_dataset: Should always be None
        :return dict: Data source options
        """
        have_api_key = config.get("twitterv2-search.academic_api_key")
        max_tweets = config.get("twitterv2-search.max_tweets")

        if have_api_key:
            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
                          "historic tweets that match a given query.")

        else:
            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use "
                          "it, you must have access to the Research track of the X API. You will need to provide a "
                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
                          "monthly post retrieval cap.")

        intro_text += ("\n\nPlease refer to the [X API documentation]("
                       "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
                       "for more information about this API endpoint and the syntax you can use in your "
                       "search query. Retweets are included by default; add `-is:retweet` to exclude them.")

        options = {
            "intro-1": {
                "type": UserInput.OPTION_INFO,
                "help": intro_text
            },
        }

        if not have_api_key:
            # options.update({
            #     "api_type": {
            #         "type": UserInput.OPTION_CHOICE,
            #         "help": "API track",
            #         "options": {
            #             "all": "Research API: Full-archive search",
            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
            #         },
            #         "default": "all"
            #     }
            # })
            options.update({
                "api_bearer_token": {
                    "type": UserInput.OPTION_TEXT,
                    "sensitive": True,
                    "cache": True,
                    "help": "API Bearer Token"
                },
            })

        if config.get("twitterv2-search.id_lookup"):
            options.update({
                "query_type": {
                    "type": UserInput.OPTION_CHOICE,
                    "help": "Query type",
                    "tooltip": "Note: number of posts and date fields are ignored with 'Posts by ID' lookup",
                    "options": {
                        "query": "Search query",
                        "id_lookup": "Posts by ID (list IDs separated by commas or one per line)",
                    },
                    "default": "query"
                }
            })

        options.update({
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Query"
            },
            "amount": {
                "type": UserInput.OPTION_TEXT,
                "help": "Posts to retrieve",
                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
                "min": 0,
                "max": max_tweets if max_tweets else 10_000_000,
                "default": 10
            },
            "divider-2": {
                "type": UserInput.OPTION_DIVIDER
            },
            "daterange-info": {
                "type": UserInput.OPTION_INFO,
                "help": "By default, X returns posts up to 30 days old. If you want to go back further, you "
                        "need to explicitly set a date range."
            },
            "daterange": {
                "type": UserInput.OPTION_DATERANGE,
                "help": "Date range"
            },
        })

        return options

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate input for a dataset query on the Twitter data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        Will also raise a QueryNeedsExplicitConfirmation if the 'counts'
        endpoint of the Twitter API indicates that it will take more than
        30 minutes to collect the dataset. In the front-end, this will
        trigger a warning and confirmation request.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager|None config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        have_api_key = config.get("twitterv2-search.academic_api_key")
        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000)

        # this is the bare minimum, else we can't narrow down the full data set
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        if not have_api_key:
            if not query.get("api_bearer_token", None):
                raise QueryParametersException("Please provide a valid bearer token.")

        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")

        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
            # reformat queries to be a comma-separated list with no wrapping
            # whitespace
            whitespace = re.compile(r"\s+")
            items = whitespace.sub("", query.get("query").replace("\n", ","))
            # eliminate empty queries
            twitter_query = ','.join([item for item in items.split(",") if item])
        else:
            twitter_query = query.get("query")

        # the dates need to make sense as a range to search within
        # but, on Twitter, you can also specify before *or* after only
        after, before = query.get("daterange")
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # if we made it this far, the query can be executed
        params = {
            "query": twitter_query,
            "api_bearer_token": query.get("api_bearer_token"),
            "api_type": query.get("api_type", "all"),
            "query_type": query.get("query_type", "query"),
            "min_date": after,
            "max_date": before
        }

        # never query more tweets than allowed
        tweets_to_collect = convert_to_int(query.get("amount"), 10)

        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
            tweets_to_collect = max_tweets
        params["amount"] = tweets_to_collect

        # figure out how many tweets we expect to get back - we can use this
        # to dissuade users from running huge queries that will take forever
        # to process
        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
            count_url = "https://api.x.com/2/tweets/counts/all"
            count_params = {
                "granularity": "day",
                "query": params["query"],
            }

            # if we're doing a date range, pass this on to the counts endpoint in
            # the right format
            if after:
                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")

            if before:
                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")

            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key

            expected_tweets = 0
            while True:
                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
                                        timeout=15)
                if response.status_code == 200:
                    try:
                        # figure out how many tweets there are and estimate how much
                        # time it will take to process them. if it's going to take
                        # longer than half an hour, warn the user
                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
                    except KeyError:
                        # no harm done, we just don't know how many tweets will be
                        # returned (but they will still be returned)
                        break

                    if "next_token" not in response.json().get("meta", {}):
                        break
                    else:
                        count_params["next_token"] = response.json()["meta"]["next_token"]

                elif response.status_code == 401:
                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
                                                   "for the Research track of the X API.")

                elif response.status_code == 400:
                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
                                                   "extend into the future, or to before Twitter's founding, and that "
                                                   "your query is shorter than 1024 characters. Using AND in the query "
                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
                                                   "search for the literal word.")

                else:
                    # we can still continue without the expected tweets
                    break

            warning = ""
            if expected_tweets:
                collectible_tweets = min(max_tweets, params["amount"])
                if collectible_tweets == 0:
                    collectible_tweets = max_tweets

                if collectible_tweets > 0:
                    if collectible_tweets < expected_tweets:
                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
                    real_expected_tweets = min(expected_tweets, collectible_tweets)
                else:
                    real_expected_tweets = expected_tweets

                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
                expected_time = timify(expected_seconds)
                params["expected-tweets"] = expected_tweets

                if expected_seconds > 900:
                    warning += ". Collection will take approximately %s." % expected_time

            if warning and not query.get("frontend-confirm"):
                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
                warning += " Do you want to continue?"
                raise QueryNeedsExplicitConfirmationException(warning)

            params["amount"] = min(params["amount"], expected_tweets)
            if max_tweets:
                params["amount"] = min(max_tweets, params["amount"])

        return params

    @staticmethod
    def map_item(item):
        """
        Map a nested Tweet object to a flat dictionary

        Tweet objects are quite rich but 4CAT expects flat dictionaries per
        item in many cases. Since it would be a shame to not store the data
        retrieved from Twitter that cannot be stored in a flat file, we store
        the full objects and only map them to a flat dictionary when needed.
        This has a speed (and disk space) penalty, but makes sure we don't
        throw away valuable data and allows for later changes that e.g. store
        the tweets more efficiently as a MongoDB collection.

        :param item: Tweet object as originally returned by the Twitter API
        :return dict: Dictionary in the format expected by 4CAT
        """
        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")

        # For backward compatibility
        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""

        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
        urls = [tag.get("expanded_url", tag.get("display_url")) for tag in item.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)]
        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]

        # by default, the text of retweets is returned as "RT [excerpt of
        # retweeted tweet]". Since we have the full tweet text, we can complete
        # the excerpt:
        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
        if is_retweet:
            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
            if retweeted_tweet.get("text", False):
                retweeted_body = retweeted_tweet.get("text")
                # Get the username of the retweeted user
                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
                elif item.get('entities', {}).get('mentions', []):
                    # The username may not always be present in retweeted_tweet["author_user"]["username"] when the user was removed/deleted
                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
                    if retweeting_users:
                        # should only ever be one, but this verifies that there IS one and not None
                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body

            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "")  # Referenced tweets were not always enriched

            # Retweet entities are only included in the retweet if they occur in the first 140 characters
            # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense
            [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])]
            [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])]
            [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)]
            # Images appear to be inherited by retweets, but just in case
            [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
            [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]

        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])

        videos = []
        for video in video_items:
            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
            if variants:
                videos.append(variants[0].get('url'))

        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
        warning = ""
        if missing_metrics:
            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."

        return MappedItem({
            "id": item["id"],
            "thread_id": item.get("conversation_id", item["id"]),
            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
            "unix_timestamp": int(tweet_time.timestamp()),
            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
            "subject": "",
            "body": item["text"],
            "author": author_username,
            "author_fullname": author_fullname,
            "author_id": item["author_id"],
            "author_followers": author_followers,
            "source": item.get("source"),
            "language_guess": item.get("lang"),
            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
            **public_metrics,
            "is_retweet": "yes" if is_retweet else "no",
            "retweeted_user": "" if not is_retweet else retweeted_user,
            "is_quote_tweet": "yes" if is_quoted else "no",
            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
            "is_reply": "yes" if is_reply else "no",
            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
            "hashtags": ','.join(set(hashtags)),
            "urls": ','.join(set(urls)),
            "images": ','.join(set(images)),
            "videos": ','.join(set(videos)),
            "mentions": ','.join(set(mentions)),
            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
        }, message=warning)
18class SearchWithTwitterAPIv2(Search): 19 """ 20 Get Tweets via the X API 21 """ 22 type = "twitterv2-search" # job ID 23 title = "X/Twitter API (v2)" 24 extension = "ndjson" 25 is_local = False # Whether this datasource is locally scraped 26 is_static = False # Whether this datasource is still updated 27 28 previous_request = 0 29 import_issues = True 30 31 references = [ 32 "[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)" 33 ] 34 35 config = { 36 "twitterv2-search.academic_api_key": { 37 "type": UserInput.OPTION_TEXT, 38 "default": "", 39 "help": "Research API Key", 40 "tooltip": "An API key for the X/Twitter v2 Research API. If " 41 "provided, the user will not need to enter their own " 42 "key to retrieve tweets. Note that this API key should " 43 "have access to the Full Archive Search endpoint." 44 }, 45 "twitterv2-search.max_tweets": { 46 "type": UserInput.OPTION_TEXT, 47 "default": 0, 48 "min": 0, 49 "max": 10_000_000, 50 "help": "Max posts per dataset", 51 "tooltip": "4CAT will never retrieve more than this amount of " 52 "posts per dataset. Enter '0' for unlimited posts." 53 }, 54 "twitterv2-search.id_lookup": { 55 "type": UserInput.OPTION_TOGGLE, 56 "default": False, 57 "help": "Allow lookup by ID", 58 "tooltip": "If enabled, allow users to enter a list of post IDs " 59 "to retrieve. This is disabled by default because it " 60 "can be confusing to novice users." 61 } 62 } 63 64 def get_items(self, query): 65 """ 66 Use the Twitter v2 API historical search to get tweets 67 68 :param query: 69 :return: 70 """ 71 # Compile any errors to highlight at end of log 72 error_report = [] 73 # this is pretty sensitive so delete it immediately after storing in 74 # memory 75 have_api_key = self.config.get("twitterv2-search.academic_api_key") 76 bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key 77 api_type = query.get("api_type", "all") if not have_api_key else "all" 78 auth = {"Authorization": "Bearer %s" % bearer_token} 79 expected_tweets = query.get("expected-tweets", "unknown") 80 81 # these are all expansions and fields available at the time of writing 82 # since it does not cost anything extra in terms of rate limiting, go 83 # for as much data per tweet as possible... 
84 tweet_fields = ( 85 "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id", 86 "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings", 87 "source", "text", "withheld") 88 user_fields = ( 89 "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url", 90 "protected", "public_metrics", "url", "username", "verified", "withheld") 91 place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type") 92 poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status") 93 expansions = ( 94 "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id", 95 "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id") 96 media_fields = ( 97 "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants", 98 "alt_text") 99 100 params = { 101 "expansions": ",".join(expansions), 102 "tweet.fields": ",".join(tweet_fields), 103 "user.fields": ",".join(user_fields), 104 "poll.fields": ",".join(poll_fields), 105 "place.fields": ",".join(place_fields), 106 "media.fields": ",".join(media_fields), 107 } 108 109 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 110 endpoint = "https://api.x.com/2/tweets" 111 112 tweet_ids = self.parameters.get("query", []).split(',') 113 114 # Only can lookup 100 tweets in each query per Twitter API 115 chunk_size = 100 116 queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)] 117 expected_tweets = len(tweet_ids) 118 119 amount = len(tweet_ids) 120 121 # Initiate collection of any IDs that are unavailable 122 collected_errors = [] 123 124 else: 125 # Query to all or search 126 endpoint = "https://api.x.com/2/tweets/search/" + api_type 127 128 queries = [self.parameters.get("query", "")] 129 130 amount = convert_to_int(self.parameters.get("amount"), 10) 131 132 params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100 # 100 = upper limit, 10 = lower 133 134 if self.parameters.get("min_date"): 135 params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime( 136 "%Y-%m-%dT%H:%M:%SZ") 137 138 if self.parameters.get("max_date"): 139 params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime( 140 "%Y-%m-%dT%H:%M:%SZ") 141 142 if type(expected_tweets) is int: 143 num_expected_tweets = expected_tweets 144 expected_tweets = "{:,}".format(expected_tweets) 145 else: 146 num_expected_tweets = None 147 148 tweets = 0 149 for query in queries: 150 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 151 params['ids'] = query 152 else: 153 params['query'] = query 154 self.dataset.log("Search parameters: %s" % repr(params)) 155 while True: 156 157 if self.interrupted: 158 raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API") 159 160 # there is a limit of one request per second, so stay on the safe side of this 161 while self.previous_request == int(time.time()): 162 time.sleep(0.1) 163 time.sleep(0.05) 164 self.previous_request = int(time.time()) 165 166 # now send the request, allowing for at least 5 retries if the connection seems unstable 167 retries = 5 168 api_response = None 169 while 
retries > 0: 170 try: 171 api_response = requests.get(endpoint, headers=auth, params=params, timeout=30) 172 break 173 except (ConnectionError, requests.exceptions.RequestException) as e: 174 retries -= 1 175 wait_time = (5 - retries) * 10 176 self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time)) 177 time.sleep(wait_time) 178 179 # rate limited - the limit at time of writing is 300 reqs per 15 180 # minutes 181 # usually you don't hit this when requesting batches of 500 at 182 # 1/second, but this is also returned when the user reaches the 183 # monthly tweet cap, albeit with different content in that case 184 if api_response.status_code == 429: 185 try: 186 structured_response = api_response.json() 187 if structured_response.get("title") == "UsageCapExceeded": 188 self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts " 189 "until your API quota resets. Dataset completed with posts " 190 "collected so far.", is_final=True) 191 return 192 except (json.JSONDecodeError, ValueError): 193 self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting " 194 "post collection.", is_final=True) 195 return 196 197 resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1 198 resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c") 199 self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str) 200 while time.time() <= resume_at: 201 if self.interrupted: 202 raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset") 203 time.sleep(0.5) 204 continue 205 206 # API keys that are valid but don't have access or haven't been 207 # activated properly get a 403 208 elif api_response.status_code == 403: 209 try: 210 structured_response = api_response.json() 211 self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API " 212 "with this API key. %s" % structured_response.get("detail", ""), is_final=True) 213 except (json.JSONDecodeError, ValueError): 214 self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to " 215 "the full-archive search endpoint.", is_final=True) 216 finally: 217 return 218 219 # sometimes twitter says '503 service unavailable' for unclear 220 # reasons - in that case just wait a while and try again 221 elif api_response.status_code in (502, 503, 504): 222 resume_at = time.time() + 60 223 resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c") 224 self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % ( 225 api_response.status_code, resume_at_str)) 226 while time.time() <= resume_at: 227 time.sleep(0.5) 228 continue 229 230 # this usually means the query is too long or otherwise contains 231 # a syntax error 232 elif api_response.status_code == 400: 233 msg = "Response %i from the X API; " % api_response.status_code 234 try: 235 api_response = api_response.json() 236 msg += api_response.get("title", "") 237 if "detail" in api_response: 238 msg += ": " + api_response.get("detail", "") 239 except (json.JSONDecodeError, TypeError): 240 msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long." 
241 242 self.dataset.update_status(msg, is_final=True) 243 return 244 245 # invalid API key 246 elif api_response.status_code == 401: 247 self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True) 248 return 249 250 # haven't seen one yet, but they probably exist 251 elif api_response.status_code != 200: 252 self.dataset.update_status( 253 "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True) 254 self.log.warning("X API v2 responded with status code %i. Response body: %s" % ( 255 api_response.status_code, api_response.text)) 256 return 257 258 elif not api_response: 259 self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True) 260 return 261 262 api_response = api_response.json() 263 264 # The API response contains tweets (of course) and 'includes', 265 # objects that can be referenced in tweets. Later we will splice 266 # this data into the tweets themselves to make them easier to 267 # process. So extract them first... 268 included_users = api_response.get("includes", {}).get("users", {}) 269 included_media = api_response.get("includes", {}).get("media", {}) 270 included_polls = api_response.get("includes", {}).get("polls", {}) 271 included_tweets = api_response.get("includes", {}).get("tweets", {}) 272 included_places = api_response.get("includes", {}).get("places", {}) 273 274 # Collect missing objects from Twitter API response by type 275 missing_objects = {} 276 for missing_object in api_response.get("errors", {}): 277 parameter_type = missing_object.get('resource_type', 'unknown') 278 if parameter_type in missing_objects: 279 missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object 280 else: 281 missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object} 282 num_missing_objects = sum([len(v) for v in missing_objects.values()]) 283 284 # Record any missing objects in log 285 if num_missing_objects > 0: 286 # Log amount 287 self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()])) 288 if num_missing_objects > 50: 289 # Large amount of missing objects; possible error with Twitter API 290 self.import_issues = False 291 error_report.append('%i missing objects received following post number %i. Possible issue with X API.' 
% (num_missing_objects, tweets)) 292 error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()])) 293 294 # Warn if new missing object is recorded (for developers to handle) 295 expected_error_types = ['user', 'media', 'poll', 'tweet', 'place'] 296 if any(key not in expected_error_types for key in missing_objects.keys()): 297 self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types])) 298 299 # Loop through and collect tweets 300 for tweet in api_response.get("data", []): 301 302 if 0 < amount <= tweets: 303 break 304 305 # splice referenced data back in 306 # we use copy.deepcopy here because else we run into a 307 # pass-by-reference quagmire 308 tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects) 309 310 tweets += 1 311 if tweets % 500 == 0: 312 self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets)) 313 if num_expected_tweets is not None: 314 self.dataset.update_progress(tweets / num_expected_tweets) 315 316 yield tweet 317 318 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 319 # If id_lookup return errors in collecting tweets 320 for tweet_error in api_response.get("errors", []): 321 tweet_id = str(tweet_error.get('resource_id')) 322 if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors: 323 tweet_error = self.fix_tweet_error(tweet_error) 324 collected_errors.append(tweet_id) 325 yield tweet_error 326 327 # paginate 328 if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]: 329 params["next_token"] = api_response["meta"]["next_token"] 330 else: 331 break 332 333 if not self.import_issues: 334 self.dataset.log('Error Report:\n' + '\n'.join(error_report)) 335 self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True) 336 337 def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects): 338 """ 339 Enrich tweet with user and attachment metadata 340 341 Twitter API returns some of the tweet's metadata separately, as 342 'includes' that can be cross-referenced with a user ID or media key. 343 This makes sense to conserve bandwidth, but also means tweets are not 344 'standalone' objects as originally returned. 345 346 However, for processing, making them standalone greatly reduces 347 complexity, as we can simply read one single tweet object and process 348 that data without worrying about having to get other data from 349 elsewhere. So this method takes the metadata and the original tweet, 350 splices the metadata into it where appropriate, and returns the 351 enriched object. 352 353 **This is not an efficient way to store things** but it is more 354 convenient. 355 356 :param dict tweet: The tweet object 357 :param list users: User metadata, as a list of user objects 358 :param list media: Media metadata, as a list of media objects 359 :param list polls: Poll metadata, as a list of poll objects 360 :param list places: Place metadata, as a list of place objects 361 :param list referenced_tweets: Tweets referenced in the tweet, as a 362 list of tweet objects. These will be enriched in turn. 
363 :param dict missing_objects: Dictionary with data on missing objects 364 from the API by type. 365 366 :return dict: Enriched tweet object 367 """ 368 # Copy the tweet so that updating this tweet has no effect on others 369 tweet = copy.deepcopy(tweet) 370 # first create temporary mappings so we can easily find the relevant 371 # object later 372 users_by_id = {user["id"]: user for user in users} 373 users_by_name = {user["username"]: user for user in users} 374 media_by_key = {item["media_key"]: item for item in media} 375 polls_by_id = {poll["id"]: poll for poll in polls} 376 places_by_id = {place["id"]: place for place in places} 377 tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets} 378 379 # add tweet author metadata 380 tweet["author_user"] = users_by_id.get(tweet["author_id"]) 381 382 # add place to geo metadata 383 # referenced_tweets also contain place_id, but these places may not included in the place objects 384 if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id: 385 tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id")) 386 elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}): 387 tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {}) 388 389 # add user metadata for mentioned users 390 for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])): 391 if mention["username"] in users_by_name: 392 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])} 393 # missing users can be stored by either user ID or Username in Twitter API's error data; we check both 394 elif mention["username"] in missing_objects.get('user', {}): 395 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}} 396 elif mention["id"] in missing_objects.get('user', {}): 397 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}} 398 399 400 # add poll metadata 401 for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])): 402 if poll_id in polls_by_id: 403 tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id] 404 elif poll_id in missing_objects.get('poll', {}): 405 tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]} 406 407 # add media metadata - seems to be just the media type, the media URL 408 # etc is stored in the 'entities' attribute instead 409 for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])): 410 if media_key in media_by_key: 411 tweet["attachments"]["media_keys"][index] = media_by_key[media_key] 412 elif media_key in missing_objects.get('media', {}): 413 tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]} 414 415 # replied-to user metadata 416 if "in_reply_to_user_id" in tweet: 417 if tweet["in_reply_to_user_id"] in users_by_id: 418 tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]] 419 elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}): 420 tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]} 421 422 # enrich referenced tweets. 
Even though there should be no recursion - 423 # since tweets cannot be edited - we do not recursively enrich 424 # referenced tweets (should we?) 425 for index, reference in enumerate(tweet.get("referenced_tweets", [])): 426 if reference["id"] in tweets_by_id: 427 tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)} 428 elif reference["id"] in missing_objects.get('tweet', {}): 429 tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}} 430 431 return tweet 432 433 def fix_tweet_error(self, tweet_error): 434 """ 435 Add fields as needed by map_tweet and other functions for errors as they 436 do not conform to normal tweet fields. Specifically for ID Lookup as 437 complete tweet could be missing. 438 439 :param dict tweet_error: Tweet error object from the Twitter API 440 :return dict: A tweet object with the relevant fields sanitised 441 """ 442 modified_tweet = tweet_error 443 modified_tweet['id'] = tweet_error.get('resource_id') 444 modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z") 445 modified_tweet['text'] = '' 446 modified_tweet['author_user'] = {} 447 modified_tweet['author_user']['name'] = 'UNKNOWN' 448 modified_tweet['author_user']['username'] = 'UNKNOWN' 449 modified_tweet['author_id'] = 'UNKNOWN' 450 modified_tweet['public_metrics'] = {} 451 452 # putting detail info in 'subject' field which is normally blank for tweets 453 modified_tweet['subject'] = tweet_error.get('detail') 454 455 return modified_tweet 456 457 @classmethod 458 def get_options(cls, parent_dataset=None, config=None): 459 """ 460 Get Twitter data source options 461 462 These are somewhat dynamic, because depending on settings users may or 463 may not need to provide their own API key, and may or may not be able 464 to enter a list of tweet IDs as their query. Hence the method. 465 466 :param config: 467 :param parent_dataset: Should always be None 468 :return dict: Data source options 469 """ 470 have_api_key = config.get("twitterv2-search.academic_api_key") 471 max_tweets = config.get("twitterv2-search.max_tweets") 472 473 if have_api_key: 474 intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve " 475 "historic tweets that match a given query.") 476 477 else: 478 intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use the " 479 "it, you must have access to the Research track of the X API. You will need to provide a " 480 "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The " 481 "bearer token **will be sent to the 4CAT server**, where it will be deleted after data " 482 "collection has started. Note that any posts retrieved with 4CAT will count towards your " 483 "monthly post retrieval cap.") 484 485 intro_text += ("\n\nPlease refer to the [X API documentation](" 486 "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) " 487 "documentation for more information about this API endpoint and the syntax you can use in your " 488 "search query. 
Retweets are included by default; add `-is:retweet` to exclude them.") 489 490 options = { 491 "intro-1": { 492 "type": UserInput.OPTION_INFO, 493 "help": intro_text 494 }, 495 } 496 497 if not have_api_key: 498 # options.update({ 499 # "api_type": { 500 # "type": UserInput.OPTION_CHOICE, 501 # "help": "API track", 502 # "options": { 503 # "all": "Research API: Full-archive search", 504 # "recent": "Standard: Recent search (Tweets published in last 7 days)", 505 # }, 506 # "default": "all" 507 # } 508 # }) 509 options.update({ 510 "api_bearer_token": { 511 "type": UserInput.OPTION_TEXT, 512 "sensitive": True, 513 "cache": True, 514 "help": "API Bearer Token" 515 }, 516 }) 517 518 if config.get("twitterv2-search.id_lookup"): 519 options.update({ 520 "query_type": { 521 "type": UserInput.OPTION_CHOICE, 522 "help": "Query type", 523 "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup", 524 "options": { 525 "query": "Search query", 526 "id_lookup": "Posts by ID (list IDs separated by commas or one per line)", 527 }, 528 "default": "query" 529 } 530 }) 531 532 options.update({ 533 "query": { 534 "type": UserInput.OPTION_TEXT_LARGE, 535 "help": "Query" 536 }, 537 "amount": { 538 "type": UserInput.OPTION_TEXT, 539 "help": "Posts to retrieve", 540 "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)), 541 "min": 0, 542 "max": max_tweets if max_tweets else 10_000_000, 543 "default": 10 544 }, 545 "divider-2": { 546 "type": UserInput.OPTION_DIVIDER 547 }, 548 "daterange-info": { 549 "type": UserInput.OPTION_INFO, 550 "help": "By default, X returns posts up until 30 days ago. If you want to go back further, you " 551 "need to explicitly set a date range." 552 }, 553 "daterange": { 554 "type": UserInput.OPTION_DATERANGE, 555 "help": "Date range" 556 }, 557 }) 558 559 return options 560 561 @staticmethod 562 def validate_query(query, request, config): 563 """ 564 Validate input for a dataset query on the Twitter data source. 565 566 Will raise a QueryParametersException if invalid parameters are 567 encountered. Parameters are additionally sanitised. 568 569 Will also raise a QueryNeedsExplicitConfirmation if the 'counts' 570 endpoint of the Twitter API indicates that it will take more than 571 30 minutes to collect the dataset. In the front-end, this will 572 trigger a warning and confirmation request. 573 574 :param dict query: Query parameters, from client-side.
575 :param request: Flask request 576 :param ConfigManager|None config: Configuration reader (context-aware) 577 :return dict: Safe query parameters 578 """ 579 have_api_key = config.get("twitterv2-search.academic_api_key") 580 max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000) 581 582 # this is the bare minimum, else we can't narrow down the full data set 583 if not query.get("query", None): 584 raise QueryParametersException("Please provide a query.") 585 586 if not have_api_key: 587 if not query.get("api_bearer_token", None): 588 raise QueryParametersException("Please provide a valid bearer token.") 589 590 if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup": 591 raise QueryParametersException("X API queries cannot be longer than 1024 characters.") 592 593 if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"): 594 # reformat queries to be a comma-separated list with no wrapping 595 # whitespace 596 whitespace = re.compile(r"\s+") 597 items = whitespace.sub("", query.get("query").replace("\n", ",")) 598 # eliminate empty queries 599 twitter_query = ','.join([item for item in items.split(",") if item]) 600 else: 601 twitter_query = query.get("query") 602 603 # the dates need to make sense as a range to search within 604 # but, on Twitter, you can also specify before *or* after only 605 after, before = query.get("daterange") 606 if before and after and before < after: 607 raise QueryParametersException("Date range must start before it ends") 608 609 # if we made it this far, the query can be executed 610 params = { 611 "query": twitter_query, 612 "api_bearer_token": query.get("api_bearer_token"), 613 "api_type": query.get("api_type", "all"), 614 "query_type": query.get("query_type", "query"), 615 "min_date": after, 616 "max_date": before 617 } 618 619 # never query more tweets than allowed 620 tweets_to_collect = convert_to_int(query.get("amount"), 10) 621 622 if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0): 623 tweets_to_collect = max_tweets 624 params["amount"] = tweets_to_collect 625 626 # figure out how many tweets we expect to get back - we can use this 627 # to dissuade users from running huge queries that will take forever 628 # to process 629 if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key): 630 count_url = "https://api.x.com/2/tweets/counts/all" 631 count_params = { 632 "granularity": "day", 633 "query": params["query"], 634 } 635 636 # if we're doing a date range, pass this on to the counts endpoint in 637 # the right format 638 if after: 639 count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ") 640 641 if before: 642 count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ") 643 644 bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key 645 646 expected_tweets = 0 647 while True: 648 response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token}, 649 timeout=15) 650 if response.status_code == 200: 651 try: 652 # figure out how many tweets there are and estimate how much 653 # time it will take to process them. 
if it's going to take 654 # longer than half an hour, warn the user 655 expected_tweets += int(response.json()["meta"]["total_tweet_count"]) 656 except KeyError: 657 # no harm done, we just don't know how many tweets will be 658 # returned (but they will still be returned) 659 break 660 661 if "next_token" not in response.json().get("meta", {}): 662 break 663 else: 664 count_params["next_token"] = response.json()["meta"]["next_token"] 665 666 elif response.status_code == 401: 667 raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid " 668 "for the Research track of the X API.") 669 670 elif response.status_code == 400: 671 raise QueryParametersException("Your query is invalid. Please make sure the date range does not " 672 "extend into the future, or to before Twitter's founding, and that " 673 "your query is shorter than 1024 characters. Using AND in the query " 674 "is not possible (AND is implied; OR can be used). Use \"and\" to " 675 "search for the literal word.") 676 677 else: 678 # we can still continue without the expected tweets 679 break 680 681 warning = "" 682 if expected_tweets: 683 collectible_tweets = min(max_tweets, params["amount"]) 684 if collectible_tweets == 0: 685 collectible_tweets = max_tweets 686 687 if collectible_tweets > 0: 688 if collectible_tweets < expected_tweets: 689 warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets) 690 real_expected_tweets = min(expected_tweets, collectible_tweets) 691 else: 692 real_expected_tweets = expected_tweets 693 694 expected_seconds = int(real_expected_tweets / 30) # seems to be about this 695 expected_time = timify(expected_seconds) 696 params["expected-tweets"] = expected_tweets 697 698 if expected_seconds > 900: 699 warning += ". Collection will take approximately %s." % expected_time 700 701 if warning and not query.get("frontend-confirm"): 702 warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning) 703 warning += " Do you want to continue?" 704 raise QueryNeedsExplicitConfirmationException(warning) 705 706 params["amount"] = min(params["amount"], expected_tweets) 707 if max_tweets: 708 params["amount"] = min(max_tweets, params["amount"]) 709 710 return params 711 712 @staticmethod 713 def map_item(item): 714 """ 715 Map a nested Tweet object to a flat dictionary 716 717 Tweet objects are quite rich but 4CAT expects flat dictionaries per 718 item in many cases. Since it would be a shame to not store the data 719 retrieved from Twitter that cannot be stored in a flat file, we store 720 the full objects and only map them to a flat dictionary when needed. 721 This has a speed (and disk space) penalty, but makes sure we don't 722 throw away valuable data and allows for later changes that e.g. store 723 the tweets more efficiently as a MongoDB collection. 
724 725 :param item: Tweet object as originally returned by the Twitter API 726 :return dict: Dictionary in the format expected by 4CAT 727 """ 728 tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z") 729 730 # For backward compatibility 731 author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"] 732 author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"] 733 author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else "" 734 735 hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])] 736 mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])] 737 urls = [tag.get("expanded_url", tag.get("display_url")) for tag in item.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)] 738 images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 739 video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 740 741 # by default, the text of retweets is returned as "RT [excerpt of 742 # retweeted tweet]". Since we have the full tweet text, we can complete 743 # the excerpt: 744 is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])]) 745 if is_retweet: 746 retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0] 747 if retweeted_tweet.get("text", False): 748 retweeted_body = retweeted_tweet.get("text") 749 # Get user's username that was retweeted 750 if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'): 751 item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body 752 elif item.get('entities', {}).get('mentions', []): 753 # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted 754 retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')] 755 if retweeting_users: 756 # should only ever be one, but this verifies that there IS one and not NONE 757 item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body 758 759 retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched 760 761 # Retweet entities are only included in the retweet if they occur in the first 140 characters 762 # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense 763 [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])] 764 [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])] 765 [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)] 766 # Images appear to be inheritted by retweets, but just in case 767 [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and 
attachment.get("type") == "photo"] 768 [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 769 770 is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])]) 771 is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])]) 772 773 videos = [] 774 for video in video_items: 775 variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True) 776 if variants: 777 videos.append(variants[0].get('url')) 778 779 expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"} 780 public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics} 781 missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]] 782 warning = "" 783 if missing_metrics: 784 warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports." 785 786 return MappedItem({ 787 "id": item["id"], 788 "thread_id": item.get("conversation_id", item["id"]), 789 "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"), 790 "unix_timestamp": int(tweet_time.timestamp()), 791 'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')), 792 "subject": "", 793 "body": item["text"], 794 "author": author_username, 795 "author_fullname": author_fullname, 796 "author_id": item["author_id"], 797 "author_followers": author_followers, 798 "source": item.get("source"), 799 "language_guess": item.get("lang"), 800 "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no", 801 **public_metrics, 802 "is_retweet": "yes" if is_retweet else "no", 803 "retweeted_user": "" if not is_retweet else retweeted_user, 804 "is_quote_tweet": "yes" if is_quoted else "no", 805 "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""), 806 "is_reply": "yes" if is_reply else "no", 807 "replied_user": item.get("in_reply_to_user", {}).get("username", ""), 808 "hashtags": ','.join(set(hashtags)), 809 "urls": ','.join(set(urls)), 810 "images": ','.join(set(images)), 811 "videos": ','.join(set(videos)), 812 "mentions": ','.join(set(mentions)), 813 "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]), 814 'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''), 815 }, message=warning)
Get Tweets via the X API
64 def get_items(self, query): 65 """ 66 Use the Twitter v2 API historical search to get tweets 67 68 :param query: 69 :return: 70 """ 71 # Compile any errors to highlight at end of log 72 error_report = [] 73 # this is pretty sensitive so delete it immediately after storing in 74 # memory 75 have_api_key = self.config.get("twitterv2-search.academic_api_key") 76 bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key 77 api_type = query.get("api_type", "all") if not have_api_key else "all" 78 auth = {"Authorization": "Bearer %s" % bearer_token} 79 expected_tweets = query.get("expected-tweets", "unknown") 80 81 # these are all expansions and fields available at the time of writing 82 # since it does not cost anything extra in terms of rate limiting, go 83 # for as much data per tweet as possible... 84 tweet_fields = ( 85 "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id", 86 "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings", 87 "source", "text", "withheld") 88 user_fields = ( 89 "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url", 90 "protected", "public_metrics", "url", "username", "verified", "withheld") 91 place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type") 92 poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status") 93 expansions = ( 94 "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id", 95 "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id") 96 media_fields = ( 97 "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants", 98 "alt_text") 99 100 params = { 101 "expansions": ",".join(expansions), 102 "tweet.fields": ",".join(tweet_fields), 103 "user.fields": ",".join(user_fields), 104 "poll.fields": ",".join(poll_fields), 105 "place.fields": ",".join(place_fields), 106 "media.fields": ",".join(media_fields), 107 } 108 109 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 110 endpoint = "https://api.x.com/2/tweets" 111 112 tweet_ids = self.parameters.get("query", []).split(',') 113 114 # Only can lookup 100 tweets in each query per Twitter API 115 chunk_size = 100 116 queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)] 117 expected_tweets = len(tweet_ids) 118 119 amount = len(tweet_ids) 120 121 # Initiate collection of any IDs that are unavailable 122 collected_errors = [] 123 124 else: 125 # Query to all or search 126 endpoint = "https://api.x.com/2/tweets/search/" + api_type 127 128 queries = [self.parameters.get("query", "")] 129 130 amount = convert_to_int(self.parameters.get("amount"), 10) 131 132 params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100 # 100 = upper limit, 10 = lower 133 134 if self.parameters.get("min_date"): 135 params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime( 136 "%Y-%m-%dT%H:%M:%SZ") 137 138 if self.parameters.get("max_date"): 139 params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime( 140 "%Y-%m-%dT%H:%M:%SZ") 141 142 if type(expected_tweets) is int: 143 num_expected_tweets = expected_tweets 144 expected_tweets = 
"{:,}".format(expected_tweets) 145 else: 146 num_expected_tweets = None 147 148 tweets = 0 149 for query in queries: 150 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 151 params['ids'] = query 152 else: 153 params['query'] = query 154 self.dataset.log("Search parameters: %s" % repr(params)) 155 while True: 156 157 if self.interrupted: 158 raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API") 159 160 # there is a limit of one request per second, so stay on the safe side of this 161 while self.previous_request == int(time.time()): 162 time.sleep(0.1) 163 time.sleep(0.05) 164 self.previous_request = int(time.time()) 165 166 # now send the request, allowing for at least 5 retries if the connection seems unstable 167 retries = 5 168 api_response = None 169 while retries > 0: 170 try: 171 api_response = requests.get(endpoint, headers=auth, params=params, timeout=30) 172 break 173 except (ConnectionError, requests.exceptions.RequestException) as e: 174 retries -= 1 175 wait_time = (5 - retries) * 10 176 self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time)) 177 time.sleep(wait_time) 178 179 # rate limited - the limit at time of writing is 300 reqs per 15 180 # minutes 181 # usually you don't hit this when requesting batches of 500 at 182 # 1/second, but this is also returned when the user reaches the 183 # monthly tweet cap, albeit with different content in that case 184 if api_response.status_code == 429: 185 try: 186 structured_response = api_response.json() 187 if structured_response.get("title") == "UsageCapExceeded": 188 self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts " 189 "until your API quota resets. Dataset completed with posts " 190 "collected so far.", is_final=True) 191 return 192 except (json.JSONDecodeError, ValueError): 193 self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting " 194 "post collection.", is_final=True) 195 return 196 197 resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1 198 resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c") 199 self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str) 200 while time.time() <= resume_at: 201 if self.interrupted: 202 raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset") 203 time.sleep(0.5) 204 continue 205 206 # API keys that are valid but don't have access or haven't been 207 # activated properly get a 403 208 elif api_response.status_code == 403: 209 try: 210 structured_response = api_response.json() 211 self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API " 212 "with this API key. %s" % structured_response.get("detail", ""), is_final=True) 213 except (json.JSONDecodeError, ValueError): 214 self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to " 215 "the full-archive search endpoint.", is_final=True) 216 finally: 217 return 218 219 # sometimes twitter says '503 service unavailable' for unclear 220 # reasons - in that case just wait a while and try again 221 elif api_response.status_code in (502, 503, 504): 222 resume_at = time.time() + 60 223 resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c") 224 self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." 
% ( 225 api_response.status_code, resume_at_str)) 226 while time.time() <= resume_at: 227 time.sleep(0.5) 228 continue 229 230 # this usually means the query is too long or otherwise contains 231 # a syntax error 232 elif api_response.status_code == 400: 233 msg = "Response %i from the X API; " % api_response.status_code 234 try: 235 api_response = api_response.json() 236 msg += api_response.get("title", "") 237 if "detail" in api_response: 238 msg += ": " + api_response.get("detail", "") 239 except (json.JSONDecodeError, TypeError): 240 msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long." 241 242 self.dataset.update_status(msg, is_final=True) 243 return 244 245 # invalid API key 246 elif api_response.status_code == 401: 247 self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True) 248 return 249 250 # haven't seen one yet, but they probably exist 251 elif api_response.status_code != 200: 252 self.dataset.update_status( 253 "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True) 254 self.log.warning("X API v2 responded with status code %i. Response body: %s" % ( 255 api_response.status_code, api_response.text)) 256 return 257 258 elif not api_response: 259 self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True) 260 return 261 262 api_response = api_response.json() 263 264 # The API response contains tweets (of course) and 'includes', 265 # objects that can be referenced in tweets. Later we will splice 266 # this data into the tweets themselves to make them easier to 267 # process. So extract them first... 268 included_users = api_response.get("includes", {}).get("users", {}) 269 included_media = api_response.get("includes", {}).get("media", {}) 270 included_polls = api_response.get("includes", {}).get("polls", {}) 271 included_tweets = api_response.get("includes", {}).get("tweets", {}) 272 included_places = api_response.get("includes", {}).get("places", {}) 273 274 # Collect missing objects from Twitter API response by type 275 missing_objects = {} 276 for missing_object in api_response.get("errors", {}): 277 parameter_type = missing_object.get('resource_type', 'unknown') 278 if parameter_type in missing_objects: 279 missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object 280 else: 281 missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object} 282 num_missing_objects = sum([len(v) for v in missing_objects.values()]) 283 284 # Record any missing objects in log 285 if num_missing_objects > 0: 286 # Log amount 287 self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()])) 288 if num_missing_objects > 50: 289 # Large amount of missing objects; possible error with Twitter API 290 self.import_issues = False 291 error_report.append('%i missing objects received following post number %i. Possible issue with X API.' 
% (num_missing_objects, tweets)) 292 error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()])) 293 294 # Warn if new missing object is recorded (for developers to handle) 295 expected_error_types = ['user', 'media', 'poll', 'tweet', 'place'] 296 if any(key not in expected_error_types for key in missing_objects.keys()): 297 self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types])) 298 299 # Loop through and collect tweets 300 for tweet in api_response.get("data", []): 301 302 if 0 < amount <= tweets: 303 break 304 305 # splice referenced data back in 306 # we use copy.deepcopy here because else we run into a 307 # pass-by-reference quagmire 308 tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects) 309 310 tweets += 1 311 if tweets % 500 == 0: 312 self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets)) 313 if num_expected_tweets is not None: 314 self.dataset.update_progress(tweets / num_expected_tweets) 315 316 yield tweet 317 318 if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"): 319 # If id_lookup return errors in collecting tweets 320 for tweet_error in api_response.get("errors", []): 321 tweet_id = str(tweet_error.get('resource_id')) 322 if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors: 323 tweet_error = self.fix_tweet_error(tweet_error) 324 collected_errors.append(tweet_id) 325 yield tweet_error 326 327 # paginate 328 if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]: 329 params["next_token"] = api_response["meta"]["next_token"] 330 else: 331 break 332 333 if not self.import_issues: 334 self.dataset.log('Error Report:\n' + '\n'.join(error_report)) 335 self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)
Use the Twitter v2 API historical search to get tweets
Parameters
- query:
Returns
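For orientation, the following is a minimal sketch of the request loop that get_items builds up: a single GET to the full-archive search endpoint with comma-joined field lists, paginated via meta.next_token. The bearer token and query below are placeholders, the field lists are abbreviated, and the rate limiting, retries and per-status-code handling shown in the source above are omitted.

import requests

BEARER_TOKEN = "YOUR_BEARER_TOKEN"  # placeholder; 4CAT takes this from the admin key or the user
ENDPOINT = "https://api.x.com/2/tweets/search/all"

params = {
    "query": "4cat -is:retweet",  # example query, not taken from the source
    "max_results": 100,           # page size; 100 is the endpoint's upper limit
    "expansions": "author_id,attachments.media_keys,referenced_tweets.id",
    "tweet.fields": "created_at,public_metrics,entities,referenced_tweets",
    "user.fields": "username,name,public_metrics",
}

collected = []
while True:
    response = requests.get(ENDPOINT, headers={"Authorization": "Bearer %s" % BEARER_TOKEN},
                            params=params, timeout=30)
    response.raise_for_status()  # the real collector handles 429, 400, 403 etc. explicitly
    page = response.json()
    collected += page.get("data", [])
    if "next_token" in page.get("meta", {}):
        params["next_token"] = page["meta"]["next_token"]
    else:
        break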
337 def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects): 338 """ 339 Enrich tweet with user and attachment metadata 340 341 Twitter API returns some of the tweet's metadata separately, as 342 'includes' that can be cross-referenced with a user ID or media key. 343 This makes sense to conserve bandwidth, but also means tweets are not 344 'standalone' objects as originally returned. 345 346 However, for processing, making them standalone greatly reduces 347 complexity, as we can simply read one single tweet object and process 348 that data without worrying about having to get other data from 349 elsewhere. So this method takes the metadata and the original tweet, 350 splices the metadata into it where appropriate, and returns the 351 enriched object. 352 353 **This is not an efficient way to store things** but it is more 354 convenient. 355 356 :param dict tweet: The tweet object 357 :param list users: User metadata, as a list of user objects 358 :param list media: Media metadata, as a list of media objects 359 :param list polls: Poll metadata, as a list of poll objects 360 :param list places: Place metadata, as a list of place objects 361 :param list referenced_tweets: Tweets referenced in the tweet, as a 362 list of tweet objects. These will be enriched in turn. 363 :param dict missing_objects: Dictionary with data on missing objects 364 from the API by type. 365 366 :return dict: Enriched tweet object 367 """ 368 # Copy the tweet so that updating this tweet has no effect on others 369 tweet = copy.deepcopy(tweet) 370 # first create temporary mappings so we can easily find the relevant 371 # object later 372 users_by_id = {user["id"]: user for user in users} 373 users_by_name = {user["username"]: user for user in users} 374 media_by_key = {item["media_key"]: item for item in media} 375 polls_by_id = {poll["id"]: poll for poll in polls} 376 places_by_id = {place["id"]: place for place in places} 377 tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets} 378 379 # add tweet author metadata 380 tweet["author_user"] = users_by_id.get(tweet["author_id"]) 381 382 # add place to geo metadata 383 # referenced_tweets also contain place_id, but these places may not included in the place objects 384 if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id: 385 tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id")) 386 elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}): 387 tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {}) 388 389 # add user metadata for mentioned users 390 for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])): 391 if mention["username"] in users_by_name: 392 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])} 393 # missing users can be stored by either user ID or Username in Twitter API's error data; we check both 394 elif mention["username"] in missing_objects.get('user', {}): 395 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}} 396 elif mention["id"] in missing_objects.get('user', {}): 397 tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}} 398 399 400 # add poll metadata 401 for index, poll_id in 
enumerate(tweet.get("attachments", {}).get("poll_ids", [])): 402 if poll_id in polls_by_id: 403 tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id] 404 elif poll_id in missing_objects.get('poll', {}): 405 tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]} 406 407 # add media metadata - seems to be just the media type, the media URL 408 # etc is stored in the 'entities' attribute instead 409 for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])): 410 if media_key in media_by_key: 411 tweet["attachments"]["media_keys"][index] = media_by_key[media_key] 412 elif media_key in missing_objects.get('media', {}): 413 tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]} 414 415 # replied-to user metadata 416 if "in_reply_to_user_id" in tweet: 417 if tweet["in_reply_to_user_id"] in users_by_id: 418 tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]] 419 elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}): 420 tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]} 421 422 # enrich referenced tweets. Even though there should be no recursion - 423 # since tweets cannot be edited - we do not recursively enrich 424 # referenced tweets (should we?) 425 for index, reference in enumerate(tweet.get("referenced_tweets", [])): 426 if reference["id"] in tweets_by_id: 427 tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)} 428 elif reference["id"] in missing_objects.get('tweet', {}): 429 tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}} 430 431 return tweet
Enrich tweet with user and attachment metadata
Twitter API returns some of the tweet's metadata separately, as 'includes' that can be cross-referenced with a user ID or media key. This makes sense to conserve bandwidth, but also means tweets are not 'standalone' objects as originally returned.
However, for processing, making them standalone greatly reduces complexity, as we can simply read one single tweet object and process that data without worrying about having to get other data from elsewhere. So this method takes the metadata and the original tweet, splices the metadata into it where appropriate, and returns the enriched object.
This is not an efficient way to store things but it is more convenient.
Parameters
- dict tweet: The tweet object
- list users: User metadata, as a list of user objects
- list media: Media metadata, as a list of media objects
- list polls: Poll metadata, as a list of poll objects
- list places: Place metadata, as a list of place objects
- list referenced_tweets: Tweets referenced in the tweet, as a list of tweet objects. These will be enriched in turn.
- dict missing_objects: Dictionary with data on missing objects from the API by type.
Returns
Enriched tweet object
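A minimal, self-contained illustration of the splicing described above, using made-up and heavily truncated objects rather than real API output:

# Hypothetical, truncated tweet and 'includes' objects, for illustration only
tweet = {"id": "1", "author_id": "42", "text": "hello @someone",
         "entities": {"mentions": [{"id": "7", "username": "someone"}]}}
users = [{"id": "42", "username": "author_account", "name": "Author"},
         {"id": "7", "username": "someone", "name": "Someone Else"}]

# The essence of the method: build lookup tables from the includes and copy
# the referenced objects into the tweet so it becomes self-contained
users_by_id = {user["id"]: user for user in users}
users_by_name = {user["username"]: user for user in users}

tweet["author_user"] = users_by_id.get(tweet["author_id"])
for index, mention in enumerate(tweet["entities"]["mentions"]):
    if mention["username"] in users_by_name:
        tweet["entities"]["mentions"][index] = {**mention, **users_by_name[mention["username"]]}

# tweet["author_user"]["username"] == "author_account"
# tweet["entities"]["mentions"][0]["name"] == "Someone Else"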
433 def fix_tweet_error(self, tweet_error): 434 """ 435 Add fields needed by map_item and other functions to error objects, as they 436 do not conform to normal tweet fields. This is specifically for ID lookup, where the 437 complete tweet could be missing. 438 439 :param dict tweet_error: Tweet error object from the Twitter API 440 :return dict: A tweet object with the relevant fields sanitised 441 """ 442 modified_tweet = tweet_error 443 modified_tweet['id'] = tweet_error.get('resource_id') 444 modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z") 445 modified_tweet['text'] = '' 446 modified_tweet['author_user'] = {} 447 modified_tweet['author_user']['name'] = 'UNKNOWN' 448 modified_tweet['author_user']['username'] = 'UNKNOWN' 449 modified_tweet['author_id'] = 'UNKNOWN' 450 modified_tweet['public_metrics'] = {} 451 452 # putting detail info in 'subject' field which is normally blank for tweets 453 modified_tweet['subject'] = tweet_error.get('detail') 454 455 return modified_tweet
Add fields needed by map_item and other functions to error objects, as they do not conform to normal tweet fields. This is specifically for ID lookup, where the complete tweet could be missing.
Parameters
- dict tweet_error: Tweet error object from the Twitter API
Returns
A tweet object with the relevant fields sanitised
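As an illustration, here is an error object of the kind the ID lookup endpoint returns for a deleted or otherwise unavailable post (all values are made up), with the placeholder tweet that fix_tweet_error would derive from it sketched in the comment:

# Hypothetical entry from the 'errors' array of a /2/tweets ID lookup response
tweet_error = {
    "resource_id": "1234567890123456789",
    "resource_type": "tweet",
    "title": "Not Found Error",
    "detail": "Could not find tweet with ids: [1234567890123456789].",
}

# fix_tweet_error(tweet_error) would return roughly:
# {
#     ...the original error fields, plus...
#     "id": "1234567890123456789",
#     "created_at": "<timestamp of collection>",
#     "text": "",
#     "author_user": {"name": "UNKNOWN", "username": "UNKNOWN"},
#     "author_id": "UNKNOWN",
#     "public_metrics": {},
#     "subject": "Could not find tweet with ids: [1234567890123456789].",
# }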
457 @classmethod 458 def get_options(cls, parent_dataset=None, config=None): 459 """ 460 Get Twitter data source options 461 462 These are somewhat dynamic, because depending on settings users may or 463 may not need to provide their own API key, and may or may not be able 464 to enter a list of tweet IDs as their query. Hence the method. 465 466 :param config: 467 :param parent_dataset: Should always be None 468 :return dict: Data source options 469 """ 470 have_api_key = config.get("twitterv2-search.academic_api_key") 471 max_tweets = config.get("twitterv2-search.max_tweets") 472 473 if have_api_key: 474 intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve " 475 "historic tweets that match a given query.") 476 477 else: 478 intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use " 479 "it, you must have access to the Research track of the X API. You will need to provide a " 480 "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The " 481 "bearer token **will be sent to the 4CAT server**, where it will be deleted after data " 482 "collection has started. Note that any posts retrieved with 4CAT will count towards your " 483 "monthly post retrieval cap.") 484 485 intro_text += ("\n\nPlease refer to the [X API documentation](" 486 "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) " 487 "for more information about this API endpoint and the syntax you can use in your " 488 "search query. Retweets are included by default; add `-is:retweet` to exclude them.") 489 490 options = { 491 "intro-1": { 492 "type": UserInput.OPTION_INFO, 493 "help": intro_text 494 }, 495 } 496 497 if not have_api_key: 498 # options.update({ 499 # "api_type": { 500 # "type": UserInput.OPTION_CHOICE, 501 # "help": "API track", 502 # "options": { 503 # "all": "Research API: Full-archive search", 504 # "recent": "Standard: Recent search (Tweets published in last 7 days)", 505 # }, 506 # "default": "all" 507 # } 508 # }) 509 options.update({ 510 "api_bearer_token": { 511 "type": UserInput.OPTION_TEXT, 512 "sensitive": True, 513 "cache": True, 514 "help": "API Bearer Token" 515 }, 516 }) 517 518 if config.get("twitterv2-search.id_lookup"): 519 options.update({ 520 "query_type": { 521 "type": UserInput.OPTION_CHOICE, 522 "help": "Query type", 523 "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup", 524 "options": { 525 "query": "Search query", 526 "id_lookup": "Posts by ID (list IDs separated by commas or one per line)", 527 }, 528 "default": "query" 529 } 530 }) 531 532 options.update({ 533 "query": { 534 "type": UserInput.OPTION_TEXT_LARGE, 535 "help": "Query" 536 }, 537 "amount": { 538 "type": UserInput.OPTION_TEXT, 539 "help": "Posts to retrieve", 540 "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)), 541 "min": 0, 542 "max": max_tweets if max_tweets else 10_000_000, 543 "default": 10 544 }, 545 "divider-2": { 546 "type": UserInput.OPTION_DIVIDER 547 }, 548 "daterange-info": { 549 "type": UserInput.OPTION_INFO, 550 "help": "By default, X returns posts up until 30 days ago. If you want to go back further, you " 551 "need to explicitly set a date range." 552 }, 553 "daterange": { 554 "type": UserInput.OPTION_DATERANGE, 555 "help": "Date range" 556 }, 557 }) 558 559 return options
Get Twitter data source options
These are somewhat dynamic, because depending on settings users may or may not need to provide their own API key, and may or may not be able to enter a list of tweet IDs as their query. Hence the method.
Parameters
- config:
- parent_dataset: Should always be None
Returns
Data source options
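Because get_options only calls config.get(...), a plain dictionary can stand in for the configuration reader when experimenting with it. A rough sketch, assuming a 4CAT environment in which the module can be imported; the setting values are examples:

from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2

# With an admin-provided Research API key, users are not asked for a bearer token
admin_config = {"twitterv2-search.academic_api_key": "xxxx",
                "twitterv2-search.max_tweets": 10_000,
                "twitterv2-search.id_lookup": True}
# Without one, the 'api_bearer_token' field is added to the form
user_config = {"twitterv2-search.academic_api_key": "",
               "twitterv2-search.max_tweets": 10_000,
               "twitterv2-search.id_lookup": False}

assert "api_bearer_token" not in SearchWithTwitterAPIv2.get_options(config=admin_config)
assert "api_bearer_token" in SearchWithTwitterAPIv2.get_options(config=user_config)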
561 @staticmethod 562 def validate_query(query, request, config): 563 """ 564 Validate input for a dataset query on the Twitter data source. 565 566 Will raise a QueryParametersException if invalid parameters are 567 encountered. Parameters are additionally sanitised. 568 569 Will also raise a QueryNeedsExplicitConfirmation if the 'counts' 570 endpoint of the Twitter API indicates that it will take more than 571 30 minutes to collect the dataset. In the front-end, this will 572 trigger a warning and confirmation request. 573 574 :param dict query: Query parameters, from client-side. 575 :param request: Flask request 576 :param ConfigManager|None config: Configuration reader (context-aware) 577 :return dict: Safe query parameters 578 """ 579 have_api_key = config.get("twitterv2-search.academic_api_key") 580 max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000) 581 582 # this is the bare minimum, else we can't narrow down the full data set 583 if not query.get("query", None): 584 raise QueryParametersException("Please provide a query.") 585 586 if not have_api_key: 587 if not query.get("api_bearer_token", None): 588 raise QueryParametersException("Please provide a valid bearer token.") 589 590 if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup": 591 raise QueryParametersException("X API queries cannot be longer than 1024 characters.") 592 593 if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"): 594 # reformat queries to be a comma-separated list with no wrapping 595 # whitespace 596 whitespace = re.compile(r"\s+") 597 items = whitespace.sub("", query.get("query").replace("\n", ",")) 598 # eliminate empty queries 599 twitter_query = ','.join([item for item in items.split(",") if item]) 600 else: 601 twitter_query = query.get("query") 602 603 # the dates need to make sense as a range to search within 604 # but, on Twitter, you can also specify before *or* after only 605 after, before = query.get("daterange") 606 if before and after and before < after: 607 raise QueryParametersException("Date range must start before it ends") 608 609 # if we made it this far, the query can be executed 610 params = { 611 "query": twitter_query, 612 "api_bearer_token": query.get("api_bearer_token"), 613 "api_type": query.get("api_type", "all"), 614 "query_type": query.get("query_type", "query"), 615 "min_date": after, 616 "max_date": before 617 } 618 619 # never query more tweets than allowed 620 tweets_to_collect = convert_to_int(query.get("amount"), 10) 621 622 if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0): 623 tweets_to_collect = max_tweets 624 params["amount"] = tweets_to_collect 625 626 # figure out how many tweets we expect to get back - we can use this 627 # to dissuade users from running huge queries that will take forever 628 # to process 629 if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key): 630 count_url = "https://api.x.com/2/tweets/counts/all" 631 count_params = { 632 "granularity": "day", 633 "query": params["query"], 634 } 635 636 # if we're doing a date range, pass this on to the counts endpoint in 637 # the right format 638 if after: 639 count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ") 640 641 if before: 642 count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ") 643 644 bearer_token = params.get("api_bearer_token") if not have_api_key else 
have_api_key 645 646 expected_tweets = 0 647 while True: 648 response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token}, 649 timeout=15) 650 if response.status_code == 200: 651 try: 652 # figure out how many tweets there are and estimate how much 653 # time it will take to process them. if it's going to take 654 # longer than half an hour, warn the user 655 expected_tweets += int(response.json()["meta"]["total_tweet_count"]) 656 except KeyError: 657 # no harm done, we just don't know how many tweets will be 658 # returned (but they will still be returned) 659 break 660 661 if "next_token" not in response.json().get("meta", {}): 662 break 663 else: 664 count_params["next_token"] = response.json()["meta"]["next_token"] 665 666 elif response.status_code == 401: 667 raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid " 668 "for the Research track of the X API.") 669 670 elif response.status_code == 400: 671 raise QueryParametersException("Your query is invalid. Please make sure the date range does not " 672 "extend into the future, or to before Twitter's founding, and that " 673 "your query is shorter than 1024 characters. Using AND in the query " 674 "is not possible (AND is implied; OR can be used). Use \"and\" to " 675 "search for the literal word.") 676 677 else: 678 # we can still continue without the expected tweets 679 break 680 681 warning = "" 682 if expected_tweets: 683 collectible_tweets = min(max_tweets, params["amount"]) 684 if collectible_tweets == 0: 685 collectible_tweets = max_tweets 686 687 if collectible_tweets > 0: 688 if collectible_tweets < expected_tweets: 689 warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets) 690 real_expected_tweets = min(expected_tweets, collectible_tweets) 691 else: 692 real_expected_tweets = expected_tweets 693 694 expected_seconds = int(real_expected_tweets / 30) # seems to be about this 695 expected_time = timify(expected_seconds) 696 params["expected-tweets"] = expected_tweets 697 698 if expected_seconds > 900: 699 warning += ". Collection will take approximately %s." % expected_time 700 701 if warning and not query.get("frontend-confirm"): 702 warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning) 703 warning += " Do you want to continue?" 704 raise QueryNeedsExplicitConfirmationException(warning) 705 706 params["amount"] = min(params["amount"], expected_tweets) 707 if max_tweets: 708 params["amount"] = min(max_tweets, params["amount"]) 709 710 return params
Validate input for a dataset query on the Twitter data source.
Will raise a QueryParametersException if invalid parameters are encountered. Parameters are additionally sanitised.
Will also raise a QueryNeedsExplicitConfirmation if the 'counts' endpoint of the Twitter API indicates that it will take more than 30 minutes to collect the dataset. In the front-end, this will trigger a warning and confirmation request.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
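The cost estimate described above can also be reproduced outside 4CAT with the counts endpoint. A minimal sketch with a placeholder bearer token and an example query, leaving out the detailed error handling the validator performs:

import requests

BEARER_TOKEN = "YOUR_BEARER_TOKEN"  # placeholder
COUNT_URL = "https://api.x.com/2/tweets/counts/all"
count_params = {"granularity": "day", "query": "4cat -is:retweet"}  # example query

expected_tweets = 0
while True:
    response = requests.get(COUNT_URL, params=count_params,
                            headers={"Authorization": "Bearer %s" % BEARER_TOKEN}, timeout=15)
    if response.status_code != 200:
        break  # the validator raises informative exceptions for 400/401 instead
    meta = response.json().get("meta", {})
    expected_tweets += int(meta.get("total_tweet_count", 0))
    if "next_token" not in meta:
        break
    count_params["next_token"] = meta["next_token"]

# the validator assumes a collection rate of roughly 30 posts per second
print("~{:,} posts matched; rough estimate: {} seconds to collect".format(
    expected_tweets, expected_tweets // 30))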
712 @staticmethod 713 def map_item(item): 714 """ 715 Map a nested Tweet object to a flat dictionary 716 717 Tweet objects are quite rich but 4CAT expects flat dictionaries per 718 item in many cases. Since it would be a shame to not store the data 719 retrieved from Twitter that cannot be stored in a flat file, we store 720 the full objects and only map them to a flat dictionary when needed. 721 This has a speed (and disk space) penalty, but makes sure we don't 722 throw away valuable data and allows for later changes that e.g. store 723 the tweets more efficiently as a MongoDB collection. 724 725 :param item: Tweet object as originally returned by the Twitter API 726 :return dict: Dictionary in the format expected by 4CAT 727 """ 728 tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z") 729 730 # For backward compatibility 731 author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"] 732 author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"] 733 author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else "" 734 735 hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])] 736 mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])] 737 urls = [tag.get("expanded_url", tag.get("display_url")) for tag in item.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)] 738 images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 739 video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 740 741 # by default, the text of retweets is returned as "RT [excerpt of 742 # retweeted tweet]". 
Since we have the full tweet text, we can complete 743 # the excerpt: 744 is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])]) 745 if is_retweet: 746 retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0] 747 if retweeted_tweet.get("text", False): 748 retweeted_body = retweeted_tweet.get("text") 749 # Get user's username that was retweeted 750 if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'): 751 item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body 752 elif item.get('entities', {}).get('mentions', []): 753 # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted 754 retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')] 755 if retweeting_users: 756 # should only ever be one, but this verifies that there IS one and not NONE 757 item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body 758 759 retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched 760 761 # Retweet entities are only included in the retweet if they occur in the first 140 characters 762 # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense 763 [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])] 764 [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])] 765 [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)] 766 # Images appear to be inheritted by retweets, but just in case 767 [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"] 768 [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"] 769 770 is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])]) 771 is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])]) 772 773 videos = [] 774 for video in video_items: 775 variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True) 776 if variants: 777 videos.append(variants[0].get('url')) 778 779 expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"} 780 public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics} 781 missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]] 782 warning = "" 783 if missing_metrics: 784 warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports." 
785 786 return MappedItem({ 787 "id": item["id"], 788 "thread_id": item.get("conversation_id", item["id"]), 789 "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"), 790 "unix_timestamp": int(tweet_time.timestamp()), 791 'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')), 792 "subject": "", 793 "body": item["text"], 794 "author": author_username, 795 "author_fullname": author_fullname, 796 "author_id": item["author_id"], 797 "author_followers": author_followers, 798 "source": item.get("source"), 799 "language_guess": item.get("lang"), 800 "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no", 801 **public_metrics, 802 "is_retweet": "yes" if is_retweet else "no", 803 "retweeted_user": "" if not is_retweet else retweeted_user, 804 "is_quote_tweet": "yes" if is_quoted else "no", 805 "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""), 806 "is_reply": "yes" if is_reply else "no", 807 "replied_user": item.get("in_reply_to_user", {}).get("username", ""), 808 "hashtags": ','.join(set(hashtags)), 809 "urls": ','.join(set(urls)), 810 "images": ','.join(set(images)), 811 "videos": ','.join(set(videos)), 812 "mentions": ','.join(set(mentions)), 813 "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]), 814 'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''), 815 }, message=warning)
Map a nested Tweet object to a flat dictionary
Tweet objects are quite rich but 4CAT expects flat dictionaries per item in many cases. Since it would be a shame to not store the data retrieved from Twitter that cannot be stored in a flat file, we store the full objects and only map them to a flat dictionary when needed. This has a speed (and disk space) penalty, but makes sure we don't throw away valuable data and allows for later changes that e.g. store the tweets more efficiently as a MongoDB collection.
Parameters
- item: Tweet object as originally returned by the Twitter API
Returns
Dictionary in the format expected by 4CAT
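Since result files of this datasource are NDJSON with one full tweet object per line, stored items can be re-flattened outside 4CAT as well. A sketch, assuming the module can be imported, that a result file exists at the placeholder path, and that the MappedItem wrapper exposes the flat dictionary via get_item_data():

import json

from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2

with open("twitter-dataset.ndjson") as infile:  # placeholder path to a 4CAT result file
    for line in infile:
        tweet = json.loads(line)
        mapped = SearchWithTwitterAPIv2.map_item(tweet)
        row = mapped.get_item_data()  # assumed accessor for the flat dictionary used in CSV exports
        print(row["id"], row["author"], row["timestamp"], row["body"][:50])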
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- description
- category
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor