Edit on GitHub

datasources.twitterv2.search_twitter

X/Twitter keyword search via the X API v2

  1"""
  2X/Twitter keyword search via the X API v2
  3"""
  4import requests
  5import datetime
  6import copy
  7import time
  8import json
  9import re
 10
 11from backend.lib.search import Search
 12from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, QueryNeedsExplicitConfirmationException
 13from common.lib.helpers import convert_to_int, UserInput, timify_long
 14from common.config_manager import config
 15from common.lib.item_mapping import MappedItem, MissingMappedField
 16
 17
class SearchWithTwitterAPIv2(Search):
    """
    Get Tweets via the X API
    """
    # identifiers used by the 4CAT backend to route jobs to this datasource
    type = "twitterv2-search"  # job ID
    title = "X/Twitter API (v2)"
    extension = "ndjson"
    is_local = False    # Whether this datasource is locally scraped
    is_static = False   # Whether this datasource is still updated

    # timestamp (whole seconds) of the most recent API request; used to
    # stay under the API's one-request-per-second limit in get_items()
    previous_request = 0
    # set to False when a large number of missing objects is detected,
    # which triggers an error report at the end of collection
    import_issues = True

    references = [
        "[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)"
    ]

    # 4CAT settings exposed to instance administrators
    config = {
        "twitterv2-search.academic_api_key": {
            "type": UserInput.OPTION_TEXT,
            "default": "",
            "help": "Research API Key",
            "tooltip": "An API key for the X/Twitter v2 Research API. If "
                       "provided, the user will not need to enter their own "
                       "key to retrieve tweets. Note that this API key should "
                       "have access to the Full Archive Search endpoint."
        },
        "twitterv2-search.max_tweets": {
            "type": UserInput.OPTION_TEXT,
            "default": 0,
            "min": 0,
            "max": 10_000_000,
            "help": "Max posts per dataset",
            "tooltip": "4CAT will never retrieve more than this amount of "
                       "posts per dataset. Enter '0' for unlimited posts."
        },
        "twitterv2-search.id_lookup": {
            "type": UserInput.OPTION_TOGGLE,
            "default": False,
            "help": "Allow lookup by ID",
            "tooltip": "If enabled, allow users to enter a list of post IDs "
                       "to retrieve. This is disabled by default because it "
                       "can be confusing to novice users."
        }
    }
 63
 64    def get_items(self, query):
 65        """
 66        Use the Twitter v2 API historical search to get tweets
 67
 68        :param query:
 69        :return:
 70        """
 71        # Compile any errors to highlight at end of log
 72        error_report = []
 73        # this is pretty sensitive so delete it immediately after storing in
 74        # memory
 75        have_api_key = self.config.get("twitterv2-search.academic_api_key")
 76        bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key
 77        api_type = query.get("api_type", "all") if not have_api_key else "all"
 78        auth = {"Authorization": "Bearer %s" % bearer_token}
 79        expected_tweets = query.get("expected-tweets", "unknown")
 80
 81        # these are all expansions and fields available at the time of writing
 82        # since it does not cost anything extra in terms of rate limiting, go
 83        # for as much data per tweet as possible...
 84        tweet_fields = (
 85        "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id",
 86        "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings",
 87        "source", "text", "withheld")
 88        user_fields = (
 89        "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url",
 90        "protected", "public_metrics", "url", "username", "verified", "withheld")
 91        place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type")
 92        poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status")
 93        expansions = (
 94        "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id",
 95        "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id")
 96        media_fields = (
 97        "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants",
 98        "alt_text")
 99
