
datasources.twitterv2.search_twitter

X/Twitter keyword search via the X API v2
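As context for the listing below, here is a minimal, self-contained sketch (not part of the module) of the kind of request that get_items() assembles: a GET against the full-archive search endpoint, authenticated with a bearer token, with comma-joined field and expansion lists, paginated via a next_token. The query string and token are placeholders.

import requests

bearer_token = "YOUR_BEARER_TOKEN"  # placeholder
params = {
    "query": "4cat -is:retweet",  # placeholder search query
    "max_results": 100,  # upper limit per request
    "expansions": "author_id,attachments.media_keys,referenced_tweets.id",
    "tweet.fields": "created_at,public_metrics,referenced_tweets",
}
response = requests.get(
    "https://api.x.com/2/tweets/search/all",
    headers={"Authorization": "Bearer %s" % bearer_token},
    params=params,
    timeout=30,
)
page = response.json()
# the response contains "data" (tweets), "includes" (referenced objects)
# and "meta"; when "meta" holds a "next_token", passing it back as a
# parameter retrieves the next page of results
next_token = page.get("meta", {}).get("next_token")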

  1"""
  2X/Twitter keyword search via the X API v2
  3"""
  4import requests
  5import datetime
  6import copy
  7import time
  8import json
  9import re
 10
 11from backend.lib.search import Search
 12from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, QueryNeedsExplicitConfirmationException
 13from common.lib.helpers import convert_to_int, UserInput, timify
 14from common.lib.item_mapping import MappedItem, MissingMappedField
 15
 16
 17class SearchWithTwitterAPIv2(Search):
 18    """
 19    Get Tweets via the X API
 20    """
 21    type = "twitterv2-search"  # job ID
 22    title = "X/Twitter API (v2)"
 23    extension = "ndjson"
 24    is_local = False    # Whether this datasource is locally scraped
 25    is_static = False   # Whether this datasource is still updated
 26
 27    previous_request = 0
 28    import_issues = True
 29
 30    references = [
 31        "[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)"
 32    ]
 33
 34    config = {
 35        "twitterv2-search.academic_api_key": {
 36            "type": UserInput.OPTION_TEXT,
 37            "default": "",
 38            "help": "Research API Key",
 39            "tooltip": "An API key for the X/Twitter v2 Research API. If "
 40                       "provided, the user will not need to enter their own "
 41                       "key to retrieve tweets. Note that this API key should "
 42                       "have access to the Full Archive Search endpoint."
 43        },
 44        "twitterv2-search.max_tweets": {
 45            "type": UserInput.OPTION_TEXT,
 46            "default": 0,
 47            "min": 0,
 48            "max": 10_000_000,
 49            "help": "Max posts per dataset",
 50            "tooltip": "4CAT will never retrieve more than this number of "
 51                       "posts per dataset. Enter '0' for unlimited posts."
 52        },
 53        "twitterv2-search.id_lookup": {
 54            "type": UserInput.OPTION_TOGGLE,
 55            "default": False,
 56            "help": "Allow lookup by ID",
 57            "tooltip": "If enabled, allow users to enter a list of post IDs "
 58                       "to retrieve. This is disabled by default because it "
 59                       "can be confusing to novice users."
 60        }
 61    }
 62
 63    def get_items(self, query):
 64        """
 65        Use the Twitter v2 API historical search to get tweets
 66
 67        :param query:
 68        :return:
 69        """
 70        # Compile any errors to highlight at end of log
 71        error_report = []
 72        # this is pretty sensitive so delete it immediately after storing in
 73        # memory
 74        have_api_key = self.config.get("twitterv2-search.academic_api_key")
 75        bearer_token = self.parameters.get("api_bearer_token") if not have_api_key else have_api_key
 76        api_type = query.get("api_type", "all") if not have_api_key else "all"
 77        auth = {"Authorization": "Bearer %s" % bearer_token}
 78        expected_tweets = query.get("expected-tweets", "unknown")
 79
 80        # these are all expansions and fields available at the time of writing
 81        # since it does not cost anything extra in terms of rate limiting, go
 82        # for as much data per tweet as possible...
 83        tweet_fields = (
 84        "attachments", "author_id", "context_annotations", "conversation_id", "created_at", "entities", "geo", "id",
 85        "in_reply_to_user_id", "lang", "public_metrics", "possibly_sensitive", "referenced_tweets", "reply_settings",
 86        "source", "text", "withheld")
 87        user_fields = (
 88        "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id", "profile_image_url",
 89        "protected", "public_metrics", "url", "username", "verified", "withheld")
 90        place_fields = ("contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type")
 91        poll_fields = ("duration_minutes", "end_datetime", "id", "options", "voting_status")
 92        expansions = (
 93        "attachments.poll_ids", "attachments.media_keys", "author_id", "entities.mentions.username", "geo.place_id",
 94        "in_reply_to_user_id", "referenced_tweets.id", "referenced_tweets.id.author_id")
 95        media_fields = (
 96        "duration_ms", "height", "media_key", "preview_image_url", "public_metrics", "type", "url", "width", "variants",
 97        "alt_text")
 98
 99        params = {
100            "expansions": ",".join(expansions),
101            "tweet.fields": ",".join(tweet_fields),
102            "user.fields": ",".join(user_fields),
103            "poll.fields": ",".join(poll_fields),
104            "place.fields": ",".join(place_fields),
105            "media.fields": ",".join(media_fields),
106        }
107
108        if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
109            endpoint = "https://api.x.com/2/tweets"
110
111            tweet_ids = self.parameters.get("query", []).split(',')
112
 113            # The Twitter API only allows looking up 100 tweets per request
114            chunk_size = 100
115            queries = [','.join(tweet_ids[i:i+chunk_size]) for i in range(0, len(tweet_ids), chunk_size)]
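            # e.g. 250 IDs yield three comma-joined queries of 100, 100 and 50
            # IDs, each sent as a separate request in the loop below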
116            expected_tweets = len(tweet_ids)
117
118            amount = len(tweet_ids)
119
120            # Initiate collection of any IDs that are unavailable
121            collected_errors = []
122
123        else:
 124            # Full-archive ("all") or recent search endpoint
125            endpoint = "https://api.x.com/2/tweets/search/" + api_type
126
127            queries = [self.parameters.get("query", "")]
128
129            amount = convert_to_int(self.parameters.get("amount"), 10)
130
131            params['max_results'] = max(10, min(amount, 100)) if amount > 0 else 100  # 100 = upper limit, 10 = lower
132
133            if self.parameters.get("min_date"):
134                params["start_time"] = datetime.datetime.fromtimestamp(self.parameters["min_date"]).strftime(
135                    "%Y-%m-%dT%H:%M:%SZ")
136
137            if self.parameters.get("max_date"):
138                params["end_time"] = datetime.datetime.fromtimestamp(self.parameters["max_date"]).strftime(
139                    "%Y-%m-%dT%H:%M:%SZ")
140
141        if type(expected_tweets) is int:
142            num_expected_tweets = expected_tweets
143            expected_tweets = "{:,}".format(expected_tweets)
144        else:
145            num_expected_tweets = None
146
147        tweets = 0
148        for query in queries:
149            if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
150                params['ids'] = query
151            else:
152                params['query'] = query
153            self.dataset.log("Search parameters: %s" % repr(params))
154            while True:
155
156                if self.interrupted:
157                    raise ProcessorInterruptedException("Interrupted while getting posts from the Twitter API")
158
159                # there is a limit of one request per second, so stay on the safe side of this
160                while self.previous_request == int(time.time()):
161                    time.sleep(0.1)
162                time.sleep(0.05)
163                self.previous_request = int(time.time())
164
165                # now send the request, allowing for at least 5 retries if the connection seems unstable
166                retries = 5
167                api_response = None
168                while retries > 0:
169                    try:
170                        api_response = requests.get(endpoint, headers=auth, params=params, timeout=30)
171                        break
172                    except (ConnectionError, requests.exceptions.RequestException) as e:
173                        retries -= 1
174                        wait_time = (5 - retries) * 10
175                        self.dataset.update_status("Got %s, waiting %i seconds before retrying" % (str(e), wait_time))
176                        time.sleep(wait_time)
177
178                # rate limited - the limit at time of writing is 300 reqs per 15
179                # minutes
180                # usually you don't hit this when requesting batches of 500 at
181                # 1/second, but this is also returned when the user reaches the
182                # monthly tweet cap, albeit with different content in that case
183                if api_response.status_code == 429:
184                    try:
185                        structured_response = api_response.json()
186                        if structured_response.get("title") == "UsageCapExceeded":
187                            self.dataset.update_status("Hit the monthly post cap. You cannot capture more posts "
188                                                       "until your API quota resets. Dataset completed with posts "
189                                                       "collected so far.", is_final=True)
190                            return
191                    except (json.JSONDecodeError, ValueError):
192                        self.dataset.update_status("Hit X's rate limit, but could not figure out why. Halting "
193                                                   "post collection.", is_final=True)
194                        return
195
196                    resume_at = convert_to_int(api_response.headers["x-rate-limit-reset"]) + 1
197                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
198                    self.dataset.update_status("Hit X's rate limit - waiting until %s to continue." % resume_at_str)
199                    while time.time() <= resume_at:
200                        if self.interrupted:
201                            raise ProcessorInterruptedException("Interrupted while waiting for rate limit to reset")
202                        time.sleep(0.5)
203                    continue
204
205                # API keys that are valid but don't have access or haven't been
206                # activated properly get a 403
207                elif api_response.status_code == 403:
208                    try:
209                        structured_response = api_response.json()
210                        self.dataset.update_status("'Forbidden' error from the X API. Could not connect to X API "
211                                                   "with this API key. %s" % structured_response.get("detail", ""), is_final=True)
212                    except (json.JSONDecodeError, ValueError):
213                        self.dataset.update_status("'Forbidden' error from the X API. Your key may not have access to "
214                                                   "the full-archive search endpoint.", is_final=True)
215                    finally:
216                        return
217
218                # sometimes twitter says '503 service unavailable' for unclear
219                # reasons - in that case just wait a while and try again
220                elif api_response.status_code in (502, 503, 504):
221                    resume_at = time.time() + 60
222                    resume_at_str = datetime.datetime.fromtimestamp(int(resume_at)).strftime("%c")
223                    self.dataset.update_status("X unavailable (status %i) - waiting until %s to continue." % (
224                    api_response.status_code, resume_at_str))
225                    while time.time() <= resume_at:
226                        time.sleep(0.5)
227                    continue
228
229                # this usually means the query is too long or otherwise contains
230                # a syntax error
231                elif api_response.status_code == 400:
232                    msg = "Response %i from the X API; " % api_response.status_code
233                    try:
234                        api_response = api_response.json()
235                        msg += api_response.get("title", "")
236                        if "detail" in api_response:
237                            msg += ": " + api_response.get("detail", "")
238                    except (json.JSONDecodeError, TypeError):
239                        msg += "Some of your parameters (e.g. date range) may be invalid, or the query may be too long."
240
241                    self.dataset.update_status(msg, is_final=True)
242                    return
243
244                # invalid API key
245                elif api_response.status_code == 401:
246                    self.dataset.update_status("Invalid API key - could not connect to X API", is_final=True)
247                    return
248
249                # haven't seen one yet, but they probably exist
250                elif api_response.status_code != 200:
251                    self.dataset.update_status(
252                        "Unexpected HTTP status %i. Halting tweet collection." % api_response.status_code, is_final=True)
253                    self.log.warning("X API v2 responded with status code %i. Response body: %s" % (
254                    api_response.status_code, api_response.text))
255                    return
256
257                elif not api_response:
258                    self.dataset.update_status("Could not connect to X. Cancelling.", is_final=True)
259                    return
260
261                api_response = api_response.json()
262
263                # The API response contains tweets (of course) and 'includes',
264                # objects that can be referenced in tweets. Later we will splice
265                # this data into the tweets themselves to make them easier to
266                # process. So extract them first...
267                included_users = api_response.get("includes", {}).get("users", {})
268                included_media = api_response.get("includes", {}).get("media", {})
269                included_polls = api_response.get("includes", {}).get("polls", {})
270                included_tweets = api_response.get("includes", {}).get("tweets", {})
271                included_places = api_response.get("includes", {}).get("places", {})
272
273                # Collect missing objects from Twitter API response by type
274                missing_objects = {}
275                for missing_object in api_response.get("errors", {}):
276                    parameter_type = missing_object.get('resource_type', 'unknown')
277                    if parameter_type in missing_objects:
278                        missing_objects[parameter_type][missing_object.get('resource_id')] = missing_object
279                    else:
280                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
281                num_missing_objects = sum([len(v) for v in missing_objects.values()])
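                # missing_objects now maps each resource type to the error
                # objects for that type, keyed by resource ID, e.g.
                # (hypothetical values)
                # {"tweet": {"123": {"resource_type": "tweet", "resource_id": "123", "detail": "..."}}}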
282
283                # Record any missing objects in log
284                if num_missing_objects > 0:
285                    # Log amount
286                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
287                if num_missing_objects > 50:
288                    # Large amount of missing objects; possible error with Twitter API
289                    self.import_issues = False
290                    error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
291                    error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
292
293                # Warn if new missing object is recorded (for developers to handle)
294                expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
295                if any(key not in expected_error_types for key in missing_objects.keys()):
296                    self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))
297
298                # Loop through and collect tweets
299                for tweet in api_response.get("data", []):
300
301                    if 0 < amount <= tweets:
302                        break
303
304                    # splice referenced data back in
305                    # we use copy.deepcopy here because else we run into a
306                    # pass-by-reference quagmire
307                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)
308
309                    tweets += 1
310                    if tweets % 500 == 0:
311                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
312                        if num_expected_tweets is not None:
313                            self.dataset.update_progress(tweets / num_expected_tweets)
314
315                    yield tweet
316
317                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
318                    # If id_lookup return errors in collecting tweets
319                    for tweet_error in api_response.get("errors", []):
320                        tweet_id = str(tweet_error.get('resource_id'))
321                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
322                            tweet_error = self.fix_tweet_error(tweet_error)
323                            collected_errors.append(tweet_id)
324                            yield tweet_error
325
326                # paginate
327                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
328                    params["next_token"] = api_response["meta"]["next_token"]
329                else:
330                    break
331
332        if not self.import_issues:
333            self.dataset.log('Error Report:\n' + '\n'.join(error_report))
334            self.dataset.update_status("Completed with errors; Check log for Error Report.", is_final=True)
335
336    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
337        """
338        Enrich tweet with user and attachment metadata
339
340        Twitter API returns some of the tweet's metadata separately, as
341        'includes' that can be cross-referenced with a user ID or media key.
342        This makes sense to conserve bandwidth, but also means tweets are not
343        'standalone' objects as originally returned.
344
345        However, for processing, making them standalone greatly reduces
346        complexity, as we can simply read one single tweet object and process
347        that data without worrying about having to get other data from
348        elsewhere. So this method takes the metadata and the original tweet,
349        splices the metadata into it where appropriate, and returns the
350        enriched object.
351
352        **This is not an efficient way to store things** but it is more
353        convenient.
354
355        :param dict tweet:  The tweet object
356        :param list users:  User metadata, as a list of user objects
357        :param list media:  Media metadata, as a list of media objects
358        :param list polls:  Poll metadata, as a list of poll objects
359        :param list places:  Place metadata, as a list of place objects
360        :param list referenced_tweets:  Tweets referenced in the tweet, as a
361        list of tweet objects. These will be enriched in turn.
362        :param dict missing_objects: Dictionary with data on missing objects
363                from the API by type.
364
365        :return dict:  Enriched tweet object
366        """
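        # Illustrative sketch with hypothetical values: given a tweet
        # {"id": "1", "author_id": "42", "text": "hi"} and an included user
        # {"id": "42", "username": "example"}, the enriched tweet gains
        # tweet["author_user"] = {"id": "42", "username": "example"}; the same
        # kind of splicing happens below for places, mentions, polls, media
        # and referenced tweets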
367        # Copy the tweet so that updating this tweet has no effect on others
368        tweet = copy.deepcopy(tweet)
369        # first create temporary mappings so we can easily find the relevant
370        # object later
371        users_by_id = {user["id"]: user for user in users}
372        users_by_name = {user["username"]: user for user in users}
373        media_by_key = {item["media_key"]: item for item in media}
374        polls_by_id = {poll["id"]: poll for poll in polls}
375        places_by_id = {place["id"]: place for place in places}
376        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}
377
378        # add tweet author metadata
379        tweet["author_user"] = users_by_id.get(tweet["author_id"])
380
381        # add place to geo metadata
 382        # referenced_tweets also contain place_id, but these places may not be included in the place objects
383        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
384            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
385        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
386            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})
387
388        # add user metadata for mentioned users
389        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
390            if mention["username"] in users_by_name:
391                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
392            # missing users can be stored by either user ID or Username in Twitter API's error data; we check both
393            elif mention["username"] in missing_objects.get('user', {}):
394                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
395            elif mention["id"] in missing_objects.get('user', {}):
396                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}
397
398
399        # add poll metadata
400        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
401            if poll_id in polls_by_id:
402                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
403            elif poll_id in missing_objects.get('poll', {}):
404                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}
405
 406        # add media metadata - this seems to be just the media type; the media URL
 407        # etc. is stored in the 'entities' attribute instead
408        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
409            if media_key in media_by_key:
410                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
411            elif media_key in missing_objects.get('media', {}):
412                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}
413
414        # replied-to user metadata
415        if "in_reply_to_user_id" in tweet:
416            if tweet["in_reply_to_user_id"] in users_by_id:
417                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
418            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
419                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}
420
421        # enrich referenced tweets. Even though there should be no recursion -
422        # since tweets cannot be edited - we do not recursively enrich
423        # referenced tweets (should we?)
424        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
425            if reference["id"] in tweets_by_id:
426                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
427            elif reference["id"] in missing_objects.get('tweet', {}):
428                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}
429
430        return tweet
431
432    def fix_tweet_error(self, tweet_error):
433        """
 434        Add fields needed by map_item and other functions, since error objects
 435        do not conform to normal tweet fields. Used specifically for ID lookup,
 436        where the complete tweet may be missing.
437
438        :param dict tweet_error: Tweet error object from the Twitter API
439        :return dict:  A tweet object with the relevant fields sanitised
440        """
441        modified_tweet = tweet_error
442        modified_tweet['id'] = tweet_error.get('resource_id')
443        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
444        modified_tweet['text'] = ''
445        modified_tweet['author_user'] = {}
446        modified_tweet['author_user']['name'] = 'UNKNOWN'
447        modified_tweet['author_user']['username'] = 'UNKNOWN'
448        modified_tweet['author_id'] = 'UNKNOWN'
449        modified_tweet['public_metrics'] = {}
450
451        # putting detail info in 'subject' field which is normally blank for tweets
452        modified_tweet['subject'] = tweet_error.get('detail')
453
454        return modified_tweet
455
456    @classmethod
457    def get_options(cls, parent_dataset=None, config=None):
458        """
459        Get Twitter data source options
460
461        These are somewhat dynamic, because depending on settings users may or
462        may not need to provide their own API key, and may or may not be able
463        to enter a list of tweet IDs as their query. Hence the method.
464
465        :param config:
466        :param parent_dataset:  Should always be None
467        :return dict:  Data source options
468        """
469        have_api_key = config.get("twitterv2-search.academic_api_key")
470        max_tweets = config.get("twitterv2-search.max_tweets")
471
472        if have_api_key:
473            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
474                          "historic tweets that match a given query.")
475
476        else:
 477            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use "
 478                          "it, you must have access to the Research track of the X API. You will need to provide a "
479                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
480                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
481                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
482                          "monthly post retrieval cap.")
483
484        intro_text += ("\n\nPlease refer to the [X API documentation]("
485                          "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
 486                          "for more information about this API endpoint and the syntax you can use in your "
487                          "search query. Retweets are included by default; add `-is:retweet` to exclude them.")
488
489        options = {
490            "intro-1": {
491                "type": UserInput.OPTION_INFO,
492                "help": intro_text
493            },
494        }
495
496        if not have_api_key:
497            # options.update({
498            #     "api_type": {
499            #         "type": UserInput.OPTION_CHOICE,
500            #         "help": "API track",
501            #         "options": {
502            #             "all": "Research API: Full-archive search",
503            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
504            #         },
505            #         "default": "all"
506            #     }
507            # })
508            options.update({
509                "api_bearer_token": {
510                    "type": UserInput.OPTION_TEXT,
511                    "sensitive": True,
512                    "cache": True,
513                    "help": "API Bearer Token"
514                },
515            })
516
 517        if config.get("twitterv2-search.id_lookup"):
518            options.update({
519                "query_type": {
520                    "type": UserInput.OPTION_CHOICE,
521                    "help": "Query type",
522                    "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup",
523                    "options": {
524                        "query": "Search query",
 525                        "id_lookup": "Posts by ID (list IDs separated by commas or one per line)",
526                    },
527                    "default": "query"
528                }
529            })
530
531        options.update({
532            "query": {
533                "type": UserInput.OPTION_TEXT_LARGE,
534                "help": "Query"
535            },
536            "amount": {
537                "type": UserInput.OPTION_TEXT,
538                "help": "Posts to retrieve",
539                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
540                "min": 0,
541                "max": max_tweets if max_tweets else 10_000_000,
542                "default": 10
543            },
544            "divider-2": {
545                "type": UserInput.OPTION_DIVIDER
546            },
547            "daterange-info": {
548                "type": UserInput.OPTION_INFO,
 549                "help": "By default, X only returns posts from the last 30 days. If you want to go back further, you "
550                        "need to explicitly set a date range."
551            },
552            "daterange": {
553                "type": UserInput.OPTION_DATERANGE,
554                "help": "Date range"
555            },
556        })
557
558        return options
559
560    @staticmethod
561    def validate_query(query, request, config):
562        """
563        Validate input for a dataset query on the Twitter data source.
564
565        Will raise a QueryParametersException if invalid parameters are
566        encountered. Parameters are additionally sanitised.
567
568        Will also raise a QueryNeedsExplicitConfirmation if the 'counts'
569        endpoint of the Twitter API indicates that it will take more than
570        30 minutes to collect the dataset. In the front-end, this will
571        trigger a warning and confirmation request.
572
573        :param dict query:  Query parameters, from client-side.
574        :param request:  Flask request
575        :param ConfigManager|None config:  Configuration reader (context-aware)
576        :return dict:  Safe query parameters
577        """
578        have_api_key = config.get("twitterv2-search.academic_api_key")
579        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000)
580
581        # this is the bare minimum, else we can't narrow down the full data set
582        if not query.get("query", None):
583            raise QueryParametersException("Please provide a query.")
584
585        if not have_api_key:
586            if not query.get("api_bearer_token", None):
587                raise QueryParametersException("Please provide a valid bearer token.")
588
589        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
590            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")
591
592        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
593            # reformat queries to be a comma-separated list with no wrapping
594            # whitespace
595            whitespace = re.compile(r"\s+")
596            items = whitespace.sub("", query.get("query").replace("\n", ","))
597            # eliminate empty queries
598            twitter_query = ','.join([item for item in items.split(",") if item])
599        else:
600            twitter_query = query.get("query")
601
602        # the dates need to make sense as a range to search within
603        # but, on Twitter, you can also specify before *or* after only
604        after, before = query.get("daterange")
605        if before and after and before < after:
606            raise QueryParametersException("Date range must start before it ends")
607
608        # if we made it this far, the query can be executed
609        params = {
610            "query": twitter_query,
611            "api_bearer_token": query.get("api_bearer_token"),
612            "api_type": query.get("api_type", "all"),
613            "query_type": query.get("query_type", "query"),
614            "min_date": after,
615            "max_date": before
616        }
617
618        # never query more tweets than allowed
619        tweets_to_collect = convert_to_int(query.get("amount"), 10)
620
621        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
622            tweets_to_collect = max_tweets
623        params["amount"] = tweets_to_collect
624
625        # figure out how many tweets we expect to get back - we can use this
626        # to dissuade users from running huge queries that will take forever
627        # to process
628        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
629            count_url = "https://api.x.com/2/tweets/counts/all"
630            count_params = {
631                "granularity": "day",
632                "query": params["query"],
633            }
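            # the counts endpoint returns per-day counts plus a "meta" object,
            # e.g. (hypothetical) {"meta": {"total_tweet_count": 1234, "next_token": "..."}};
            # the totals are summed across pages below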
634
635            # if we're doing a date range, pass this on to the counts endpoint in
636            # the right format
637            if after:
638                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")
639
640            if before:
641                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")
642
643            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key
644
645            expected_tweets = 0
646            while True:
647                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
648                                        timeout=15)
649                if response.status_code == 200:
650                    try:
651                        # figure out how many tweets there are and estimate how much
652                        # time it will take to process them. if it's going to take
653                        # longer than half an hour, warn the user
654                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
655                    except KeyError:
656                        # no harm done, we just don't know how many tweets will be
657                        # returned (but they will still be returned)
658                        break
659
660                    if "next_token" not in response.json().get("meta", {}):
661                        break
662                    else:
663                        count_params["next_token"] = response.json()["meta"]["next_token"]
664
665                elif response.status_code == 401:
666                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
667                                                   "for the Research track of the X API.")
668
669                elif response.status_code == 400:
670                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
671                                                   "extend into the future, or to before Twitter's founding, and that "
672                                                   "your query is shorter than 1024 characters. Using AND in the query "
673                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
674                                                   "search for the literal word.")
675
676                else:
677                    # we can still continue without the expected tweets
678                    break
679
680            warning = ""
681            if expected_tweets:
682                collectible_tweets = min(max_tweets, params["amount"])
683                if collectible_tweets == 0:
684                    collectible_tweets = max_tweets
685
686                if collectible_tweets > 0:
687                    if collectible_tweets < expected_tweets:
688                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
689                    real_expected_tweets = min(expected_tweets, collectible_tweets)
690                else:
691                    real_expected_tweets = expected_tweets
692
693                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
694                expected_time = timify(expected_seconds)
695                params["expected-tweets"] = expected_tweets
696
697                if expected_seconds > 900:
698                    warning += ". Collection will take approximately %s." % expected_time
699
700            if warning and not query.get("frontend-confirm"):
701                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
702                warning += " Do you want to continue?"
703                raise QueryNeedsExplicitConfirmationException(warning)
704
705            params["amount"] = min(params["amount"], expected_tweets)
706            if max_tweets:
707                params["amount"] = min(max_tweets, params["amount"])
708
709        return params
710
711    @staticmethod
712    def map_item(item):
713        """
714        Map a nested Tweet object to a flat dictionary
715
716        Tweet objects are quite rich but 4CAT expects flat dictionaries per
717        item in many cases. Since it would be a shame to not store the data
718        retrieved from Twitter that cannot be stored in a flat file, we store
719        the full objects and only map them to a flat dictionary when needed.
720        This has a speed (and disk space) penalty, but makes sure we don't
721        throw away valuable data and allows for later changes that e.g. store
722        the tweets more efficiently as a MongoDB collection.
723
724        :param item:  Tweet object as originally returned by the Twitter API
725        :return dict:  Dictionary in the format expected by 4CAT
726        """
727        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")
728
729        # For backward compatibility
730        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
731        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
732        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""
733
734        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
735        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
736        urls = [tag.get("expanded_url", tag.get("display_url")) for tag in item.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)]
737        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
738        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
739
740        # by default, the text of retweets is returned as "RT [excerpt of
741        # retweeted tweet]". Since we have the full tweet text, we can complete
742        # the excerpt:
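        # e.g. (hypothetical) 'RT @example: first 140 characters of the retw...'
        # becomes 'RT @example: <full text of the retweeted tweet>'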
743        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
744        if is_retweet:
745            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
746            if retweeted_tweet.get("text", False):
747                retweeted_body = retweeted_tweet.get("text")
748                # Get user's username that was retweeted
749                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
750                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
751                elif item.get('entities', {}).get('mentions', []):
 752                    # The username may be missing from retweeted_tweet["author_user"]["username"] if the user was removed/deleted
753                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
754                    if retweeting_users:
755                        # should only ever be one, but this verifies that there IS one and not NONE
756                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body
757
 758            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Referenced tweets were not always enriched
759
760            # Retweet entities are only included in the retweet if they occur in the first 140 characters
 761            # Note: it is an open question whether quotes and replies should likewise inherit hashtags or mentions from their referenced tweets
762            [hashtags.append(tag["tag"]) for tag in retweeted_tweet.get("entities", {}).get("hashtags", [])]
763            [mentions.append(tag["username"]) for tag in retweeted_tweet.get("entities", {}).get("mentions", [])]
764            [urls.append(tag.get("expanded_url", tag["display_url"])) for tag in retweeted_tweet.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)]
 765            # Images appear to be inherited by retweets, but just in case
766            [images.append(attachment["url"]) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
767            [video_items.append(attachment) for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
768
769        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
770        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])
771
772        videos = []
773        for video in video_items:
774            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
775            if variants:
776                videos.append(variants[0].get('url'))
777
778        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
779        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
780        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
781        warning = ""
782        if missing_metrics:
783            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."
784
785        return MappedItem({
786            "id": item["id"],
787            "thread_id": item.get("conversation_id", item["id"]),
788            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
789            "unix_timestamp": int(tweet_time.timestamp()),
790            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
791            "subject": "",
792            "body": item["text"],
793            "author": author_username,
794            "author_fullname": author_fullname,
795            "author_id": item["author_id"],
796            "author_followers": author_followers,
797            "source": item.get("source"),
798            "language_guess": item.get("lang"),
799            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
800            **public_metrics,
801            "is_retweet": "yes" if is_retweet else "no",
802            "retweeted_user": "" if not is_retweet else retweeted_user,
803            "is_quote_tweet": "yes" if is_quoted else "no",
804            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
805            "is_reply": "yes" if is_reply else "no",
806            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
807            "hashtags": ','.join(set(hashtags)),
808            "urls": ','.join(set(urls)),
809            "images": ','.join(set(images)),
810            "videos": ','.join(set(videos)),
811            "mentions": ','.join(set(mentions)),
812            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
813            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
814        }, message=warning)
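Since results are stored as ndjson (one full tweet object per line), map_item() is what turns those stored objects into the flat rows 4CAT exports. Below is a minimal sketch of calling it outside 4CAT's own pipeline; the file name is a placeholder and the MappedItem accessor used here is an assumption.

import json

with open("twitter-dataset.ndjson") as infile:  # placeholder path
    for line in infile:
        tweet = json.loads(line)
        # map_item returns a MappedItem wrapping the flat dictionary
        # (assumed here to expose it via get_item_data())
        flat = SearchWithTwitterAPIv2.map_item(tweet).get_item_data()
        print(flat["id"], flat["author"], flat["body"])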
280                    else:
281                        missing_objects[parameter_type] = {missing_object.get('resource_id'): missing_object}
282                num_missing_objects = sum([len(v) for v in missing_objects.values()])
283
284                # Record any missing objects in log
285                if num_missing_objects > 0:
286                    # Log amount
287                    self.dataset.log('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
288                if num_missing_objects > 50:
289                    # Large amount of missing objects; possible error with Twitter API
290                    self.import_issues = False
291                    error_report.append('%i missing objects received following post number %i. Possible issue with X API.' % (num_missing_objects, tweets))
292                    error_report.append('Missing objects collected: ' + ', '.join(['%s: %s' % (k, len(v)) for k, v in missing_objects.items()]))
293
294                # Warn if new missing object is recorded (for developers to handle)
295                expected_error_types = ['user', 'media', 'poll', 'tweet', 'place']
296                if any(key not in expected_error_types for key in missing_objects.keys()):
297                    self.log.warning("X API v2 returned unknown error types: %s" % str([key for key in missing_objects.keys() if key not in expected_error_types]))
298
299                # Loop through and collect tweets
300                for tweet in api_response.get("data", []):
301
302                    if 0 < amount <= tweets:
303                        break
304
305                    # splice referenced data back in
306                    # we use copy.deepcopy here because otherwise we run into a
307                    # pass-by-reference quagmire
308                    tweet = self.enrich_tweet(tweet, included_users, included_media, included_polls, included_places, copy.deepcopy(included_tweets), missing_objects)
309
310                    tweets += 1
311                    if tweets % 500 == 0:
312                        self.dataset.update_status("Received %s of ~%s tweets from the X API" % ("{:,}".format(tweets), expected_tweets))
313                        if num_expected_tweets is not None:
314                            self.dataset.update_progress(tweets / num_expected_tweets)
315
316                    yield tweet
317
318                if self.parameters.get("query_type", "query") == "id_lookup" and self.config.get("twitterv2-search.id_lookup"):
319                    # for ID lookups, also yield error objects for tweets that could not be collected
320                    for tweet_error in api_response.get("errors", []):
321                        tweet_id = str(tweet_error.get('resource_id'))
322                        if tweet_error.get('resource_type') == "tweet" and tweet_id in tweet_ids and tweet_id not in collected_errors:
323                            tweet_error = self.fix_tweet_error(tweet_error)
324                            collected_errors.append(tweet_id)
325                            yield tweet_error
326
327                # paginate
328                if (amount <= 0 or tweets < amount) and api_response.get("meta") and "next_token" in api_response["meta"]:
329                    params["next_token"] = api_response["meta"]["next_token"]
330                else:
331                    break
332
333        if not self.import_issues:
334            self.dataset.log('Error Report:\n' + '\n'.join(error_report))
335            self.dataset.update_status("Completed with errors; check the log for the error report.", is_final=True)
336
337    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
338        """
339        Enrich tweet with user and attachment metadata
340
341        Twitter API returns some of the tweet's metadata separately, as
342        'includes' that can be cross-referenced with a user ID or media key.
343        This makes sense to conserve bandwidth, but also means tweets are not
344        'standalone' objects as originally returned.
345
346        However, for processing, making them standalone greatly reduces
347        complexity, as we can simply read one single tweet object and process
348        that data without worrying about having to get other data from
349        elsewhere. So this method takes the metadata and the original tweet,
350        splices the metadata into it where appropriate, and returns the
351        enriched object.
352
353        **This is not an efficient way to store things** but it is more
354        convenient.
355
356        :param dict tweet:  The tweet object
357        :param list users:  User metadata, as a list of user objects
358        :param list media:  Media metadata, as a list of media objects
359        :param list polls:  Poll metadata, as a list of poll objects
360        :param list places:  Place metadata, as a list of place objects
361        :param list referenced_tweets:  Tweets referenced in the tweet, as a
362        list of tweet objects. These will be enriched in turn.
363        :param dict missing_objects: Dictionary with data on missing objects
364                from the API by type.
365
366        :return dict:  Enriched tweet object
367        """
368        # Copy the tweet so that updating this tweet has no effect on others
369        tweet = copy.deepcopy(tweet)
370        # first create temporary mappings so we can easily find the relevant
371        # object later
372        users_by_id = {user["id"]: user for user in users}
373        users_by_name = {user["username"]: user for user in users}
374        media_by_key = {item["media_key"]: item for item in media}
375        polls_by_id = {poll["id"]: poll for poll in polls}
376        places_by_id = {place["id"]: place for place in places}
377        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}
378
379        # add tweet author metadata
380        tweet["author_user"] = users_by_id.get(tweet["author_id"])
381
382        # add place to geo metadata
383        # referenced_tweets also contain a place_id, but these places may not be included in the place objects
384        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
385            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
386        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
387            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})
388
389        # add user metadata for mentioned users
390        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
391            if mention["username"] in users_by_name:
392                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
393            # missing users can be stored by either user ID or Username in Twitter API's error data; we check both
394            elif mention["username"] in missing_objects.get('user', {}):
395                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
396            elif mention["id"] in missing_objects.get('user', {}):
397                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}
398
399
400        # add poll metadata
401        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
402            if poll_id in polls_by_id:
403                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
404            elif poll_id in missing_objects.get('poll', {}):
405                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}
406
407        # add media metadata - seems to be just the media type, the media URL
408        # etc is stored in the 'entities' attribute instead
409        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
410            if media_key in media_by_key:
411                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
412            elif media_key in missing_objects.get('media', {}):
413                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}
414
415        # replied-to user metadata
416        if "in_reply_to_user_id" in tweet:
417            if tweet["in_reply_to_user_id"] in users_by_id:
418                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
419            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
420                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}
421
422        # enrich referenced tweets as well. We pass an empty list of references
423        # here, so this does not recurse further - a referenced tweet's own
424        # referenced tweets are not enriched (should they be?)
425        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
426            if reference["id"] in tweets_by_id:
427                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
428            elif reference["id"] in missing_objects.get('tweet', {}):
429                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}
430
431        return tweet
432
433    def fix_tweet_error(self, tweet_error):
434        """
435        Add the fields that map_item and other functions expect, since error
436        objects do not conform to normal tweet fields. Used for ID lookup,
437        where the complete tweet may be missing.
438
439        :param dict tweet_error: Tweet error object from the Twitter API
440        :return dict:  A tweet object with the relevant fields sanitised
441        """
442        modified_tweet = tweet_error
443        modified_tweet['id'] = tweet_error.get('resource_id')
444        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
445        modified_tweet['text'] = ''
446        modified_tweet['author_user'] = {}
447        modified_tweet['author_user']['name'] = 'UNKNOWN'
448        modified_tweet['author_user']['username'] = 'UNKNOWN'
449        modified_tweet['author_id'] = 'UNKNOWN'
450        modified_tweet['public_metrics'] = {}
451
452        # putting detail info in 'subject' field which is normally blank for tweets
453        modified_tweet['subject'] = tweet_error.get('detail')
454
455        return modified_tweet
456
457    @classmethod
458    def get_options(cls, parent_dataset=None, config=None):
459        """
460        Get Twitter data source options
461
462        These are somewhat dynamic, because depending on settings users may or
463        may not need to provide their own API key, and may or may not be able
464        to enter a list of tweet IDs as their query. Hence the method.
465
466        :param config:
467        :param parent_dataset:  Should always be None
468        :return dict:  Data source options
469        """
470        have_api_key = config.get("twitterv2-search.academic_api_key")
471        max_tweets = config.get("twitterv2-search.max_tweets")
472
473        if have_api_key:
474            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
475                          "historic tweets that match a given query.")
476
477        else:
478            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API, v2. To use "
479                          "it, you must have access to the Research track of the X API. You will need to provide a "
480                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
481                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
482                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
483                          "monthly post retrieval cap.")
484
485        intro_text += ("\n\nPlease refer to the [X API documentation]("
486                          "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
487                          "for more information about this API endpoint and the syntax you can use in your "
488                          "search query. Retweets are included by default; add `-is:retweet` to exclude them.")
489
490        options = {
491            "intro-1": {
492                "type": UserInput.OPTION_INFO,
493                "help": intro_text
494            },
495        }
496
497        if not have_api_key:
498            # options.update({
499            #     "api_type": {
500            #         "type": UserInput.OPTION_CHOICE,
501            #         "help": "API track",
502            #         "options": {
503            #             "all": "Research API: Full-archive search",
504            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
505            #         },
506            #         "default": "all"
507            #     }
508            # })
509            options.update({
510                "api_bearer_token": {
511                    "type": UserInput.OPTION_TEXT,
512                    "sensitive": True,
513                    "cache": True,
514                    "help": "API Bearer Token"
515                },
516            })
517
518        if config.get("twitterv2-search.id_lookup"):
519            options.update({
520                "query_type": {
521                    "type": UserInput.OPTION_CHOICE,
522                    "help": "Query type",
523                    "tooltip": "Note: Num of posts and date fields are ignored with 'Posts by ID' lookup",
524                    "options": {
525                        "query": "Search query",
526                        "id_lookup": "Posts by ID (list IDs separated by commas or one per line)",
527                    },
528                    "default": "query"
529                }
530            })
531
532        options.update({
533            "query": {
534                "type": UserInput.OPTION_TEXT_LARGE,
535                "help": "Query"
536            },
537            "amount": {
538                "type": UserInput.OPTION_TEXT,
539                "help": "Posts to retrieve",
540                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
541                "min": 0,
542                "max": max_tweets if max_tweets else 10_000_000,
543                "default": 10
544            },
545            "divider-2": {
546                "type": UserInput.OPTION_DIVIDER
547            },
548            "daterange-info": {
549                "type": UserInput.OPTION_INFO,
550                "help": "By default, X returns posts from up to 30 days ago. If you want to go back further, you "
551                        "need to explicitly set a date range."
552            },
553            "daterange": {
554                "type": UserInput.OPTION_DATERANGE,
555                "help": "Date range"
556            },
557        })
558
559        return options
560
561    @staticmethod
562    def validate_query(query, request, config):
563        """
564        Validate input for a dataset query on the Twitter data source.
565
566        Will raise a QueryParametersException if invalid parameters are
567        encountered. Parameters are additionally sanitised.
568
569        Will also raise a QueryNeedsExplicitConfirmationException if the 'counts'
570        endpoint of the Twitter API indicates that it will take more than
571        30 minutes to collect the dataset. In the front-end, this will
572        trigger a warning and confirmation request.
573
574        :param dict query:  Query parameters, from client-side.
575        :param request:  Flask request
576        :param ConfigManager|None config:  Configuration reader (context-aware)
577        :return dict:  Safe query parameters
578        """
579        have_api_key = config.get("twitterv2-search.academic_api_key")
580        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000)
581
582        # this is the bare minimum, else we can't narrow down the full data set
583        if not query.get("query", None):
584            raise QueryParametersException("Please provide a query.")
585
586        if not have_api_key:
587            if not query.get("api_bearer_token", None):
588                raise QueryParametersException("Please provide a valid bearer token.")
589
590        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
591            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")
592
593        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
594            # reformat queries to be a comma-separated list with no wrapping
595            # whitespace
596            whitespace = re.compile(r"\s+")
597            items = whitespace.sub("", query.get("query").replace("\n", ","))
598            # eliminate empty queries
599            twitter_query = ','.join([item for item in items.split(",") if item])
600        else:
601            twitter_query = query.get("query")
602
603        # the dates need to make sense as a range to search within
604        # but, on Twitter, you can also specify before *or* after only
605        after, before = query.get("daterange")
606        if before and after and before < after:
607            raise QueryParametersException("Date range must start before it ends")
608
609        # if we made it this far, the query can be executed
610        params = {
611            "query": twitter_query,
612            "api_bearer_token": query.get("api_bearer_token"),
613            "api_type": query.get("api_type", "all"),
614            "query_type": query.get("query_type", "query"),
615            "min_date": after,
616            "max_date": before
617        }
618
619        # never query more tweets than allowed
620        tweets_to_collect = convert_to_int(query.get("amount"), 10)
621
622        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
623            tweets_to_collect = max_tweets
624        params["amount"] = tweets_to_collect
625
626        # figure out how many tweets we expect to get back - we can use this
627        # to dissuade users from running huge queries that will take forever
628        # to process
629        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
630            count_url = "https://api.x.com/2/tweets/counts/all"
631            count_params = {
632                "granularity": "day",
633                "query": params["query"],
634            }
635
636            # if we're doing a date range, pass this on to the counts endpoint in
637            # the right format
638            if after:
639                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")
640
641            if before:
642                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")
643
644            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key
645
646            expected_tweets = 0
647            while True:
648                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
649                                        timeout=15)
650                if response.status_code == 200:
651                    try:
652                        # figure out how many tweets there are and estimate how much
653                        # time it will take to process them. if it's going to take
654                        # longer than half an hour, warn the user
655                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
656                    except KeyError:
657                        # no harm done, we just don't know how many tweets will be
658                        # returned (but they will still be returned)
659                        break
660
661                    if "next_token" not in response.json().get("meta", {}):
662                        break
663                    else:
664                        count_params["next_token"] = response.json()["meta"]["next_token"]
665
666                elif response.status_code == 401:
667                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
668                                                   "for the Research track of the X API.")
669
670                elif response.status_code == 400:
671                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
672                                                   "extend into the future, or to before Twitter's founding, and that "
673                                                   "your query is shorter than 1024 characters. Using AND in the query "
674                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
675                                                   "search for the literal word.")
676
677                else:
678                    # we can still continue without the expected tweets
679                    break
680
681            warning = ""
682            if expected_tweets:
683                collectible_tweets = min(max_tweets, params["amount"])
684                if collectible_tweets == 0:
685                    collectible_tweets = max_tweets
686
687                if collectible_tweets > 0:
688                    if collectible_tweets < expected_tweets:
689                        warning += ", but only %s will be collected. " % "{:,}".format(collectible_tweets)
690                    real_expected_tweets = min(expected_tweets, collectible_tweets)
691                else:
692                    real_expected_tweets = expected_tweets
693
694                expected_seconds = int(real_expected_tweets / 30)  # seems to be about this
695                expected_time = timify(expected_seconds)
696                params["expected-tweets"] = expected_tweets
697
698                if expected_seconds > 900:
699                    warning += ". Collection will take approximately %s." % expected_time
700
701            if warning and not query.get("frontend-confirm"):
702                warning = "This query matches approximately %s tweets%s" % ("{:,}".format(expected_tweets), warning)
703                warning += " Do you want to continue?"
704                raise QueryNeedsExplicitConfirmationException(warning)
705
706            params["amount"] = min(params["amount"], expected_tweets)
707            if max_tweets:
708                params["amount"] = min(max_tweets, params["amount"])
709
710        return params
711
712    @staticmethod
713    def map_item(item):
714        """
715        Map a nested Tweet object to a flat dictionary
716
717        Tweet objects are quite rich but 4CAT expects flat dictionaries per
718        item in many cases. Since it would be a shame to not store the data
719        retrieved from Twitter that cannot be stored in a flat file, we store
720        the full objects and only map them to a flat dictionary when needed.
721        This has a speed (and disk space) penalty, but makes sure we don't
722        throw away valuable data and allows for later changes that e.g. store
723        the tweets more efficiently as a MongoDB collection.
724
725        :param item:  Tweet object as originally returned by the Twitter API
726        :return dict:  Dictionary in the format expected by 4CAT
727        """
728        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")
729
730        # For backward compatibility
731        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
732        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
733        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""
734
735        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
736        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
737        urls = [tag.get("expanded_url", tag.get("display_url")) for tag in item.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)]
738        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
739        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
740
741        # by default, the text of retweets is returned as "RT [excerpt of
742        # retweeted tweet]". Since we have the full tweet text, we can complete
743        # the excerpt:
744        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
745        if is_retweet:
746            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
747            if retweeted_tweet.get("text", False):
748                retweeted_body = retweeted_tweet.get("text")
749                # Get user's username that was retweeted
750                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
751                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
752                elif item.get('entities', {}).get('mentions', []):
753                    # the username may be missing from retweeted_tweet["author_user"] when the user was removed or deleted
754                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
755                    if retweeting_users:
756                        # should only ever be one, but this verifies that there IS one and not NONE
757                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body
758
759            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched
760
761            # Retweet entities are only included in the retweet if they occur in the first 140 characters
762            # Note: it is an open question whether quotes and replies should likewise include the hashtags and mentions of their referenced tweets
763            hashtags.extend(tag["tag"] for tag in retweeted_tweet.get("entities", {}).get("hashtags", []))
764            mentions.extend(tag["username"] for tag in retweeted_tweet.get("entities", {}).get("mentions", []))
765            urls.extend(tag.get("expanded_url", tag["display_url"]) for tag in retweeted_tweet.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag))
766            # Images appear to be inherited by retweets, but just in case
767            images.extend(attachment["url"] for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo")
768            video_items.extend(attachment for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video")
769
770        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
771        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])
772
773        videos = []
774        for video in video_items:
775            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
776            if variants:
777                videos.append(variants[0].get('url'))
778
779        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
780        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
781        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
782        warning = ""
783        if missing_metrics:
784            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."
785
786        return MappedItem({
787            "id": item["id"],
788            "thread_id": item.get("conversation_id", item["id"]),
789            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
790            "unix_timestamp": int(tweet_time.timestamp()),
791            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
792            "subject": "",
793            "body": item["text"],
794            "author": author_username,
795            "author_fullname": author_fullname,
796            "author_id": item["author_id"],
797            "author_followers": author_followers,
798            "source": item.get("source"),
799            "language_guess": item.get("lang"),
800            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
801            **public_metrics,
802            "is_retweet": "yes" if is_retweet else "no",
803            "retweeted_user": "" if not is_retweet else retweeted_user,
804            "is_quote_tweet": "yes" if is_quoted else "no",
805            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
806            "is_reply": "yes" if is_reply else "no",
807            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
808            "hashtags": ','.join(set(hashtags)),
809            "urls": ','.join(set(urls)),
810            "images": ','.join(set(images)),
811            "videos": ','.join(set(videos)),
812            "mentions": ','.join(set(mentions)),
813            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
814            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
815        }, message=warning)

Get Tweets via the X API

type = 'twitterv2-search'
title = 'X/Twitter API (v2)'
extension = 'ndjson'
is_local = False
is_static = False
previous_request = 0
import_issues = True
references = ['[X/Twitter API documentation](https://developer.x.com/en/docs/x-api)']
config = {
    'twitterv2-search.academic_api_key': {
        'type': 'string',
        'default': '',
        'help': 'Research API Key',
        'tooltip': 'An API key for the X/Twitter v2 Research API. If provided, the user will not need to enter their own key to retrieve tweets. Note that this API key should have access to the Full Archive Search endpoint.'
    },
    'twitterv2-search.max_tweets': {
        'type': 'string',
        'default': 0,
        'min': 0,
        'max': 10000000,
        'help': 'Max posts per dataset',
        'tooltip': "4CAT will never retrieve more than this amount of posts per dataset. Enter '0' for unlimited posts."
    },
    'twitterv2-search.id_lookup': {
        'type': 'toggle',
        'default': False,
        'help': 'Allow lookup by ID',
        'tooltip': 'If enabled, allow users to enter a list of post IDs to retrieve. This is disabled by default because it can be confusing to novice users.'
    }
}
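
Datasets created by this data source are stored with the ndjson extension: one raw (enriched) tweet object per line, which map_item later flattens for CSV exports and processors. A minimal sketch of reading such a file back outside of 4CAT (the file name is a placeholder):

    import json

    # one enriched tweet object per line; "results.ndjson" is a hypothetical path
    with open("results.ndjson", encoding="utf-8") as infile:
        for line in infile:
            tweet = json.loads(line)
            # author metadata has been spliced in by enrich_tweet
            print(tweet["id"], tweet.get("author_user", {}).get("username"))
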
def get_items(self, query):

Use the Twitter v2 API historical search to get tweets

Parameters
  • query: query parameters for the dataset
Returns

A generator that yields tweet objects (as dictionaries)
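
In outline, the method pages through the full-archive search endpoint using the next_token returned in each response's meta object. A simplified sketch of that request/pagination pattern (without the retry, rate-limit and error handling shown in the source; the bearer token and query are placeholders):

    import requests

    BEARER_TOKEN = "..."  # placeholder
    endpoint = "https://api.x.com/2/tweets/search/all"
    headers = {"Authorization": "Bearer %s" % BEARER_TOKEN}
    params = {"query": "4cat -is:retweet", "max_results": 100}

    while True:
        response = requests.get(endpoint, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        page = response.json()
        for tweet in page.get("data", []):
            print(tweet["id"])
        # paginate until the API stops returning a next_token
        if "next_token" in page.get("meta", {}):
            params["next_token"] = page["meta"]["next_token"]
        else:
            break
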
def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
337    def enrich_tweet(self, tweet, users, media, polls, places, referenced_tweets, missing_objects):
338        """
339        Enrich tweet with user and attachment metadata
340
341        Twitter API returns some of the tweet's metadata separately, as
342        'includes' that can be cross-referenced with a user ID or media key.
343        This makes sense to conserve bandwidth, but also means tweets are not
344        'standalone' objects as originally returned.
345
346        However, for processing, making them standalone greatly reduces
347        complexity, as we can simply read one single tweet object and process
348        that data without worrying about having to get other data from
349        elsewhere. So this method takes the metadata and the original tweet,
350        splices the metadata into it where appropriate, and returns the
351        enriched object.
352
353        **This is not an efficient way to store things** but it is more
354        convenient.
355
356        :param dict tweet:  The tweet object
357        :param list users:  User metadata, as a list of user objects
358        :param list media:  Media metadata, as a list of media objects
359        :param list polls:  Poll metadata, as a list of poll objects
360        :param list places:  Place metadata, as a list of place objects
361        :param list referenced_tweets:  Tweets referenced in the tweet, as a
362        list of tweet objects. These will be enriched in turn.
363        :param dict missing_objects: Dictionary with data on missing objects
364                from the API by type.
365
366        :return dict:  Enriched tweet object
367        """
368        # Copy the tweet so that updating this tweet has no effect on others
369        tweet = copy.deepcopy(tweet)
370        # first create temporary mappings so we can easily find the relevant
371        # object later
372        users_by_id = {user["id"]: user for user in users}
373        users_by_name = {user["username"]: user for user in users}
374        media_by_key = {item["media_key"]: item for item in media}
375        polls_by_id = {poll["id"]: poll for poll in polls}
376        places_by_id = {place["id"]: place for place in places}
377        tweets_by_id = {ref["id"]: ref.copy() for ref in referenced_tweets}
378
379        # add tweet author metadata
380        tweet["author_user"] = users_by_id.get(tweet["author_id"])
381
382        # add place to geo metadata
383        # referenced_tweets also contain place_id, but these places may not included in the place objects
384        if 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in places_by_id:
385            tweet["geo"]["place"] = places_by_id.get(tweet.get("geo").get("place_id"))
386        elif 'place_id' in tweet.get('geo', {}) and tweet.get("geo").get("place_id") in missing_objects.get('place', {}):
387            tweet["geo"]["place"] = missing_objects.get('place', {}).get(tweet.get("geo").get("place_id"), {})
388
389        # add user metadata for mentioned users
390        for index, mention in enumerate(tweet.get("entities", {}).get("mentions", [])):
391            if mention["username"] in users_by_name:
392                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **users_by_name.get(mention["username"])}
393            # missing users can be stored by either user ID or Username in Twitter API's error data; we check both
394            elif mention["username"] in missing_objects.get('user', {}):
395                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["username"]]}}
396            elif mention["id"] in missing_objects.get('user', {}):
397                tweet["entities"]["mentions"][index] = {**tweet["entities"]["mentions"][index], **{'error': missing_objects['user'][mention["id"]]}}
398
399
400        # add poll metadata
401        for index, poll_id in enumerate(tweet.get("attachments", {}).get("poll_ids", [])):
402            if poll_id in polls_by_id:
403                tweet["attachments"]["poll_ids"][index] = polls_by_id[poll_id]
404            elif poll_id in missing_objects.get('poll', {}):
405                tweet["attachments"]["poll_ids"][index] = {'poll_id': poll_id, 'error': missing_objects['poll'][poll_id]}
406
407        # add media metadata - seems to be just the media type, the media URL
408        # etc is stored in the 'entities' attribute instead
409        for index, media_key in enumerate(tweet.get("attachments", {}).get("media_keys", [])):
410            if media_key in media_by_key:
411                tweet["attachments"]["media_keys"][index] = media_by_key[media_key]
412            elif media_key in missing_objects.get('media', {}):
413                tweet["attachments"]["media_keys"][index] = {'media_key': media_key, 'error': missing_objects['media'][media_key]}
414
415        # replied-to user metadata
416        if "in_reply_to_user_id" in tweet:
417            if tweet["in_reply_to_user_id"] in users_by_id:
418                tweet["in_reply_to_user"] = users_by_id[tweet["in_reply_to_user_id"]]
419            elif tweet["in_reply_to_user_id"] in missing_objects.get('user', {}):
420                tweet["in_reply_to_user"] = {'in_reply_to_user_id': tweet["in_reply_to_user_id"], 'error': missing_objects['user'][tweet["in_reply_to_user_id"]]}
421
422        # enrich referenced tweets. Even though infinite recursion should not be
423        # possible here - tweets cannot be edited, so reference chains cannot
424        # loop - we do not recursively enrich referenced tweets (should we?)
425        for index, reference in enumerate(tweet.get("referenced_tweets", [])):
426            if reference["id"] in tweets_by_id:
427                tweet["referenced_tweets"][index] = {**reference, **self.enrich_tweet(tweets_by_id[reference["id"]], users, media, polls, places, [], missing_objects)}
428            elif reference["id"] in missing_objects.get('tweet', {}):
429                tweet["referenced_tweets"][index] = {**reference, **{'error': missing_objects['tweet'][reference["id"]]}}
430
431        return tweet

Enrich tweet with user and attachment metadata

The Twitter API returns some of a tweet's metadata separately, as 'includes' that can be cross-referenced with a user ID or media key. This makes sense as a way to conserve bandwidth, but it also means tweets are not 'standalone' objects as originally returned.

For processing, however, making them standalone greatly reduces complexity: we can read a single tweet object and process it without having to fetch related data from elsewhere. So this method takes the metadata and the original tweet, splices the metadata into the tweet where appropriate, and returns the enriched object.

This is not an efficient way to store things but it is more convenient.

Parameters
  • dict tweet: The tweet object
  • list users: User metadata, as a list of user objects
  • list media: Media metadata, as a list of media objects
  • list polls: Poll metadata, as a list of poll objects
  • list places: Place metadata, as a list of place objects
  • list referenced_tweets: Tweets referenced in the tweet, as a list of tweet objects. These will be enriched in turn.
  • dict missing_objects: Dictionary with data on missing objects from the API by type.
Returns

Enriched tweet object
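
For illustration, the standalone sketch below (not the 4CAT method itself; all sample data is made up) shows the core of what enrichment does: build lookup tables from the API's 'includes' and splice the matching objects into the tweet.

import copy

# hypothetical, trimmed-down tweet and 'includes' objects
tweet = {
    "id": "1", "author_id": "42", "text": "hello",
    "attachments": {"media_keys": ["3_111"]},
}
users = [{"id": "42", "username": "example_user", "name": "Example User"}]
media = [{"media_key": "3_111", "type": "photo", "url": "https://example.com/photo.jpg"}]

# temporary lookup tables, as in enrich_tweet
users_by_id = {user["id"]: user for user in users}
media_by_key = {item["media_key"]: item for item in media}

enriched = copy.deepcopy(tweet)
# splice the author object into the tweet itself
enriched["author_user"] = users_by_id.get(enriched["author_id"])
# replace bare media keys with the full media objects
enriched["attachments"]["media_keys"] = [
    media_by_key.get(key, key) for key in enriched["attachments"]["media_keys"]
]
# 'enriched' is now standalone: author and media metadata travel with the tweet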

def fix_tweet_error(self, tweet_error):
433    def fix_tweet_error(self, tweet_error):
434        """
435        Add the fields that map_item and other functions expect to error objects,
436        as these do not conform to normal tweet fields. Used mainly for ID lookup,
437        where the complete tweet may be missing.
438
439        :param dict tweet_error: Tweet error object from the Twitter API
440        :return dict:  A tweet object with the relevant fields sanitised
441        """
442        modified_tweet = tweet_error
443        modified_tweet['id'] = tweet_error.get('resource_id')
444        modified_tweet['created_at'] = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z")
445        modified_tweet['text'] = ''
446        modified_tweet['author_user'] = {}
447        modified_tweet['author_user']['name'] = 'UNKNOWN'
448        modified_tweet['author_user']['username'] = 'UNKNOWN'
449        modified_tweet['author_id'] = 'UNKNOWN'
450        modified_tweet['public_metrics'] = {}
451
452        # put the error detail in the 'subject' field, which is normally blank for tweets
453        modified_tweet['subject'] = tweet_error.get('detail')
454
455        return modified_tweet

Add the fields that map_item and other functions expect to error objects, as these do not conform to normal tweet fields. Used mainly for ID lookup, where the complete tweet may be missing.

Parameters
  • dict tweet_error: Tweet error object from the Twitter API
Returns

A tweet object with the relevant fields sanitised
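
As a rough illustration, the standalone sketch below shows the backfill for a hypothetical 'resource not found' error object returned by an ID lookup; the placeholder values mirror those set by fix_tweet_error.

import datetime

# hypothetical error object from the API's 'errors' array for a deleted or unknown post
tweet_error = {
    "resource_id": "1234567890",
    "detail": "Could not find tweet with ids: [1234567890].",
}

# the result of the backfill: the original error plus placeholder tweet fields
placeholder = {
    **tweet_error,
    "id": tweet_error.get("resource_id"),
    "created_at": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.000Z"),
    "text": "",
    "author_id": "UNKNOWN",
    "author_user": {"name": "UNKNOWN", "username": "UNKNOWN"},
    "public_metrics": {},
    "subject": tweet_error.get("detail"),
}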

@classmethod
def get_options(cls, parent_dataset=None, config=None):
457    @classmethod
458    def get_options(cls, parent_dataset=None, config=None):
459        """
460        Get Twitter data source options
461
462        These are somewhat dynamic, because depending on settings users may or
463        may not need to provide their own API key, and may or may not be able
464        to enter a list of tweet IDs as their query. Hence the method.
465
466        :param config:
467        :param parent_dataset:  Should always be None
468        :return dict:  Data source options
469        """
470        have_api_key = config.get("twitterv2-search.academic_api_key")
471        max_tweets = config.get("twitterv2-search.max_tweets")
472
473        if have_api_key:
474            intro_text = ("This data source uses the full-archive search endpoint of the X API (v2) to retrieve "
475                          "historic tweets that match a given query.")
476
477        else:
478            intro_text = ("This data source uses the full-archive search endpoint of the X/Twitter API (v2). To use "
479                          "it, you must have access to the Research track of the X API. You will need to provide a "
480                          "valid [bearer token](https://developer.x.com/en/docs/authentication/oauth-2-0). The "
481                          "bearer token **will be sent to the 4CAT server**, where it will be deleted after data "
482                          "collection has started. Note that any posts retrieved with 4CAT will count towards your "
483                          "monthly post retrieval cap.")
484
485        intro_text += ("\n\nPlease refer to the [X API documentation]("
486                          "https://developer.x.com/en/docs/twitter-api/tweets/search/integrate/build-a-query) "
487                          "for more information about this API endpoint and the syntax you can use in your "
488                          "search query. Retweets are included by default; add `-is:retweet` to exclude them.")
489
490        options = {
491            "intro-1": {
492                "type": UserInput.OPTION_INFO,
493                "help": intro_text
494            },
495        }
496
497        if not have_api_key:
498            # options.update({
499            #     "api_type": {
500            #         "type": UserInput.OPTION_CHOICE,
501            #         "help": "API track",
502            #         "options": {
503            #             "all": "Research API: Full-archive search",
504            #             "recent": "Standard: Recent search (Tweets published in last 7 days)",
505            #         },
506            #         "default": "all"
507            #     }
508            # })
509            options.update({
510                "api_bearer_token": {
511                    "type": UserInput.OPTION_TEXT,
512                    "sensitive": True,
513                    "cache": True,
514                    "help": "API Bearer Token"
515                },
516            })
517
518        if config.get("twitterv2-search.id_lookup"):
519            options.update({
520                "query_type": {
521                    "type": UserInput.OPTION_CHOICE,
522                    "help": "Query type",
523                    "tooltip": "Note: The 'Posts to retrieve' and date range fields are ignored with 'Posts by ID' lookup",
524                    "options": {
525                        "query": "Search query",
526                        "id_lookup": "Posts by ID (list IDs separated by commas or one per line)",
527                    },
528                    "default": "query"
529                }
530            })
531
532        options.update({
533            "query": {
534                "type": UserInput.OPTION_TEXT_LARGE,
535                "help": "Query"
536            },
537            "amount": {
538                "type": UserInput.OPTION_TEXT,
539                "help": "Posts to retrieve",
540                "tooltip": "0 = unlimited (be careful!)" if not max_tweets else ("0 = maximum (%s)" % str(max_tweets)),
541                "min": 0,
542                "max": max_tweets if max_tweets else 10_000_000,
543                "default": 10
544            },
545            "divider-2": {
546                "type": UserInput.OPTION_DIVIDER
547            },
548            "daterange-info": {
549                "type": UserInput.OPTION_INFO,
550                "help": "By default, X returns posts from up to 30 days ago. If you want to go back further, you "
551                        "need to explicitly set a date range."
552            },
553            "daterange": {
554                "type": UserInput.OPTION_DATERANGE,
555                "help": "Date range"
556            },
557        })
558
559        return options

Get Twitter data source options

These are somewhat dynamic, because depending on settings users may or may not need to provide their own API key, and may or may not be able to enter a list of tweet IDs as their query. Hence the method.

Parameters
  • config:
  • parent_dataset: Should always be None
Returns

Data source options
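
A minimal sketch of how the returned form changes with the 4CAT configuration. The StubConfig reader is hypothetical (4CAT passes its own context-aware configuration reader), and importing the datasource assumes a working 4CAT installation.

from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2

class StubConfig:
    """Hypothetical stand-in for 4CAT's configuration reader."""
    def __init__(self, values):
        self.values = values

    def get(self, key, default=None):
        return self.values.get(key, default)

# no central API key configured: users must supply their own bearer token
options = SearchWithTwitterAPIv2.get_options(config=StubConfig({"twitterv2-search.max_tweets": 100_000}))
assert "api_bearer_token" in options

# central Research API key configured: the bearer token field is omitted
options = SearchWithTwitterAPIv2.get_options(config=StubConfig({
    "twitterv2-search.academic_api_key": "central-key",
    "twitterv2-search.max_tweets": 100_000,
}))
assert "api_bearer_token" not in options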

@staticmethod
def validate_query(query, request, config):
561    @staticmethod
562    def validate_query(query, request, config):
563        """
564        Validate input for a dataset query on the Twitter data source.
565
566        Will raise a QueryParametersException if invalid parameters are
567        encountered. Parameters are additionally sanitised.
568
569        Will also raise a QueryNeedsExplicitConfirmationException if the
570        'counts' endpoint of the Twitter API indicates that it will take more
571        than 15 minutes to collect the dataset. In the front-end, this will
572        trigger a warning and confirmation request.
573
574        :param dict query:  Query parameters, from client-side.
575        :param request:  Flask request
576        :param ConfigManager|None config:  Configuration reader (context-aware)
577        :return dict:  Safe query parameters
578        """
579        have_api_key = config.get("twitterv2-search.academic_api_key")
580        max_tweets = config.get("twitterv2-search.max_tweets", 10_000_000)
581
582        # this is the bare minimum, else we can't narrow down the full data set
583        if not query.get("query", None):
584            raise QueryParametersException("Please provide a query.")
585
586        if not have_api_key:
587            if not query.get("api_bearer_token", None):
588                raise QueryParametersException("Please provide a valid bearer token.")
589
590        if len(query.get("query")) > 1024 and query.get("query_type", "query") != "id_lookup":
591            raise QueryParametersException("X API queries cannot be longer than 1024 characters.")
592
593        if query.get("query_type", "query") == "id_lookup" and config.get("twitterv2-search.id_lookup"):
594            # reformat queries to be a comma-separated list with no wrapping
595            # whitespace
596            whitespace = re.compile(r"\s+")
597            items = whitespace.sub("", query.get("query").replace("\n", ","))
598            # eliminate empty queries
599            twitter_query = ','.join([item for item in items.split(",") if item])
600        else:
601            twitter_query = query.get("query")
602
603        # the dates need to make sense as a range to search within
604        # but, on Twitter, you can also specify before *or* after only
605        after, before = query.get("daterange")
606        if before and after and before < after:
607            raise QueryParametersException("Date range must start before it ends")
608
609        # if we made it this far, the query can be executed
610        params = {
611            "query": twitter_query,
612            "api_bearer_token": query.get("api_bearer_token"),
613            "api_type": query.get("api_type", "all"),
614            "query_type": query.get("query_type", "query"),
615            "min_date": after,
616            "max_date": before
617        }
618
619        # never query more tweets than allowed
620        tweets_to_collect = convert_to_int(query.get("amount"), 10)
621
622        if max_tweets and (tweets_to_collect > max_tweets or tweets_to_collect == 0):
623            tweets_to_collect = max_tweets
624        params["amount"] = tweets_to_collect
625
626        # figure out how many tweets we expect to get back - we can use this
627        # to dissuade users from running huge queries that will take forever
628        # to process
629        if params["query_type"] == "query" and (params.get("api_type") == "all" or have_api_key):
630            count_url = "https://api.x.com/2/tweets/counts/all"
631            count_params = {
632                "granularity": "day",
633                "query": params["query"],
634            }
635
636            # if we're doing a date range, pass this on to the counts endpoint in
637            # the right format
638            if after:
639                count_params["start_time"] = datetime.datetime.fromtimestamp(after).strftime("%Y-%m-%dT%H:%M:%SZ")
640
641            if before:
642                count_params["end_time"] = datetime.datetime.fromtimestamp(before).strftime("%Y-%m-%dT%H:%M:%SZ")
643
644            bearer_token = params.get("api_bearer_token") if not have_api_key else have_api_key
645
646            expected_tweets = 0
647            while True:
648                response = requests.get(count_url, params=count_params, headers={"Authorization": "Bearer %s" % bearer_token},
649                                        timeout=15)
650                if response.status_code == 200:
651                    try:
652                        # figure out how many tweets there are and estimate how much
653                        # time it will take to process them. if it's going to take
654                        # longer than half an hour, warn the user
655                        expected_tweets += int(response.json()["meta"]["total_tweet_count"])
656                    except KeyError:
657                        # no harm done, we just don't know how many tweets will be
658                        # returned (but they will still be returned)
659                        break
660
661                    if "next_token" not in response.json().get("meta", {}):
662                        break
663                    else:
664                        count_params["next_token"] = response.json()["meta"]["next_token"]
665
666                elif response.status_code == 401:
667                    raise QueryParametersException("Your bearer token seems to be invalid. Please make sure it is valid "
668                                                   "for the Research track of the X API.")
669
670                elif response.status_code == 400:
671                    raise QueryParametersException("Your query is invalid. Please make sure the date range does not "
672                                                   "extend into the future, or to before Twitter's founding, and that "
673                                                   "your query is shorter than 1024 characters. Using AND in the query "
674                                                   "is not possible (AND is implied; OR can be used). Use \"and\" to "
675                                                   "search for the literal word.")
676
677                else:
678                    # we can still continue without the expected tweets
679                    break
680
681            warning = ""
682            if expected_tweets:
683                collectible_tweets = min(max_tweets, params["amount"])
684                if collectible_tweets == 0:
685                    collectible_tweets = max_tweets
686
687                if collectible_tweets > 0:
688                    if collectible_tweets < expected_tweets:
689                        warning += " Of these, only %s will be collected." % "{:,}".format(collectible_tweets)
690                    real_expected_tweets = min(expected_tweets, collectible_tweets)
691                else:
692                    real_expected_tweets = expected_tweets
693
694                expected_seconds = int(real_expected_tweets / 30)  # collection runs at roughly 30 posts per second
695                expected_time = timify(expected_seconds)
696                params["expected-tweets"] = expected_tweets
697
698                if expected_seconds > 900:
699                    warning += " Collection will take approximately %s." % expected_time
700
701            if warning and not query.get("frontend-confirm"):
702                warning = "This query matches approximately %s tweets.%s" % ("{:,}".format(expected_tweets), warning)
703                warning += " Do you want to continue?"
704                raise QueryNeedsExplicitConfirmationException(warning)
705
706            params["amount"] = min(params["amount"], expected_tweets)
707            if max_tweets:
708                params["amount"] = min(max_tweets, params["amount"])
709
710        return params

Validate input for a dataset query on the Twitter data source.

Will raise a QueryParametersException if invalid parameters are encountered. Parameters are additionally sanitised.

Will also raise a QueryNeedsExplicitConfirmationException if the 'counts' endpoint of the Twitter API indicates that it will take more than 15 minutes to collect the dataset. In the front-end, this will trigger a warning and confirmation request.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Safe query parameters
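
The size estimate behind the confirmation prompt is a paginated call to the counts endpoint followed by a rough division by the assumed collection speed. A standalone sketch, with a hypothetical query and bearer token:

import datetime
import requests

bearer_token = "YOUR_BEARER_TOKEN"  # hypothetical token
count_params = {
    "granularity": "day",
    "query": "4cat -is:retweet",  # hypothetical query
    "start_time": datetime.datetime(2023, 1, 1).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "end_time": datetime.datetime(2023, 2, 1).strftime("%Y-%m-%dT%H:%M:%SZ"),
}

expected_tweets = 0
while True:
    response = requests.get("https://api.x.com/2/tweets/counts/all", params=count_params,
                            headers={"Authorization": "Bearer %s" % bearer_token}, timeout=15)
    response.raise_for_status()
    meta = response.json().get("meta", {})
    expected_tweets += meta.get("total_tweet_count", 0)
    if "next_token" not in meta:
        break
    count_params["next_token"] = meta["next_token"]

# validate_query assumes roughly 30 posts collected per second and asks for
# confirmation once the estimate exceeds 900 seconds (15 minutes)
expected_seconds = expected_tweets // 30
print("~%s posts, estimated %s seconds to collect" % (expected_tweets, expected_seconds))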

@staticmethod
def map_item(item):
712    @staticmethod
713    def map_item(item):
714        """
715        Map a nested Tweet object to a flat dictionary
716
717        Tweet objects are quite rich but 4CAT expects flat dictionaries per
718        item in many cases. Since it would be a shame to not store the data
719        retrieved from Twitter that cannot be stored in a flat file, we store
720        the full objects and only map them to a flat dictionary when needed.
721        This has a speed (and disk space) penalty, but makes sure we don't
722        throw away valuable data and allows for later changes that e.g. store
723        the tweets more efficiently as a MongoDB collection.
724
725        :param item:  Tweet object as originally returned by the Twitter API
726        :return dict:  Dictionary in the format expected by 4CAT
727        """
728        tweet_time = datetime.datetime.strptime(item["created_at"], "%Y-%m-%dT%H:%M:%S.000Z")
729
730        # For backward compatibility
731        author_username = item["author_user"]["username"] if item.get("author_user") else item["author_username"]
732        author_fullname = item["author_user"]["name"] if item.get("author_user") else item["author_fullname"]
733        author_followers = item["author_user"]["public_metrics"]["followers_count"] if item.get("author_user") else ""
734
735        hashtags = [tag["tag"] for tag in item.get("entities", {}).get("hashtags", [])]
736        mentions = [tag["username"] for tag in item.get("entities", {}).get("mentions", [])]
737        urls = [tag.get("expanded_url", tag.get("display_url")) for tag in item.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag)]
738        images = [attachment["url"] for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo"]
739        video_items = [attachment for attachment in item.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video"]
740
741        # by default, the text of retweets is returned as "RT [excerpt of
742        # retweeted tweet]". Since we have the full tweet text, we can complete
743        # the excerpt:
744        is_retweet = any([ref.get("type") == "retweeted" for ref in item.get("referenced_tweets", [])])
745        if is_retweet:
746            retweeted_tweet = [t for t in item["referenced_tweets"] if t.get("type") == "retweeted"][0]
747            if retweeted_tweet.get("text", False):
748                retweeted_body = retweeted_tweet.get("text")
749                # get the username of the retweeted user
750                if retweeted_tweet.get('author_user') and retweeted_tweet.get('author_user').get('username'):
751                    item["text"] = "RT @" + retweeted_tweet.get("author_user", {}).get("username") + ": " + retweeted_body
752                elif item.get('entities', {}).get('mentions', []):
753                    # retweeted_tweet["author_user"]["username"] may be missing when the user was removed or deleted
754                    retweeting_users = [mention.get('username') for mention in item.get('entities', {}).get('mentions', []) if mention.get('id') == retweeted_tweet.get('author_id')]
755                    if retweeting_users:
756                        # there should only ever be one match, but this verifies the list is not empty
757                        item["text"] = "RT @" + retweeting_users[0] + ": " + retweeted_body
758
759            retweeted_user = retweeted_tweet["author_user"]["username"] if retweeted_tweet.get("author_user") else retweeted_tweet.get("author_username", "") # Reference tweets were not always enriched
760
761            # Retweet entities are only included in the retweet if they occur in the first 140 characters
762            # Note: it is an open question whether quotes and replies should likewise inherit hashtags and mentions from their referenced tweets
763            hashtags.extend(tag["tag"] for tag in retweeted_tweet.get("entities", {}).get("hashtags", []))
764            mentions.extend(tag["username"] for tag in retweeted_tweet.get("entities", {}).get("mentions", []))
765            urls.extend(tag.get("expanded_url", tag["display_url"]) for tag in retweeted_tweet.get("entities", {}).get("urls", []) if ("display_url" in tag or "expanded_url" in tag))
766            # Images appear to be inherited by retweets, but add them just in case
767            images.extend(attachment["url"] for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "photo")
768            video_items.extend(attachment for attachment in retweeted_tweet.get("attachments", {}).get("media_keys", []) if type(attachment) is dict and attachment.get("type") == "video")
769
770        is_quoted = any([ref.get("type") == "quoted" for ref in item.get("referenced_tweets", [])])
771        is_reply = any([ref.get("type") == "replied_to" for ref in item.get("referenced_tweets", [])])
772
773        videos = []
774        for video in video_items:
775            variants = sorted(video.get('variants', []), key=lambda d: d.get('bit_rate', 0), reverse=True)
776            if variants:
777                videos.append(variants[0].get('url'))
778
779        expected_metrics = {"impression_count", "retweet_count", "bookmark_count", "like_count", "quote_count", "reply_count"}
780        public_metrics = {k: item["public_metrics"].get(k, MissingMappedField(0)) for k in expected_metrics}
781        missing_metrics = [m for m in expected_metrics if m not in item["public_metrics"]]
782        warning = ""
783        if missing_metrics:
784            warning = f"The following metrics were missing from a tweet: {', '.join(missing_metrics)}. They will have a value of '0' in any exports."
785
786        return MappedItem({
787            "id": item["id"],
788            "thread_id": item.get("conversation_id", item["id"]),
789            "timestamp": tweet_time.strftime("%Y-%m-%d %H:%M:%S"),
790            "unix_timestamp": int(tweet_time.timestamp()),
791            'link': "https://x.com/%s/status/%s" % (author_username, item.get('id')),
792            "subject": "",
793            "body": item["text"],
794            "author": author_username,
795            "author_fullname": author_fullname,
796            "author_id": item["author_id"],
797            "author_followers": author_followers,
798            "source": item.get("source"),
799            "language_guess": item.get("lang"),
800            "possibly_sensitive": "yes" if item.get("possibly_sensitive") else "no",
801            **public_metrics,
802            "is_retweet": "yes" if is_retweet else "no",
803            "retweeted_user": "" if not is_retweet else retweeted_user,
804            "is_quote_tweet": "yes" if is_quoted else "no",
805            "quoted_user": "" if not is_quoted else [ref for ref in item["referenced_tweets"] if ref["type"] == "quoted"].pop().get("author_user", {}).get("username", ""),
806            "is_reply": "yes" if is_reply else "no",
807            "replied_user": item.get("in_reply_to_user", {}).get("username", ""),
808            "hashtags": ','.join(set(hashtags)),
809            "urls": ','.join(set(urls)),
810            "images": ','.join(set(images)),
811            "videos": ','.join(set(videos)),
812            "mentions": ','.join(set(mentions)),
813            "long_lat": ', '.join([str(x) for x in item.get('geo', {}).get('coordinates', {}).get('coordinates', [])]),
814            'place_name': item.get('geo', {}).get('place', {}).get('full_name', ''),
815        }, message=warning)

Map a nested Tweet object to a flat dictionary

Tweet objects are quite rich but 4CAT expects flat dictionaries per item in many cases. Since it would be a shame to not store the data retrieved from Twitter that cannot be stored in a flat file, we store the full objects and only map them to a flat dictionary when needed. This has a speed (and disk space) penalty, but makes sure we don't throw away valuable data and allows for later changes that e.g. store the tweets more efficiently as a MongoDB collection.

Parameters
  • item: Tweet object as originally returned by the Twitter API
Returns

Dictionary in the format expected by 4CAT
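
A minimal usage sketch with a hypothetical, already-enriched tweet object. Importing the datasource assumes a working 4CAT installation, and reading the result via get_item_data() assumes the standard MappedItem accessor from common.lib.item_mapping.

from datasources.twitterv2.search_twitter import SearchWithTwitterAPIv2

# hypothetical, already-enriched tweet containing only the fields map_item needs
item = {
    "id": "1234567890",
    "conversation_id": "1234567890",
    "created_at": "2023-06-01T12:00:00.000Z",
    "text": "Example post about #4cat",
    "author_id": "42",
    "author_user": {
        "username": "example_user",
        "name": "Example User",
        "public_metrics": {"followers_count": 100},
    },
    "public_metrics": {
        "retweet_count": 1, "reply_count": 0, "like_count": 5,
        "quote_count": 0, "impression_count": 250, "bookmark_count": 0,
    },
    "entities": {"hashtags": [{"tag": "4cat"}]},
}

mapped = SearchWithTwitterAPIv2.map_item(item)
row = mapped.get_item_data()  # assumption: MappedItem exposes the flat dictionary this way
print(row["timestamp"], row["author"], row["hashtags"], row["body"])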