100        params = {
101            "expansions": ",".join(expansions),
102            "tweet.fields": ",".join(tweet_fields),
103            "user.fields": ",".join(user_fields),
104            "poll.fields": ",".join(poll_fields),
105            "place.fields": ",".join(place_fields),
106            "media.fields": ",".join(media_fields),
107        }
108
109        if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
110            endpoint = "https://api.x.com/2/tweets"
111
112            tweet_ids = self.parameters.get("query", []).split(',')
113
114            # Only can lookup 100 tweets in each query per Twitter API
115            chunk_size = 100
116            queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)]
117            expected_tweets = len(tweet_ids)
118
119            amount = len(tweet_ids)
120
121            # Initiate collection of any IDs that are unavailable
122            collected_errors = []
123
124        else:
125            # Query to all or search
126            endpoint = "https://api.x.com/2/tweets/search/" + api_type
127
128            queries = [self.parameters.get("query", "")]
129
130            amount = convert_to_int(self.parameters.get("amount"), 10)
131
132            params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100  # 100 = upper limit, 10 = lower
133
134            if self.parameters.get("min_date"):
135                params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime(
136                    "%Y-%m-%dT%H:%M:%SZ")
137
138            if self.parameters.get("max_date"):
139                params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime(
140                    "%Y-%m-%dT%H:%M:%SZ")
141
142        if type(expected_tweets) is int:
143            num_expected_tweets = expected_tweets
144            expected_tweets = "{:,}".format(expected_tweets)
145        else:
146            num_expected_tweets = None
147
148        tweets = 0
149        for query in queries:
150            if self.parameters.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
151                params['ids'] = query
152            else:
153                params['query'] = query
154            self.dataset.log("Search parameters: %s" % repr(params))
155            while True:
156
157                if self.interrupted:
158                    raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API")
159
160                # there is a limit of one request per second, so stay on the safe side of this
161                while self.previous_request == int(time.time()):
162                    time.sleep(0.1)
163                time.sleep(0.05)
164                self.previous_request = int(time.time())
165
166                # now send the request, allowing for at least 5 retries if the connection seems unstable
167                retries = 5
168                api_response = None
169                while retries > 0:
170                    try:
171                        api_response = requests.get(endpoint, headers=auth, params=params, timeout=30)
172                        break
173                    except (ConnectionError, requests.exceptions.RequestException) as e:
174                        retries -= 1
175                        wait_time = (5 - retries) * 10
176                        self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time))
177                        time.sleep(wait_time)
178
179                # rate limited - the limit at time of writing is 300 reqs per 15
180                # minutes
181                # usually you don't hit this when requesting batches of 500 at
182                # 1/second, but this is also returned when the user reaches the
183                # monthly tweet cap, albeit with different content in that case
184                if api_response.status_code == 429:
185                    try:
186                        structured_response = api_response.json()
187                        if structured_response.get("title") == "UsageCapExceeded":
188                            self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts "
189                                                       "until your API quota resets. Dataset completed with posts "
190                                                       "collected so far.", is_final=True)
191                            return
192                    except (json.JSONDecodeError, ValueError):
193                        self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting "
194                                                   "post collection.", is_final=True)
195                        return
196
197                    resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1
198                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
199                    self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str)
200                    while time.time() <= resume_at:
201                        if self.interrupted:
202                            raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset")
203                        time.sleep(0.5)
204                    continue
205
206                # API keys that are valid but don't have access or haven't been
207                # activated properly get a 403
208                elif api_response.status_code == 403:
209                    try:
210                        structured_response = api_response.json()
211                        self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API "
212                                                   "with this API key. %s" % structured_response.get("detail", ""), is_final=True)
213                    except (json.JSONDecodeError, ValueError):
214                        self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to "
215                                                   "the full-archive search endpoint.", is_final=True)
216                    finally:
217                        return
218
219                # sometimes twitter says '503 service unavailable' for unclear
220                # reasons - in that case just wait a while and try again
221                elif api_response.status_code in (502, 503, 504):
222                    resume_at = time.time() + 60
223                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
224                    self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % (
225                    api_response.status_code, resume_at_str))
226                    while time.time() <= resume_at:
227                        time.sleep(0.5)
228                    continue
229
230                # this usually means the query is too long or otherwise contains
231                # a syntax error
232                elif api_response.status_code == 400:
233                    msg = "Response %i from the X API; " % api_response.status_code
234                    try:
235                        api_response = api_response.json()
236                        msg += api_response.get("title", "")
237                        if "detail" in api_response:
238                            msg += ": " + api_response.get("detail", "")
239                    except (json.JSONDecodeError, TypeError):
240                        msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long."
241
242                    self.dataset.update_status(msg, is_final=True)
243                    return
244
245                # invalid API key
246                elif api_response.status_code == 401:
247                    self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True)
248                    return
249
250                # haven't seen one yet, but they probably exist
251                elif api_response.status_code != 200:
252                    self.dataset.update_status(
253                        "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True)
254                    self.log.warning("X API v2 responded with status code %i. Response body: %s" % (
255                    api_response.status_code, api_response.text))
256                    return
257
258                elif not api_response:
259                    self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True)
260                    return
261
262                api_response = api_response.json()
263
264                # The API response contains tweets (of course) and 'includes',
265                # objects that can be referenced in tweets. Later we will splice
266                # this data into the tweets themselves to make them easier to
267                # process. So extract them first...
268                included_users = api_response.get("includes", {}).get("users", {})
269                included_media = api_response.get("includes", {}).get("media", {})
270                included_polls = api_response.get("includes", {}).get("polls", {})
271                included_tweets = api_response.get("includes", {}).get("tweets", {})
272                included_places = api_response.get("includes", {}).get("places", {})
273
274                # Collect missing objects from Twitter API response by type
275                missing_objects = {}
276                for missing_object in api_response.get("errors", {}):
277                    parameter_type = missing_object.get('resource_type', 'unknown')
278                    if parameter_type in missing_objects:
279                        missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object
280                    else:
281                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
282                num_missing_objects = sum([len(v) for v in missing_objects.values()])
283
284                # Record any missing objects in log
285                if num_missing_objects > 0:
286                    # Log amount
287                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
288                if num_missing_objects > 50:
289                    # Large amount of missing objects; possible error with Twitter API
290                    self.import_issues = False
291                    error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
292                    error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
293
294                # Warn if new missing object is recorded (for developers to handle)
295                expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
296                if any(key not in expected_error_types for key in missing_objects.keys()):
297                    self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))
298
299                # Loop through and collect tweets
300                for tweet in api_response.get("data", []):
301
302                    if 0 < amount <= tweets:
303                        break
304
305                    # splice referenced data back in
306                    # we use copy.deepcopy here because else we run into a
307                    # pass-by-reference quagmire
308                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)
309
310                    tweets += 1
311                    if tweets % 500 == 0:
312                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
313                        if num_expected_tweets is not None:
314                            self.dataset.update_progress(tweets / num_expected_tweets)
315
316                    yield tweet
317
318                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
319                    # If id_lookup return errors in collecting tweets
320                    for tweet_error in api_response.get("errors", []):
321                        tweet_id = str(tweet_error.get('resource_id'))
322                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
323                            tweet_error = self.fix_tweet_error(tweet_error)
324                            collected_errors.append(tweet_id)
325                            yield tweet_error
326
327                # paginate
328                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
329                    params["next_token"] = api_response["meta"]["next_token"]
330                else:
331                    break
332
333        if not self.import_issues:
334            self.dataset.log('Error Report:\n' + '\n'.join(error_report))
335            self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)
336
337    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
338        """
339        Enrich tweet with user and attachment metadata
340
341        Twitter API returns some of the tweet's metadata separately, as
342        'includes' that can be cross-referenced with a user ID or media key.
343        This makes sense to conserve bandwidth, but also means tweets are not
344        'standalone' objects as originally returned.
345
346        However, for processing, making them standalone greatly reduces
347        complexity, as we can simply read one single tweet object and process
348        that data without worrying about having to get other data from
349        elsewhere. So this method takes the metadata and the original tweet,
350        splices the metadata into it where appropriate, and returns the
351        enriched object.
352
353        /!\ This is not an efficient way to store things /!\ but it is more
354        convenient.
355
356        :param dict tweet:  The tweet object
357        :param list users:  User metadata, as a list of user objects
358        :param list media:  Media metadata, as a list of media objects
359        :param list polls:  Poll metadata, as a list of poll objects
360        :param list places:  Place metadata, as a list of place objects
361        :param list referenced_tweets:  Tweets referenced in the tweet, as a
362        list of tweet objects. These will be enriched in turn.
363        :param dict missing_objects: Dictionary with data on missing objects
364                from the API by type.
365
366        :return dict:  Enriched tweet object
367        """
368        # Copy the tweet so that updating this tweet has no effect on others
369        tweet = copy.deepcopy(tweet)
370        # first create temporary mappings so we can easily find the relevant
371        # object later
372        users_by_id = {user["id"]: user for user in users}
373        users_by_name = {user["username"]: user for user in users}
374        media_by_key = {item["media_key"]: item for item in media}
375        polls_by_id = {poll["id"]: poll for poll in polls}
376        places_by_id = {place["id"]: place for place in places}
377        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}
378
379        # add tweet author metadata
380        tweet["author_user"] = users_by_id.get(tweet["author_id"])
381
382        # add place to geo metadata
383        # referenced_tweets also contain place_id, but these places may not included in the place objects
384        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
385            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
386        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
387            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})
388
389        # add user metadata for mentioned users
390        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
391            if mention["username"] in users_by_name:
392                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
393            # missing users can be stored by either user ID or Username in Twitter API's error data; we check both
394            elif mention["username"] in missing_objects.get('user', {}):
395                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
396            elif mention["id"] in missing_objects.get('user', {}):
397                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}
398
399
400        # add poll metadata
401        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
402            if poll_id in polls_by_id:
403                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
404            elif poll_id in missing_objects.get('poll', {}):
405                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}
406
407        # add media metadata - seems to be just the media type, the media URL
408        # etc is stored in the 'entities' attribute instead
409        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
410            if media_key in media_by_key:
411                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
412            elif media_key in missing_objects.get('media', {}):
413                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}
414
415        # replied-to user metadata
416        if "in_reply_to_user_id" in tweet:
417            if tweet["in_reply_to_user_id"] in users_by_id:
418                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
419            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
420                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}
421
422        # enrich referenced tweets. Even though there should be no recursion -
423        # since tweets cannot be edited - we do not recursively enrich
424        # referenced tweets (should we?)
425        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
426            if reference["id"] in tweets_by_id:
427                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
428            elif reference["id"] in missing_objects.get('tweet', {}):
429                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}
430
431        return tweet
432
433    def fix_tweet_error(self, tweet_error):
434        """
435        Add fields as needed by map_tweet and other functions for errors as they
436        do not conform to normal tweet fields. Specifically for ID Lookup as
437        complete tweet could be missing.
438
439        :param dict tweet_error: Tweet error object from the Twitter API
440        :return dict:  A tweet object with the relevant fields sanitised
441        """
442        modified_tweet = tweet_error
443        modified_tweet['id'] = tweet_error.get('resource_id')
444        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
445        modified_tweet['text'] = ''
446        modified_tweet['author_user'] = {}
447        modified_tweet['author_user']['name'] = 'UNKNOWN'
448        modified_tweet['author_user']['username'] = 'UNKNOWN'
449        modified_tweet['author_id'] = 'UNKNOWN'
450        modified_tweet['public_metrics'] = {}
451
452        # putting detail info in 'subject' field which is normally blank for tweets
453        modified_tweet['subject'] = tweet_error.get('detail')
454
455        return modified_tweet
456
457    @classmethod
458    def get_options(cls, parent_dataset=None, user=None):
459        """
460        Get Twitter data source options
461
462        These are somewhat dynamic, because depending on settings users may or
463        may not need to provide their own API key, and may or may not be able
464        to enter a list of tweet IDs as their query. Hence the method.
465
466        :param parent_dataset:  Should always be None
467        :param user:  User to provide options for
468        :return dict:  Data source options
469        """
470        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
471        max_tweets = config.get("twitterv2-search.max_tweets", user=user)
472
473        if have_api_key:
474            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
475                          "historic tweets that match a given query.")
476
477        else:
478            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use the "
479                          "it, you must have access  to the Research track of the X API. You will need to provide a "
480                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
481                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
482                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
483                          "monthly post retrieval cap.")
484
485        intro_text += ("\n\nPlease refer to the [X API documentation]("
486                          "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
487                          "documentation for more information about this API endpoint and the syntax you can use in your "
488                          "search query. Retweets are included by default; add `-is:retweet` to exclude them.")
489
490        options = {
491            "intro-1": {
492                "type": UserInput.OPTION_INFO,
493                "help": intro_text
494            },
495        }
496
497        if not have_api_key:
498            # options.update({
499            #     "api_type": {
500            #         "type": UserInput.OPTION_CHOICE,
501            #         "help": "API track",
502            #         "options": {
503            #             "all": "Research API: Full-archive search",
504            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
505            #         },
506            #         "default": "all"
507            #     }
508            # })
509            options.update({
510                "api_bearer_token": {
511                    "type": UserInput.OPTION_TEXT,
512                    "sensitive": True,
513                    "cache": True,
514                    "help": "API Bearer Token"
515                },
516            })
517
518        if config.get("twitterv2.id_lookup", user=user):
519            options.update({
520                "query_type": {
521                    "type": UserInput.OPTION_CHOICE,
522                    "help": "Query type",
523                    "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup",
524                    "options": {
525                        "query": "Search query",
526                        "id_lookup": "Posts by ID (list IDs seperated by commas or one per line)",
527                    },
528                    "default": "query"
529                }
530            })
531
532        options.update({
533            "query": {
534                "type": UserInput.OPTION_TEXT_LARGE,
535                "help": "Query"
536            },
537            "amount": {
538                "type": UserInput.OPTION_TEXT,
539                "help": "Posts to retrieve",
540                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
541                "min": 0,
542                "max": max_tweets if max_tweets else 10_000_000,
543                "default": 10
544            },
545            "divider-2": {
546                "type": UserInput.OPTION_DIVIDER
547            },
548            "daterange-info": {
549                "type": UserInput.OPTION_INFO,
550                "help": "By default, X returns posts up til 30 days ago. If you want to go back further, you "
551                        "need to explicitly set a date range."
552            },
553            "daterange": {
554                "type": UserInput.OPTION_DATERANGE,
555                "help": "Date range"
556            },
557        })
558
559        return options
560
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate input for a dataset query on the Twitter data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        Will also raise a QueryNeedsExplicitConfirmation if the 'counts'
        endpoint of the Twitter API indicates that it will take more than
        30 minutes to collect the dataset. In the front-end, this will
        trigger a warning and confirmation request.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param User user:  User object of user who has submitted the query
        :return dict:  Safe query parameters
        """
        # a site-wide research API key (if configured) takes precedence over
        # a user-supplied bearer token
        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000, user=user)

        # this is the bare minimum, else we can't narrow down the full data set
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        if not have_api_key:
            if not query.get("api_bearer_token", None):
                raise QueryParametersException("Please provide a valid bearer token.")

        # 1024 characters is an X API limit on search queries; ID lookups are
        # comma-separated ID lists and may legitimately be longer
        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")

        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup", user=user):
            # reformat queries to be a comma-separated list with no wrapping
            # whitespace
            whitespace = re.compile(r"\s+")
            items = whitespace.sub("", query.get("query").replace("\n", ","))
            # eliminate empty queries
            twitter_query = ','.join([item for item in items.split(",") if item])
        else:
            twitter_query = query.get("query")

        # the dates need to make sense as a range to search within
        # but, on Twitter, you can also specify before *or* after only
        after, before = query.get("daterange")
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # if we made it this far, the query can be executed
        params = {
            "query": twitter_query,
            "api_bearer_token": query.get("api_bearer_token"),
            "api_type": query.get("api_type", "all"),
            "query_type": query.get("query_type", "query"),
            "min_date": after,
            "max_date": before
        }

        # never query more tweets than allowed
        tweets_to_collect = convert_to_int(query.get("amount"), 10)

        # an amount of 0 means 'unlimited', which is capped at the configured
        # site-wide maximum (if any)
        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
            tweets_to_collect = max_tweets
        params["amount"] = tweets_to_collect

        # figure out how many tweets we expect to get back - we can use this
        # to dissuade users from running huge queries that will take forever
        # to process
        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
            count_url = "https://api.x.com/2/tweets/counts/all"
            count_params = {
                "granularity": "day",
                "query": params["query"],
            }

            # if we're doing a date range, pass this on to the counts endpoint in
            # the right format
            if after:
                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")

            if before:
                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")

            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key

            # the counts endpoint is paginated; loop until all pages have been
            # consumed or an error ends the tally early
            expected_tweets = 0
            while True:
                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
                                        timeout=15)
                if response.status_code == 200:
                    try:
                        # figure out how many tweets there are and estimate how much
                        # time it will take to process them. if it's going to take
                        # longer than half an hour, warn the user
                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
                    except KeyError:
                        # no harm done, we just don't know how many tweets will be
                        # returned (but they will still be returned)
                        break

                    if "next_token" not in response.json().get("meta", {}):
                        break
                    else:
                        count_params["next_token"] = response.json()["meta"]["next_token"]

                elif response.status_code == 401:
                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
                                                   "for the Research track of the X API.")

                elif response.status_code == 400:
                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
                                                   "extend into the future, or to before Twitter's founding, and that "
                                                   "your query is shorter than 1024 characters. Using AND in the query "
                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
                                                   "search for the literal word.")

                else:
                    # we can still continue without the expected tweets
                    break

            warning = ""
            if expected_tweets:
                collectible_tweets = min(max_tweets, params["amount"])
                if collectible_tweets == 0:
                    collectible_tweets = max_tweets

                if collectible_tweets > 0:
                    if collectible_tweets < expected_tweets:
                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
                    real_expected_tweets = min(expected_tweets, collectible_tweets)
                else:
                    real_expected_tweets = expected_tweets

                # 900s = 15 minutes; anything longer triggers a confirmation
                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
                expected_time = timify_long(expected_seconds)
                params["expected-tweets"] = expected_tweets

                if expected_seconds > 900:
                    warning += ". Collection will take approximately %s." % expected_time

            if warning and not query.get("frontend-confirm"):
                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
                warning += " Do you want to continue?"
                raise QueryNeedsExplicitConfirmationException(warning)

            # NOTE(review): if the counts endpoint failed silently above,
            # expected_tweets is 0 here, so 'amount' collapses to 0 — which
            # get_items() treats as 'unlimited'. Confirm this is intended.
            params["amount"] = min(params["amount"], expected_tweets)
            if max_tweets:
                params["amount"] = min(max_tweets, params["amount"])

        return params
711
712    @staticmethod
713    def map_item(item):
714        """
715        Map a nested Tweet object to a flat dictionary
716
717        Tweet objects are quite rich but 4CAT expects flat dictionaries per
718        item in many cases. Since it would be a shame to not store the data
719        retrieved from Twitter that cannot be stored in a flat file, we store
720        the full objects and only map them to a flat dictionary when needed.
721        This has a speed (and disk space) penalty, but makes sure we don't
722        throw away valuable data and allows for later changes that e.g. store
723        the tweets more efficiently as a MongoDB collection.
724
725        :param item:  Tweet object as originally returned by the Twitter API
726        :return dict:  Dictionary in the format expected by 4CAT
727        """
728        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")
729
730        # For backward compatibility
731        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
732        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
733        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""
734
735        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
736        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
737        urls = [tag.get("expanded_url", tag["display_url"]) for tag in item.get("entities", {}).get("urls", [])]
738        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
739        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
740
741        # by default, the text of retweets is returned as "RT [excerpt of
742        # retweeted tweet]". Since we have the full tweet text, we can complete
743        # the excerpt:
744        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
745        if is_retweet:
746            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
747            if retweeted_tweet.get("text", False):
748                retweeted_body = retweeted_tweet.get("text")
749                # Get user's username that was retweeted
750                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
751                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
752                elif item.get('entities', {}).get('mentions', []):
753                    # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted
754                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
755                    if retweeting_users:
756                        # should only ever be one, but this verifies that there IS one and not NONE
757                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body
758
759            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched
760
761            # Retweet entities are only included in the retweet if they occur in the first 140 characters
762            # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense
763            [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])]
764            [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])]
765            [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", [])]
766            # Images appear to be inheritted by retweets, but just in case
767            [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
768            [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
769
770        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
771        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])
772
773        videos = []
774        for video in video_items:
775            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
776            if variants:
777                videos.append(variants[0].get('url'))
778
779        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
780        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
781        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
782        warning = ""
783        if missing_metrics:
784            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."
785
786        return MappedItem({
787            "id": item["id"],
788            "thread_id": item.get("conversation_id", item["id"]),
789            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
790            "unix_timestamp": int(tweet_time.timestamp()),
791            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
792            "subject": "",
793            "body": item["text"],
794            "author": author_username,
795            "author_fullname": author_fullname,
796            "author_id": item["author_id"],
797            "author_followers": author_followers,
798            "source": item.get("source"),
799            "language_guess": item.get("lang"),
800            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
801            **public_metrics,
802            "is_retweet": "yes" if is_retweet else "no",
803            "retweeted_user": "" if not is_retweet else retweeted_user,
804            "is_quote_tweet": "yes" if is_quoted else "no",
805            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
806            "is_reply": "yes" if is_reply else "no",
807            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
808            "hashtags": ','.join(set(hashtags)),
809            "urls": ','.join(set(urls)),
810            "images": ','.join(set(images)),
811            "videos": ','.join(set(videos)),
812            "mentions": ','.join(set(mentions)),
813            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
814            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
815        }, message=warning)
class SearchWithTwitterAPIv2(Search):
 19class SearchWithTwitterAPIv2(Search):
 20    """
 21    Get Tweets via the X API
 22    """
 23    type = "twitterv2-search"  # job ID
 24    title = "X/Twitter API (v2)"
 25    extension = "ndjson"
 26    is_local = False    # Whether this datasource is locally scraped
 27    is_static = False   # Whether this datasource is still updated
 28
 29    previous_request = 0
 30    import_issues = True
 31
 32    references = [
 33        "[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)"
 34    ]
 35
 36    config = {
 37        "twitterv2-search.academic_api_key": {
 38            "type": UserInput.OPTION_TEXT,
 39            "default": "",
 40            "help": "Research API Key",
 41            "tooltip": "An API key for the X/Twitter v2 Research API. If "
 42                       "provided, the user will not need to enter their own "
 43                       "key to retrieve tweets. Note that this API key should "
 44                       "have access to the Full Archive Search endpoint."
 45        },
 46        "twitterv2-search.max_tweets": {
 47            "type": UserInput.OPTION_TEXT,
 48            "default": 0,
 49            "min": 0,
 50            "max": 10_000_000,
 51            "help": "Max posts per dataset",
 52            "tooltip": "4CAT will never retrieve more than this amount of "
 53                       "posts per dataset. Enter '0' for unlimited posts."
 54        },
 55        "twitterv2-search.id_lookup": {
 56            "type": UserInput.OPTION_TOGGLE,
 57            "default": False,
 58            "help": "Allow lookup by ID",
 59            "tooltip": "If enabled, allow users to enter a list of post IDs "
 60                       "to retrieve. This is disabled by default because it "
 61                       "can be confusing to novice users."
 62        }
 63    }
 64
    def get_items(self, query):
        """
        Use the Twitter v2 API historical search to get tweets

        Queries either the tweet lookup endpoint (for ID lookups) or the
        search endpoint, handling pagination, rate limits and transient
        errors, and yields tweets one by one with referenced objects
        ('includes') spliced back in via enrich_tweet().

        :param query:  Sanitised query parameters, as produced by
                validate_query()
        :return:  Generator yielding enriched tweet dictionaries
        """
        # Compile any errors to highlight at end of log
        error_report = []
        # this is pretty sensitive so delete it immediately after storing in
        # memory
        have_api_key = self.config.get("twitterv2-search.academic_api_key")
        bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key
        api_type = query.get("api_type", "all") if not have_api_key else "all"
        auth = {"Authorization": "Bearer %s" % bearer_token}
        expected_tweets = query.get("expected-tweets", "unknown")

        # these are all expansions and fields available at the time of writing
        # since it does not cost anything extra in terms of rate limiting, go
        # for as much data per tweet as possible...
        tweet_fields = (
        "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id",
        "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings",
        "source", "text", "withheld")
        user_fields = (
        "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url",
        "protected", "public_metrics", "url", "username", "verified", "withheld")
        place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type")
        poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status")
        expansions = (
        "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id",
        "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id")
        media_fields = (
        "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants",
        "alt_text")

        params = {
            "expansions": ",".join(expansions),
            "tweet.fields": ",".join(tweet_fields),
            "user.fields": ",".join(user_fields),
            "poll.fields": ",".join(poll_fields),
            "place.fields": ",".join(place_fields),
            "media.fields": ",".join(media_fields),
        }

        if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
            endpoint = "https://api.x.com/2/tweets"

            tweet_ids = self.parameters.get("query", []).split(',')

            # Only can lookup 100 tweets in each query per Twitter API
            chunk_size = 100
            queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)]
            expected_tweets = len(tweet_ids)

            amount = len(tweet_ids)

            # Initiate collection of any IDs that are unavailable
            collected_errors = []

        else:
            # Query to all or search
            endpoint = "https://api.x.com/2/tweets/search/" + api_type

            queries = [self.parameters.get("query", "")]

            # amount of 0 (or less) means 'unlimited' throughout this method
            amount = convert_to_int(self.parameters.get("amount"), 10)

            params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100  # 100 = upper limit, 10 = lower

            if self.parameters.get("min_date"):
                params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime(
                    "%Y-%m-%dT%H:%M:%SZ")

            if self.parameters.get("max_date"):
                params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime(
                    "%Y-%m-%dT%H:%M:%SZ")

        # keep an int copy for progress calculation, and a formatted string
        # copy for status messages
        if type(expected_tweets) is int:
            num_expected_tweets = expected_tweets
            expected_tweets = "{:,}".format(expected_tweets)
        else:
            num_expected_tweets = None

        tweets = 0
        for query in queries:
            # NOTE(review): this check uses the module-level `config` while
            # the equivalent checks above and below use `self.config` —
            # confirm both resolve to the same setting
            if self.parameters.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
                params['ids'] = query
            else:
                params['query'] = query
            self.dataset.log("Search parameters: %s" % repr(params))
            while True:

                if self.interrupted:
                    raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API")

                # there is a limit of one request per second, so stay on the safe side of this
                while self.previous_request == int(time.time()):
                    time.sleep(0.1)
                time.sleep(0.05)
                self.previous_request = int(time.time())

                # now send the request, allowing for at least 5 retries if the connection seems unstable
                retries = 5
                api_response = None
                while retries > 0:
                    try:
                        api_response = requests.get(endpoint, headers=auth, params=params, timeout=30)
                        break
                    except (ConnectionError, requests.exceptions.RequestException) as e:
                        retries -= 1
                        wait_time = (5 - retries) * 10
                        self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time))
                        time.sleep(wait_time)

                # NOTE(review): if all retries failed, api_response is still
                # None here and the status_code access below raises an
                # AttributeError before the `elif not api_response` guard
                # further down can be reached — confirm and consider moving
                # that guard up

                # rate limited - the limit at time of writing is 300 reqs per 15
                # minutes
                # usually you don't hit this when requesting batches of 500 at
                # 1/second, but this is also returned when the user reaches the
                # monthly tweet cap, albeit with different content in that case
                if api_response.status_code == 429:
                    try:
                        structured_response = api_response.json()
                        if structured_response.get("title") == "UsageCapExceeded":
                            self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts "
                                                       "until your API quota resets. Dataset completed with posts "
                                                       "collected so far.", is_final=True)
                            return
                    except (json.JSONDecodeError, ValueError):
                        self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting "
                                                   "post collection.", is_final=True)
                        return

                    # wait until the reset time the API reports, polling in
                    # small steps so an interrupt is handled promptly
                    resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1
                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
                    self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str)
                    while time.time() <= resume_at:
                        if self.interrupted:
                            raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset")
                        time.sleep(0.5)
                    continue

                # API keys that are valid but don't have access or haven't been
                # activated properly get a 403
                elif api_response.status_code == 403:
                    try:
                        structured_response = api_response.json()
                        self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API "
                                                   "with this API key. %s" % structured_response.get("detail", ""), is_final=True)
                    except (json.JSONDecodeError, ValueError):
                        self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to "
                                                   "the full-archive search endpoint.", is_final=True)
                    finally:
                        return

                # sometimes twitter says '503 service unavailable' for unclear
                # reasons - in that case just wait a while and try again
                elif api_response.status_code in (502, 503, 504):
                    resume_at = time.time() + 60
                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
                    self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % (
                    api_response.status_code, resume_at_str))
                    while time.time() <= resume_at:
                        time.sleep(0.5)
                    continue

                # this usually means the query is too long or otherwise contains
                # a syntax error
                elif api_response.status_code == 400:
                    msg = "Response %i from the X API; " % api_response.status_code
                    try:
                        api_response = api_response.json()
                        msg += api_response.get("title", "")
                        if "detail" in api_response:
                            msg += ": " + api_response.get("detail", "")
                    except (json.JSONDecodeError, TypeError):
                        msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long."

                    self.dataset.update_status(msg, is_final=True)
                    return

                # invalid API key
                elif api_response.status_code == 401:
                    self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True)
                    return

                # haven't seen one yet, but they probably exist
                elif api_response.status_code != 200:
                    self.dataset.update_status(
                        "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True)
                    self.log.warning("X API v2 responded with status code %i. Response body: %s" % (
                    api_response.status_code, api_response.text))
                    return

                # NOTE(review): a requests.Response with any error status is
                # also falsy, but those are all handled above, so this branch
                # effectively only covers api_response being None — which
                # would have crashed earlier (see note above)
                elif not api_response:
                    self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True)
                    return

                api_response = api_response.json()

                # The API response contains tweets (of course) and 'includes',
                # objects that can be referenced in tweets. Later we will splice
                # this data into the tweets themselves to make them easier to
                # process. So extract them first...
                included_users = api_response.get("includes", {}).get("users", {})
                included_media = api_response.get("includes", {}).get("media", {})
                included_polls = api_response.get("includes", {}).get("polls", {})
                included_tweets = api_response.get("includes", {}).get("tweets", {})
                included_places = api_response.get("includes", {}).get("places", {})

                # Collect missing objects from Twitter API response by type
                missing_objects = {}
                for missing_object in api_response.get("errors", {}):
                    parameter_type = missing_object.get('resource_type', 'unknown')
                    if parameter_type in missing_objects:
                        missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object
                    else:
                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
                num_missing_objects = sum([len(v) for v in missing_objects.values()])

                # Record any missing objects in log
                if num_missing_objects > 0:
                    # Log amount
                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
                if num_missing_objects > 50:
                    # Large amount of missing objects; possible error with Twitter API
                    self.import_issues = False
                    error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
                    error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))

                # Warn if new missing object is recorded (for developers to handle)
                expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
                if any(key not in expected_error_types for key in missing_objects.keys()):
                    self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))

                # Loop through and collect tweets
                for tweet in api_response.get("data", []):

                    # amount <= 0 means unlimited; otherwise stop at the cap
                    if 0 < amount <= tweets:
                        break

                    # splice referenced data back in
                    # we use copy.deepcopy here because else we run into a
                    # pass-by-reference quagmire
                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)

                    tweets += 1
                    if tweets % 500 == 0:
                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
                        if num_expected_tweets is not None:
                            self.dataset.update_progress(tweets / num_expected_tweets)

                    yield tweet

                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
                    # If id_lookup return errors in collecting tweets
                    for tweet_error in api_response.get("errors", []):
                        tweet_id = str(tweet_error.get('resource_id'))
                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
                            tweet_error = self.fix_tweet_error(tweet_error)
                            collected_errors.append(tweet_id)
                            yield tweet_error

                # paginate
                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
                    params["next_token"] = api_response["meta"]["next_token"]
                else:
                    break

        if not self.import_issues:
            self.dataset.log('Error Report:\n' + '\n'.join(error_report))
            self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)
338    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
339        """
340        Enrich tweet with user and attachment metadata
341
342        Twitter API returns some of the tweet's metadata separately, as
343        'includes' that can be cross-referenced with a user ID or media key.
344        This makes sense to conserve bandwidth, but also means tweets are not
345        'standalone' objects as originally returned.
346
347        However, for processing, making them standalone greatly reduces
348        complexity, as we can simply read one single tweet object and process
349        that data without worrying about having to get other data from
350        elsewhere. So this method takes the metadata and the original tweet,
351        splices the metadata into it where appropriate, and returns the
352        enriched object.
353
354        /!\ This is not an efficient way to store things /!\ but it is more
355        convenient.
356
357        :param dict tweet:  The tweet object
358        :param list users:  User metadata, as a list of user objects
359        :param list media:  Media metadata, as a list of media objects
360        :param list polls:  Poll metadata, as a list of poll objects
361        :param list places:  Place metadata, as a list of place objects
362        :param list referenced_tweets:  Tweets referenced in the tweet, as a
363        list of tweet objects. These will be enriched in turn.
364        :param dict missing_objects: Dictionary with data on missing objects
365                from the API by type.
366
367        :return dict:  Enriched tweet object
368        """
369        # Copy the tweet so that updating this tweet has no effect on others
370        tweet = copy.deepcopy(tweet)
371        # first create temporary mappings so we can easily find the relevant
372        # object later
373        users_by_id = {user["id"]: user for user in users}
374        users_by_name = {user["username"]: user for user in users}
375        media_by_key = {item["media_key"]: item for item in media}
376        polls_by_id = {poll["id"]: poll for poll in polls}
377        places_by_id = {place["id"]: place for place in places}
378        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}
379
380        # add tweet author metadata
381        tweet["author_user"] = users_by_id.get(tweet["author_id"])
382
383        # add place to geo metadata
384        # referenced_tweets also contain place_id, but these places may not included in the place objects
385        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
386            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
387        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
388            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})
389
390        # add user metadata for mentioned users
391        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
392            if mention["username"] in users_by_name:
393                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
394            # missing users can be stored by either user ID or Username in Twitter API's error data; we check both
395            elif mention["username"] in missing_objects.get('user', {}):
396                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
397            elif mention["id"] in missing_objects.get('user', {}):
398                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}
399
400
401        # add poll metadata
402        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
403            if poll_id in polls_by_id:
404                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
405            elif poll_id in missing_objects.get('poll', {}):
406                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}
407
408        # add media metadata - seems to be just the media type, the media URL
409        # etc is stored in the 'entities' attribute instead
410        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
411            if media_key in media_by_key:
412                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
413            elif media_key in missing_objects.get('media', {}):
414                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}
415
416        # replied-to user metadata
417        if "in_reply_to_user_id" in tweet:
418            if tweet["in_reply_to_user_id"] in users_by_id:
419                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
420            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
421                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}
422
423        # enrich referenced tweets. Even though there should be no recursion -
424        # since tweets cannot be edited - we do not recursively enrich
425        # referenced tweets (should we?)
426        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
427            if reference["id"] in tweets_by_id:
428                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
429            elif reference["id"] in missing_objects.get('tweet', {}):
430                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}
431
432        return tweet
433
434    def fix_tweet_error(self, tweet_error):
435        """
436        Add fields as needed by map_tweet and other functions for errors as they
437        do not conform to normal tweet fields. Specifically for ID Lookup as
438        complete tweet could be missing.
439
440        :param dict tweet_error: Tweet error object from the Twitter API
441        :return dict:  A tweet object with the relevant fields sanitised
442        """
443        modified_tweet = tweet_error
444        modified_tweet['id'] = tweet_error.get('resource_id')
445        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
446        modified_tweet['text'] = ''
447        modified_tweet['author_user'] = {}
448        modified_tweet['author_user']['name'] = 'UNKNOWN'
449        modified_tweet['author_user']['username'] = 'UNKNOWN'
450        modified_tweet['author_id'] = 'UNKNOWN'
451        modified_tweet['public_metrics'] = {}
452
453        # putting detail info in 'subject' field which is normally blank for tweets
454        modified_tweet['subject'] = tweet_error.get('detail')
455
456        return modified_tweet
457
458    @classmethod
459    def get_options(cls, parent_dataset=None, user=None):
460        """
461        Get Twitter data source options
462
463        These are somewhat dynamic, because depending on settings users may or
464        may not need to provide their own API key, and may or may not be able
465        to enter a list of tweet IDs as their query. Hence the method.
466
467        :param parent_dataset:  Should always be None
468        :param user:  User to provide options for
469        :return dict:  Data source options
470        """
471        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
472        max_tweets = config.get("twitterv2-search.max_tweets", user=user)
473
474        if have_api_key:
475            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
476                          "historic tweets that match a given query.")
477
478        else:
479            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use the "
480                          "it, you must have access  to the Research track of the X API. You will need to provide a "
481                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
482                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
483                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
484                          "monthly post retrieval cap.")
485
486        intro_text += ("\n\nPlease refer to the [X API documentation]("
487                          "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
488                          "documentation for more information about this API endpoint and the syntax you can use in your "
489                          "search query. Retweets are included by default; add `-is:retweet` to exclude them.")
490
491        options = {
492            "intro-1": {
493                "type": UserInput.OPTION_INFO,
494                "help": intro_text
495            },
496        }
497
498        if not have_api_key:
499            # options.update({
500            #     "api_type": {
501            #         "type": UserInput.OPTION_CHOICE,
502            #         "help": "API track",
503            #         "options": {
504            #             "all": "Research API: Full-archive search",
505            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
506            #         },
507            #         "default": "all"
508            #     }
509            # })
510            options.update({
511                "api_bearer_token": {
512                    "type": UserInput.OPTION_TEXT,
513                    "sensitive": True,
514                    "cache": True,
515                    "help": "API Bearer Token"
516                },
517            })
518
519        if config.get("twitterv2.id_lookup", user=user):
520            options.update({
521                "query_type": {
522                    "type": UserInput.OPTION_CHOICE,
523                    "help": "Query type",
524                    "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup",
525                    "options": {
526                        "query": "Search query",
527                        "id_lookup": "Posts by ID (list IDs seperated by commas or one per line)",
528                    },
529                    "default": "query"
530                }
531            })
532
533        options.update({
534            "query": {
535                "type": UserInput.OPTION_TEXT_LARGE,
536                "help": "Query"
537            },
538            "amount": {
539                "type": UserInput.OPTION_TEXT,
540                "help": "Posts to retrieve",
541                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
542                "min": 0,
543                "max": max_tweets if max_tweets else 10_000_000,
544                "default": 10
545            },
546            "divider-2": {
547                "type": UserInput.OPTION_DIVIDER
548            },
549            "daterange-info": {
550                "type": UserInput.OPTION_INFO,
551                "help": "By default, X returns posts up til 30 days ago. If you want to go back further, you "
552                        "need to explicitly set a date range."
553            },
554            "daterange": {
555                "type": UserInput.OPTION_DATERANGE,
556                "help": "Date range"
557            },
558        })
559
560        return options
561
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate input for a dataset query on the Twitter data source.

        Will raise a QueryParametersException if invalid parameters are
        encountered. Parameters are additionally sanitised.

        Will also raise a QueryNeedsExplicitConfirmation if the 'counts'
        endpoint of the Twitter API indicates that it will take more than
        30 minutes to collect the dataset. In the front-end, this will
        trigger a warning and confirmation request.

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param User user:  User object of user who has submitted the query
        :return dict:  Safe query parameters
        """
        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000, user=user)

        # this is the bare minimum, else we can't narrow down the full data set
        if not query.get("query", None):
            raise QueryParametersException("Please provide a query.")

        # without a 4CAT-wide API key, users must supply their own token
        if not have_api_key:
            if not query.get("api_bearer_token", None):
                raise QueryParametersException("Please provide a valid bearer token.")

        # the 1024-character limit only applies to search queries, not to
        # comma-separated ID lookup lists
        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")

        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup", user=user):
            # reformat queries to be a comma-separated list with no wrapping
            # whitespace
            whitespace = re.compile(r"\s+")
            items = whitespace.sub("", query.get("query").replace("\n", ","))
            # eliminate empty queries
            twitter_query = ','.join([item for item in items.split(",") if item])
        else:
            twitter_query = query.get("query")

        # the dates need to make sense as a range to search within
        # but, on Twitter, you can also specify before *or* after only
        after, before = query.get("daterange")
        if before and after and before < after:
            raise QueryParametersException("Date range must start before it ends")

        # if we made it this far, the query can be executed
        params = {
            "query": twitter_query,
            "api_bearer_token": query.get("api_bearer_token"),
            "api_type": query.get("api_type", "all"),
            "query_type": query.get("query_type", "query"),
            "min_date": after,
            "max_date": before
        }

        # never query more tweets than allowed
        tweets_to_collect = convert_to_int(query.get("amount"), 10)

        # an amount of 0 means "unlimited", which is capped at the configured
        # 4CAT-wide maximum when one is set
        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
            tweets_to_collect = max_tweets
        params["amount"] = tweets_to_collect

        # figure out how many tweets we expect to get back - we can use this
        # to dissuade users from running huge queries that will take forever
        # to process
        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
            count_url = "https://api.x.com/2/tweets/counts/all"
            count_params = {
                "granularity": "day",
                "query": params["query"],
            }

            # if we're doing a date range, pass this on to the counts endpoint in
            # the right format
            if after:
                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")

            if before:
                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")

            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key

            # the counts endpoint is paginated; follow next_token until the
            # whole range has been counted (or counting fails)
            expected_tweets = 0
            while True:
                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
                                        timeout=15)
                if response.status_code == 200:
                    try:
                        # figure out how many tweets there are and estimate how much
                        # time it will take to process them. if it's going to take
                        # longer than half an hour, warn the user
                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
                    except KeyError:
                        # no harm done, we just don't know how many tweets will be
                        # returned (but they will still be returned)
                        break

                    if "next_token" not in response.json().get("meta", {}):
                        break
                    else:
                        count_params["next_token"] = response.json()["meta"]["next_token"]

                elif response.status_code == 401:
                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
                                                   "for the Research track of the X API.")

                elif response.status_code == 400:
                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
                                                   "extend into the future, or to before Twitter's founding, and that "
                                                   "your query is shorter than 1024 characters. Using AND in the query "
                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
                                                   "search for the literal word.")

                else:
                    # we can still continue without the expected tweets
                    break

            # assemble the warning text shown to the user; it is appended to
            # the "This query matches approximately N tweets" prefix below
            warning = ""
            if expected_tweets:
                collectible_tweets = min(max_tweets, params["amount"])
                if collectible_tweets == 0:
                    collectible_tweets = max_tweets

                if collectible_tweets > 0:
                    if collectible_tweets < expected_tweets:
                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
                    real_expected_tweets = min(expected_tweets, collectible_tweets)
                else:
                    real_expected_tweets = expected_tweets

                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
                expected_time = timify_long(expected_seconds)
                params["expected-tweets"] = expected_tweets

                # over 15 minutes: add a duration estimate to the prompt
                if expected_seconds > 900:
                    warning += ". Collection will take approximately %s." % expected_time

            # only ask for confirmation once; "frontend-confirm" is set when
            # the user has already accepted the warning
            if warning and not query.get("frontend-confirm"):
                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
                warning += " Do you want to continue?"
                raise QueryNeedsExplicitConfirmationException(warning)

            # never promise more tweets than the counts endpoint expects, nor
            # more than the 4CAT-wide maximum
            params["amount"] = min(params["amount"], expected_tweets)
            if max_tweets:
                params["amount"] = min(max_tweets, params["amount"])

        return params
712
713    @staticmethod
714    def map_item(item):
715        """
716        Map a nested Tweet object to a flat dictionary
717
718        Tweet objects are quite rich but 4CAT expects flat dictionaries per
719        item in many cases. Since it would be a shame to not store the data
720        retrieved from Twitter that cannot be stored in a flat file, we store
721        the full objects and only map them to a flat dictionary when needed.
722        This has a speed (and disk space) penalty, but makes sure we don't
723        throw away valuable data and allows for later changes that e.g. store
724        the tweets more efficiently as a MongoDB collection.
725
726        :param item:  Tweet object as originally returned by the Twitter API
727        :return dict:  Dictionary in the format expected by 4CAT
728        """
729        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")
730
731        # For backward compatibility
732        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
733        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
734        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""
735
736        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
737        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
738        urls = [tag.get("expanded_url", tag["display_url"]) for tag in item.get("entities", {}).get("urls", [])]
739        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
740        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
741
742        # by default, the text of retweets is returned as "RT [excerpt of
743        # retweeted tweet]". Since we have the full tweet text, we can complete
744        # the excerpt:
745        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
746        if is_retweet:
747            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
748            if retweeted_tweet.get("text", False):
749                retweeted_body = retweeted_tweet.get("text")
750                # Get user's username that was retweeted
751                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
752                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
753                elif item.get('entities', {}).get('mentions', []):
754                    # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted
755                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
756                    if retweeting_users:
757                        # should only ever be one, but this verifies that there IS one and not NONE
758                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body
759
760            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched
761
762            # Retweet entities are only included in the retweet if they occur in the first 140 characters
763            # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense
764            [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])]
765            [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])]
766            [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", [])]
767            # Images appear to be inheritted by retweets, but just in case
768            [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
769            [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
770
771        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
772        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])
773
774        videos = []
775        for video in video_items:
776            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
777            if variants:
778                videos.append(variants[0].get('url'))
779
780        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
781        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
782        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
783        warning = ""
784        if missing_metrics:
785            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."
786
787        return MappedItem({
788            "id": item["id"],
789            "thread_id": item.get("conversation_id", item["id"]),
790            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
791            "unix_timestamp": int(tweet_time.timestamp()),
792            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
793            "subject": "",
794            "body": item["text"],
795            "author": author_username,
796            "author_fullname": author_fullname,
797            "author_id": item["author_id"],
798            "author_followers": author_followers,
799            "source": item.get("source"),
800            "language_guess": item.get("lang"),
801            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
802            **public_metrics,
803            "is_retweet": "yes" if is_retweet else "no",
804            "retweeted_user": "" if not is_retweet else retweeted_user,
805            "is_quote_tweet": "yes" if is_quoted else "no",
806            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
807            "is_reply": "yes" if is_reply else "no",
808            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
809            "hashtags": ','.join(set(hashtags)),
810            "urls": ','.join(set(urls)),
811            "images": ','.join(set(images)),
812            "videos": ','.join(set(videos)),
813            "mentions": ','.join(set(mentions)),
814            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
815            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
816        }, message=warning)

Get Tweets via the X API

type = 'twitterv2-search'
title = 'X/Twitter API (v2)'
extension = 'ndjson'
is_local = False
is_static = False
previous_request = 0
import_issues = True
references = ['[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)']
config = {'twitterv2-search.academic_api_key': {'type': 'string', 'default': '', 'help': 'Research API Key', 'tooltip': 'An API key for the X/Twitter v2 Research API. If provided, the user will not need to enter their own key to retrieve tweets. Note that this API key should have access to the Full Archive Search endpoint.'}, 'twitterv2-search.max_tweets': {'type': 'string', 'default': 0, 'min': 0, 'max': 10000000, 'help': 'Max posts per dataset', 'tooltip': "4CAT will never retrieve more than this amount of posts per dataset. Enter '0' for unlimited posts."}, 'twitterv2-search.id_lookup': {'type': 'toggle', 'default': False, 'help': 'Allow lookup by ID', 'tooltip': 'If enabled, allow users to enter a list of post IDs to retrieve. This is disabled by default because it can be confusing to novice users.'}}
def get_items(self, query):
 65    def get_items(self, query):
 66        """
 67        Use the Twitter v2 API historical search to get tweets
 68
 69        :param query:
 70        :return:
 71        """
 72        # Compile any errors to highlight at end of log
 73        error_report = []
 74        # this is pretty sensitive so delete it immediately after storing in
 75        # memory
 76        have_api_key = self.config.get("twitterv2-search.academic_api_key")
 77        bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key
 78        api_type = query.get("api_type", "all") if not have_api_key else "all"
 79        auth = {"Authorization": "Bearer %s" % bearer_token}
 80        expected_tweets = query.get("expected-tweets", "unknown")
 81
 82        # these are all expansions and fields available at the time of writing
 83        # since it does not cost anything extra in terms of rate limiting, go
 84        # for as much data per tweet as possible...
 85        tweet_fields = (
 86        "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id",
 87        "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings",
 88        "source", "text", "withheld")
 89        user_fields = (
 90        "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url",
 91        "protected", "public_metrics", "url", "username", "verified", "withheld")
 92        place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type")
 93        poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status")
 94        expansions = (
 95        "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id",
 96        "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id")
 97        media_fields = (
 98        "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants",
 99        "alt_text")
100
101        params = {
102            "expansions": ",".join(expansions),
103            "tweet.fields": ",".join(tweet_fields),
104            "user.fields": ",".join(user_fields),
105            "poll.fields": ",".join(poll_fields),
106            "place.fields": ",".join(place_fields),
107            "media.fields": ",".join(media_fields),
108        }
109
110        if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
111            endpoint = "https://api.x.com/2/tweets"
112
113            tweet_ids = self.parameters.get("query", []).split(',')
114
115            # Only can lookup 100 tweets in each query per Twitter API
116            chunk_size = 100
117            queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)]
118            expected_tweets = len(tweet_ids)
119
120            amount = len(tweet_ids)
121
122            # Initiate collection of any IDs that are unavailable
123            collected_errors = []
124
125        else:
126            # Query to all or search
127            endpoint = "https://api.x.com/2/tweets/search/" + api_type
128
129            queries = [self.parameters.get("query", "")]
130
131            amount = convert_to_int(self.parameters.get("amount"), 10)
132
133            params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100  # 100 = upper limit, 10 = lower
134
135            if self.parameters.get("min_date"):
136                params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime(
137                    "%Y-%m-%dT%H:%M:%SZ")
138
139            if self.parameters.get("max_date"):
140                params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime(
141                    "%Y-%m-%dT%H:%M:%SZ")
142
143        if type(expected_tweets) is int:
144            num_expected_tweets = expected_tweets
145            expected_tweets = "{:,}".format(expected_tweets)
146        else:
147            num_expected_tweets = None
148
149        tweets = 0
150        for query in queries:
151            if self.parameters.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
152                params['ids'] = query
153            else:
154                params['query'] = query
155            self.dataset.log("Search parameters: %s" % repr(params))
156            while True:
157
158                if self.interrupted:
159                    raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API")
160
161                # there is a limit of one request per second, so stay on the safe side of this
162                while self.previous_request == int(time.time()):
163                    time.sleep(0.1)
164                time.sleep(0.05)
165                self.previous_request = int(time.time())
166
167                # now send the request, allowing for at least 5 retries if the connection seems unstable
168                retries = 5
169                api_response = None
170                while retries > 0:
171                    try:
172                        api_response = requests.get(endpoint, headers=auth, params=params, timeout=30)
173                        break
174                    except (ConnectionError, requests.exceptions.RequestException) as e:
175                        retries -= 1
176                        wait_time = (5 - retries) * 10
177                        self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time))
178                        time.sleep(wait_time)
179
180                # rate limited - the limit at time of writing is 300 reqs per 15
181                # minutes
182                # usually you don't hit this when requesting batches of 500 at
183                # 1/second, but this is also returned when the user reaches the
184                # monthly tweet cap, albeit with different content in that case
185                if api_response.status_code == 429:
186                    try:
187                        structured_response = api_response.json()
188                        if structured_response.get("title") == "UsageCapExceeded":
189                            self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts "
190                                                       "until your API quota resets. Dataset completed with posts "
191                                                       "collected so far.", is_final=True)
192                            return
193                    except (json.JSONDecodeError, ValueError):
194                        self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting "
195                                                   "post collection.", is_final=True)
196                        return
197
198                    resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1
199                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
200                    self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str)
201                    while time.time() <= resume_at:
202                        if self.interrupted:
203                            raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset")
204                        time.sleep(0.5)
205                    continue
206
207                # API keys that are valid but don't have access or haven't been
208                # activated properly get a 403
209                elif api_response.status_code == 403:
210                    try:
211                        structured_response = api_response.json()
212                        self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API "
213                                                   "with this API key. %s" % structured_response.get("detail", ""), is_final=True)
214                    except (json.JSONDecodeError, ValueError):
215                        self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to "
216                                                   "the full-archive search endpoint.", is_final=True)
217                    finally:
218                        return
219
220                # sometimes twitter says '503 service unavailable' for unclear
221                # reasons - in that case just wait a while and try again
222                elif api_response.status_code in (502, 503, 504):
223                    resume_at = time.time() + 60
224                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
225                    self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % (
226                    api_response.status_code, resume_at_str))
227                    while time.time() <= resume_at:
228                        time.sleep(0.5)
229                    continue
230
231                # this usually means the query is too long or otherwise contains
232                # a syntax error
233                elif api_response.status_code == 400:
234                    msg = "Response %i from the X API; " % api_response.status_code
235                    try:
236                        api_response = api_response.json()
237                        msg += api_response.get("title", "")
238                        if "detail" in api_response:
239                            msg += ": " + api_response.get("detail", "")
240                    except (json.JSONDecodeError, TypeError):
241                        msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long."
242
243                    self.dataset.update_status(msg, is_final=True)
244                    return
245
246                # invalid API key
247                elif api_response.status_code == 401:
248                    self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True)
249                    return
250
251                # haven't seen one yet, but they probably exist
252                elif api_response.status_code != 200:
253                    self.dataset.update_status(
254                        "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True)
255                    self.log.warning("X API v2 responded with status code %i. Response body: %s" % (
256                    api_response.status_code, api_response.text))
257                    return
258
259                elif not api_response:
260                    self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True)
261                    return
262
263                api_response = api_response.json()
264
265                # The API response contains tweets (of course) and 'includes',
266                # objects that can be referenced in tweets. Later we will splice
267                # this data into the tweets themselves to make them easier to
268                # process. So extract them first...
269                included_users = api_response.get("includes", {}).get("users", {})
270                included_media = api_response.get("includes", {}).get("media", {})
271                included_polls = api_response.get("includes", {}).get("polls", {})
272                included_tweets = api_response.get("includes", {}).get("tweets", {})
273                included_places = api_response.get("includes", {}).get("places", {})
274
275                # Collect missing objects from Twitter API response by type
276                missing_objects = {}
277                for missing_object in api_response.get("errors", {}):
278                    parameter_type = missing_object.get('resource_type', 'unknown')
279                    if parameter_type in missing_objects:
280                        missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object
281                    else:
282                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
283                num_missing_objects = sum([len(v) for v in missing_objects.values()])
284
285                # Record any missing objects in log
286                if num_missing_objects > 0:
287                    # Log amount
288                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
289                if num_missing_objects > 50:
290                    # Large amount of missing objects; possible error with Twitter API
291                    self.import_issues = False
292                    error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
293                    error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
294
295                # Warn if new missing object is recorded (for developers to handle)
296                expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
297                if any(key not in expected_error_types for key in missing_objects.keys()):
298                    self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))
299
300                # Loop through and collect tweets
301                for tweet in api_response.get("data", []):
302
303                    if 0 < amount <= tweets:
304                        break
305
306                    # splice referenced data back in
307                    # we use copy.deepcopy here because else we run into a
308                    # pass-by-reference quagmire
309                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)
310
311                    tweets += 1
312                    if tweets % 500 == 0:
313                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
314                        if num_expected_tweets is not None:
315                            self.dataset.update_progress(tweets / num_expected_tweets)
316
317                    yield tweet
318
319                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
320                    # If id_lookup return errors in collecting tweets
321                    for tweet_error in api_response.get("errors", []):
322                        tweet_id = str(tweet_error.get('resource_id'))
323                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
324                            tweet_error = self.fix_tweet_error(tweet_error)
325                            collected_errors.append(tweet_id)
326                            yield tweet_error
327
328                # paginate
329                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
330                    params["next_token"] = api_response["meta"]["next_token"]
331                else:
332                    break
333
334        if not self.import_issues:
335            self.dataset.log('Error Report:\n' + '\n'.join(error_report))
336            self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)

Use the Twitter v2 API historical search to get tweets

Parameters
  • query: query parameters for this dataset (dict)
Returns
  Generator yielding enriched tweet objects
def enrich_tweet( self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
338    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
339        """
340        Enrich tweet with user and attachment metadata
341
342        Twitter API returns some of the tweet's metadata separately, as
343        'includes' that can be cross-referenced with a user ID or media key.
344        This makes sense to conserve bandwidth, but also means tweets are not
345        'standalone' objects as originally returned.
346
347        However, for processing, making them standalone greatly reduces
348        complexity, as we can simply read one single tweet object and process
349        that data without worrying about having to get other data from
350        elsewhere. So this method takes the metadata and the original tweet,
351        splices the metadata into it where appropriate, and returns the
352        enriched object.
353
354        /!\ This is not an efficient way to store things /!\ but it is more
355        convenient.
356
357        :param dict tweet:  The tweet object
358        :param list users:  User metadata, as a list of user objects
359        :param list media:  Media metadata, as a list of media objects
360        :param list polls:  Poll metadata, as a list of poll objects
361        :param list places:  Place metadata, as a list of place objects
362        :param list referenced_tweets:  Tweets referenced in the tweet, as a
363        list of tweet objects. These will be enriched in turn.
364        :param dict missing_objects: Dictionary with data on missing objects
365                from the API by type.
366
367        :return dict:  Enriched tweet object
368        """
369        # Copy the tweet so that updating this tweet has no effect on others
370        tweet = copy.deepcopy(tweet)
371        # first create temporary mappings so we can easily find the relevant
372        # object later
373        users_by_id = {user["id"]: user for user in users}
374        users_by_name = {user["username"]: user for user in users}
375        media_by_key = {item["media_key"]: item for item in media}
376        polls_by_id = {poll["id"]: poll for poll in polls}
377        places_by_id = {place["id"]: place for place in places}
378        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}
379
380        # add tweet author metadata
381        tweet["author_user"] = users_by_id.get(tweet["author_id"])
382
383        # add place to geo metadata
384        # referenced_tweets also contain place_id, but these places may not included in the place objects
385        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
386            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
387        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
388            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})
389
390        # add user metadata for mentioned users
391        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
392            if mention["username"] in users_by_name:
393                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
394            # missing users can be stored by either user ID or Username in Twitter API's error data; we check both
395            elif mention["username"] in missing_objects.get('user', {}):
396                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
397            elif mention["id"] in missing_objects.get('user', {}):
398                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}
399
400
401        # add poll metadata
402        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
403            if poll_id in polls_by_id:
404                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
405            elif poll_id in missing_objects.get('poll', {}):
406                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}
407
408        # add media metadata - seems to be just the media type, the media URL
409        # etc is stored in the 'entities' attribute instead
410        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
411            if media_key in media_by_key:
412                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
413            elif media_key in missing_objects.get('media', {}):
414                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}
415
416        # replied-to user metadata
417        if "in_reply_to_user_id" in tweet:
418            if tweet["in_reply_to_user_id"] in users_by_id:
419                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
420            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
421                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}
422
423        # enrich referenced tweets. Even though there should be no recursion -
424        # since tweets cannot be edited - we do not recursively enrich
425        # referenced tweets (should we?)
426        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
427            if reference["id"] in tweets_by_id:
428                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
429            elif reference["id"] in missing_objects.get('tweet', {}):
430                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}
431
432        return tweet

Enrich tweet with user and attachment metadata

Twitter API returns some of the tweet's metadata separately, as 'includes' that can be cross-referenced with a user ID or media key. This makes sense to conserve bandwidth, but also means tweets are not 'standalone' objects as originally returned.

However, for processing, making them standalone greatly reduces complexity, as we can simply read one single tweet object and process that data without worrying about having to get other data from elsewhere. So this method takes the metadata and the original tweet, splices the metadata into it where appropriate, and returns the enriched object.

/!\ This is not an efficient way to store things /!\ but it is more convenient.

Parameters
  • dict tweet: The tweet object
  • list users: User metadata, as a list of user objects
  • list media: Media metadata, as a list of media objects
  • list polls: Poll metadata, as a list of poll objects
  • list places: Place metadata, as a list of place objects
  • list referenced_tweets: Tweets referenced in the tweet, as a list of tweet objects. These will be enriched in turn.
  • dict missing_objects: Dictionary with data on missing objects from the API by type.
Returns

Enriched tweet object

def fix_tweet_error(self, tweet_error):
434    def fix_tweet_error(self, tweet_error):
435        """
436        Add fields as needed by map_tweet and other functions for errors as they
437        do not conform to normal tweet fields. Specifically for ID Lookup as
438        complete tweet could be missing.
439
440        :param dict tweet_error: Tweet error object from the Twitter API
441        :return dict:  A tweet object with the relevant fields sanitised
442        """
443        modified_tweet = tweet_error
444        modified_tweet['id'] = tweet_error.get('resource_id')
445        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
446        modified_tweet['text'] = ''
447        modified_tweet['author_user'] = {}
448        modified_tweet['author_user']['name'] = 'UNKNOWN'
449        modified_tweet['author_user']['username'] = 'UNKNOWN'
450        modified_tweet['author_id'] = 'UNKNOWN'
451        modified_tweet['public_metrics'] = {}
452
453        # putting detail info in 'subject' field which is normally blank for tweets
454        modified_tweet['subject'] = tweet_error.get('detail')
455
456        return modified_tweet

Add fields as needed by map_tweet and other functions for errors as they do not conform to normal tweet fields. Specifically for ID Lookup as complete tweet could be missing.

Parameters
  • dict tweet_error: Tweet error object from the Twitter API
Returns

A tweet object with the relevant fields sanitised

@classmethod
def get_options(cls, parent_dataset=None, user=None):
458    @classmethod
459    def get_options(cls, parent_dataset=None, user=None):
460        """
461        Get Twitter data source options
462
463        These are somewhat dynamic, because depending on settings users may or
464        may not need to provide their own API key, and may or may not be able
465        to enter a list of tweet IDs as their query. Hence the method.
466
467        :param parent_dataset:  Should always be None
468        :param user:  User to provide options for
469        :return dict:  Data source options
470        """
471        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
472        max_tweets = config.get("twitterv2-search.max_tweets", user=user)
473
474        if have_api_key:
475            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
476                          "historic tweets that match a given query.")
477
478        else:
479            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use the "
480                          "it, you must have access  to the Research track of the X API. You will need to provide a "
481                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
482                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
483                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
484                          "monthly post retrieval cap.")
485
486        intro_text += ("\n\nPlease refer to the [X API documentation]("
487                          "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
488                          "documentation for more information about this API endpoint and the syntax you can use in your "
489                          "search query. Retweets are included by default; add `-is:retweet` to exclude them.")
490
491        options = {
492            "intro-1": {
493                "type": UserInput.OPTION_INFO,
494                "help": intro_text
495            },
496        }
497
498        if not have_api_key:
499            # options.update({
500            #     "api_type": {
501            #         "type": UserInput.OPTION_CHOICE,
502            #         "help": "API track",
503            #         "options": {
504            #             "all": "Research API: Full-archive search",
505            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
506            #         },
507            #         "default": "all"
508            #     }
509            # })
510            options.update({
511                "api_bearer_token": {
512                    "type": UserInput.OPTION_TEXT,
513                    "sensitive": True,
514                    "cache": True,
515                    "help": "API Bearer Token"
516                },
517            })
518
519        if config.get("twitterv2.id_lookup", user=user):
520            options.update({
521                "query_type": {
522                    "type": UserInput.OPTION_CHOICE,
523                    "help": "Query type",
524                    "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup",
525                    "options": {
526                        "query": "Search query",
527                        "id_lookup": "Posts by ID (list IDs seperated by commas or one per line)",
528                    },
529                    "default": "query"
530                }
531            })
532
533        options.update({
534            "query": {
535                "type": UserInput.OPTION_TEXT_LARGE,
536                "help": "Query"
537            },
538            "amount": {
539                "type": UserInput.OPTION_TEXT,
540                "help": "Posts to retrieve",
541                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
542                "min": 0,
543                "max": max_tweets if max_tweets else 10_000_000,
544                "default": 10
545            },
546            "divider-2": {
547                "type": UserInput.OPTION_DIVIDER
548            },
549            "daterange-info": {
550                "type": UserInput.OPTION_INFO,
551                "help": "By default, X returns posts up til 30 days ago. If you want to go back further, you "
552                        "need to explicitly set a date range."
553            },
554            "daterange": {
555                "type": UserInput.OPTION_DATERANGE,
556                "help": "Date range"
557            },
558        })
559
560        return options

Get Twitter data source options

These are somewhat dynamic, because depending on settings users may or may not need to provide their own API key, and may or may not be able to enter a list of tweet IDs as their query. Hence the method.

Parameters
  • parent_dataset: Should always be None
  • user: User to provide options for
Returns

Data source options

@staticmethod
def validate_query(query, request, user):
562    @staticmethod
563    def validate_query(query, request, user):
564        """
565        Validate input for a dataset query on the Twitter data source.
566
567        Will raise a QueryParametersException if invalid parameters are
568        encountered. Parameters are additionally sanitised.
569
570        Will also raise a QueryNeedsExplicitConfirmation if the 'counts'
571        endpoint of the Twitter API indicates that it will take more than
572        30 minutes to collect the dataset. In the front-end, this will
573        trigger a warning and confirmation request.
574
575        :param dict query:  Query parameters, from client-side.
576        :param request:  Flask request
577        :param User user:  User object of user who has submitted the query
578        :return dict:  Safe query parameters
579        """
580        have_api_key = config.get("twitterv2-search.academic_api_key", user=user)
581        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000, user=user)
582
583        # this is the bare minimum, else we can't narrow down the full data set
584        if not query.get("query", None):
585            raise QueryParametersException("Please provide a query.")
586
587        if not have_api_key:
588            if not query.get("api_bearer_token", None):
589                raise QueryParametersException("Please provide a valid bearer token.")
590
591        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
592            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")
593
594        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup", user=user):
595            # reformat queries to be a comma-separated list with no wrapping
596            # whitespace
597            whitespace = re.compile(r"\s+")
598            items = whitespace.sub("", query.get("query").replace("\n", ","))
599            # eliminate empty queries
600            twitter_query = ','.join([item for item in items.split(",") if item])
601        else:
602            twitter_query = query.get("query")
603
604        # the dates need to make sense as a range to search within
605        # but, on Twitter, you can also specify before *or* after only
606        after, before = query.get("daterange")
607        if before and after and before < after:
608            raise QueryParametersException("Date range must start before it ends")
609
610        # if we made it this far, the query can be executed
611        params = {
612            "query": twitter_query,
613            "api_bearer_token": query.get("api_bearer_token"),
614            "api_type": query.get("api_type", "all"),
615            "query_type": query.get("query_type", "query"),
616            "min_date": after,
617            "max_date": before
618        }
619
620        # never query more tweets than allowed
621        tweets_to_collect = convert_to_int(query.get("amount"), 10)
622
623        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
624            tweets_to_collect = max_tweets
625        params["amount"] = tweets_to_collect
626
627        # figure out how many tweets we expect to get back - we can use this
628        # to dissuade users from running huge queries that will take forever
629        # to process
630        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
631            count_url = "https://api.x.com/2/tweets/counts/all"
632            count_params = {
633                "granularity": "day",
634                "query": params["query"],
635            }
636
637            # if we're doing a date range, pass this on to the counts endpoint in
638            # the right format
639            if after:
640                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")
641
642            if before:
643                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")
644
645            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key
646
647            expected_tweets = 0
648            while True:
649                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
650                                        timeout=15)
651                if response.status_code == 200:
652                    try:
653                        # figure out how many tweets there are and estimate how much
654                        # time it will take to process them. if it's going to take
655                        # longer than half an hour, warn the user
656                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
657                    except KeyError:
658                        # no harm done, we just don't know how many tweets will be
659                        # returned (but they will still be returned)
660                        break
661
662                    if "next_token" not in response.json().get("meta", {}):
663                        break
664                    else:
665                        count_params["next_token"] = response.json()["meta"]["next_token"]
666
667                elif response.status_code == 401:
668                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
669                                                   "for the Research track of the X API.")
670
671                elif response.status_code == 400:
672                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
673                                                   "extend into the future, or to before Twitter's founding, and that "
674                                                   "your query is shorter than 1024 characters. Using AND in the query "
675                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
676                                                   "search for the literal word.")
677
678                else:
679                    # we can still continue without the expected tweets
680                    break
681
682            warning = ""
683            if expected_tweets:
684                collectible_tweets = min(max_tweets, params["amount"])
685                if collectible_tweets == 0:
686                    collectible_tweets = max_tweets
687
688                if collectible_tweets > 0:
689                    if collectible_tweets < expected_tweets:
690                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
691                    real_expected_tweets = min(expected_tweets, collectible_tweets)
692                else:
693                    real_expected_tweets = expected_tweets
694
695                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
696                expected_time = timify_long(expected_seconds)
697                params["expected-tweets"] = expected_tweets
698
699                if expected_seconds > 900:
700                    warning += ". Collection will take approximately %s." % expected_time
701
702            if warning and not query.get("frontend-confirm"):
703                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
704                warning += " Do you want to continue?"
705                raise QueryNeedsExplicitConfirmationException(warning)
706
707            params["amount"] = min(params["amount"], expected_tweets)
708            if max_tweets:
709                params["amount"] = min(max_tweets, params["amount"])
710
711        return params

Validate input for a dataset query on the Twitter data source.

Will raise a QueryParametersException if invalid parameters are encountered. Parameters are additionally sanitised.

Will also raise a QueryNeedsExplicitConfirmationException if the 'counts' endpoint of the Twitter API indicates that it will take more than 30 minutes to collect the dataset. In the front-end, this will trigger a warning and confirmation request.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters

@staticmethod
def map_item(item):
713    @staticmethod
714    def map_item(item):
715        """
716        Map a nested Tweet object to a flat dictionary
717
718        Tweet objects are quite rich but 4CAT expects flat dictionaries per
719        item in many cases. Since it would be a shame to not store the data
720        retrieved from Twitter that cannot be stored in a flat file, we store
721        the full objects and only map them to a flat dictionary when needed.
722        This has a speed (and disk space) penalty, but makes sure we don't
723        throw away valuable data and allows for later changes that e.g. store
724        the tweets more efficiently as a MongoDB collection.
725
726        :param item:  Tweet object as originally returned by the Twitter API
727        :return dict:  Dictionary in the format expected by 4CAT
728        """
729        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")
730
731        # For backward compatibility
732        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
733        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
734        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""
735
736        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
737        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
738        urls = [tag.get("expanded_url", tag["display_url"]) for tag in item.get("entities", {}).get("urls", [])]
739        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
740        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
741
742        # by default, the text of retweets is returned as "RT [excerpt of
743        # retweeted tweet]". Since we have the full tweet text, we can complete
744        # the excerpt:
745        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
746        if is_retweet:
747            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
748            if retweeted_tweet.get("text", False):
749                retweeted_body = retweeted_tweet.get("text")
750                # Get user's username that was retweeted
751                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
752                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
753                elif item.get('entities', {}).get('mentions', []):
754                    # Username may not always be here retweeted_tweet["author_user"]["username"] when user was removed/deleted
755                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
756                    if retweeting_users:
757                        # should only ever be one, but this verifies that there IS one and not NONE
758                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body
759
760            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched
761
762            # Retweet entities are only included in the retweet if they occur in the first 140 characters
763            # Note: open question on quotes and replies as to whether containing hashtags or mentions of their referenced tweets makes sense
764            [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])]
765            [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])]
766            [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", [])]
767            # Images appear to be inheritted by retweets, but just in case
768            [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
769            [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
770
771        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
772        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])
773
774        videos = []
775        for video in video_items:
776            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
777            if variants:
778                videos.append(variants[0].get('url'))
779
780        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
781        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
782        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
783        warning = ""
784        if missing_metrics:
785            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."
786
787        return MappedItem({
788            "id": item["id"],
789            "thread_id": item.get("conversation_id", item["id"]),
790            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
791            "unix_timestamp": int(tweet_time.timestamp()),
792            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
793            "subject": "",
794            "body": item["text"],
795            "author": author_username,
796            "author_fullname": author_fullname,
797            "author_id": item["author_id"],
798            "author_followers": author_followers,
799            "source": item.get("source"),
800            "language_guess": item.get("lang"),
801            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
802            **public_metrics,
803            "is_retweet": "yes" if is_retweet else "no",
804            "retweeted_user": "" if not is_retweet else retweeted_user,
805            "is_quote_tweet": "yes" if is_quoted else "no",
806            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
807            "is_reply": "yes" if is_reply else "no",
808            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
809            "hashtags": ','.join(set(hashtags)),
810            "urls": ','.join(set(urls)),
811            "images": ','.join(set(images)),
812            "videos": ','.join(set(videos)),
813            "mentions": ','.join(set(mentions)),
814            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
815            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
816        }, message=warning)

Map a nested Tweet object to a flat dictionary

Tweet objects are quite rich but 4CAT expects flat dictionaries per item in many cases. Since it would be a shame to not store the data retrieved from Twitter that cannot be stored in a flat file, we store the full objects and only map them to a flat dictionary when needed. This has a speed (and disk space) penalty, but makes sure we don't throw away valuable data and allows for later changes that e.g. store the tweets more efficiently as a MongoDB collection.

Parameters
  • item: Tweet object as originally returned by the Twitter API
Returns

Dictionary in the format expected by 4CAT