Edit on GitHub

datasources.twitter-import.search_twitter

Import scraped X/Twitter data

It's prohibitively difficult to scrape data from Twitter within 4CAT itself due to its aggressive rate limiting. Instead, import data collected elsewhere.

  1"""
  2Import scraped X/Twitter data
  3
  4It's prohibitively difficult to scrape data from Twitter within 4CAT itself due
  5to its aggressive rate limiting. Instead, import data collected elsewhere.
  6"""
  7from datetime import datetime
  8
  9from backend.lib.search import Search
 10from common.lib.helpers import strip_tags
 11from common.lib.item_mapping import MappedItem
 12
 13
class SearchTwitterViaZeeschuimer(Search):
    """
    Import scraped X/Twitter data

    Data source for X/Twitter items that were collected with an external
    tool and are imported as NDJSON. `get_items` is deliberately not
    implemented; `map_item` flattens an imported tweet for further use.
    """
    type = "twitter-import"  # job ID
    category = "Search"  # category
    title = "Import scraped X/Twitter data"  # title displayed in UI
    description = "Import X/Twitter data collected with an external tool such as Zeeschuimer."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    # NOTE(review): presumably marks this data source as a target for
    # Zeeschuimer uploads - confirm against the code that reads this flag
    is_from_zeeschuimer = True

    # not available as a processor for existing datasets
    accepts = []
    # markdown-formatted links, presumably shown alongside the data source
    # NOTE(review): the worksheet below is about TikTok - confirm it is the
    # intended reference for this X/Twitter data source
    references = [
        "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)",
        "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)"
    ]
 31    
 32    def get_items(self, query):
 33        """
 34        Run custom search
 35
 36        Not available for Twitter
 37        """
 38        raise NotImplementedError("Twitter datasets can only be created by importing data from elsewhere")
 39
 40    @staticmethod
 41    def map_item(item):
 42
 43        if item.get("rest_id"):
 44            return MappedItem(SearchTwitterViaZeeschuimer.map_item_modern(item))
 45        elif item.get("type") == "adaptive":
 46            return MappedItem(SearchTwitterViaZeeschuimer.map_item_legacy(item))
 47        else:
 48            raise NotImplementedError
 49
 50    @staticmethod
 51    def map_item_modern(tweet):
 52
 53        # Sometimes a "core" key appears in user_results, sometimes not.
 54        # This has effect on where to get user data.
 55        has_core = tweet.get("core", {}).get("user_results", {}).get("result", {}).get("core", False)
 56        user_key = "core" if has_core else "legacy"
 57
 58        timestamp = datetime.strptime(tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
 59        withheld = False
 60
 61        retweet = tweet["legacy"].get("retweeted_status_result")
 62        if retweet:
 63            # make sure the full RT is included, by default this is shortened
 64            if "tweet" in retweet["result"]:
 65                retweet["result"] = retweet["result"]["tweet"]
 66
 67            if retweet["result"].get("legacy", {}).get("withheld_scope"):
 68                withheld = True
 69                tweet["legacy"]["full_text"] = retweet["result"]["legacy"]["full_text"]
 70            else:
 71                t_text = "RT @" + retweet["result"]["core"]["user_results"]["result"][user_key]["screen_name"] + \
 72                      ": " + retweet["result"]["legacy"]["full_text"]
 73                tweet["legacy"]["full_text"] = t_text
 74
 75        quote_tweet = tweet.get("quoted_status_result")
 76        if quote_tweet and "tweet" in quote_tweet.get("result", {}):
 77            # sometimes this is one level deeper, sometimes not...
 78            quote_tweet["result"] = quote_tweet["result"]["tweet"]
 79        # check if the quote tweet is available or not
 80        quote_withheld = True if (quote_tweet and "tombstone" in quote_tweet["result"]) else False
 81
 82        # extract media from tweet; if video, add thumbnail to images and video link to videos
 83        images = set()
 84        videos = set()
 85        
 86        # Process media from extended_entities for videos and photos
 87        for media in tweet["legacy"].get("extended_entities", {}).get("media", []):
 88            if media["type"] == "photo":
 89                images.add(media["media_url_https"])
 90            elif media["type"] == "video":
 91                # Add video thumbnail to images
 92                images.add(media["media_url_https"])
 93                # Add actual video URL to videos if available
 94                if media.get("video_info", {}).get("variants"):
 95                    # Filter variants to get video files (not streaming playlists)
 96                    video_variants = [
 97                        variant for variant in media["video_info"]["variants"]
 98                        if variant.get("content_type", "").startswith("video/")
 99                    ]
100                    if video_variants:
101                        # Sort by bitrate (highest first) to get best quality
102                        video_variants.sort(key=lambda x: x.get("bitrate", 0), reverse=True)
103                        videos.add(video_variants[0]["url"])
104        
105        # Also check entities.media for any additional photos not in extended_entities
106        for media in tweet["legacy"]["entities"].get("media", []):
107            if media["type"] == "photo":
108                images.add(media["media_url_https"])
109
110        return {
111            "id": tweet["rest_id"],
112            "thread_id": tweet["legacy"]["conversation_id_str"],
113            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
114            "unix_timestamp": int(timestamp.timestamp()),
115            "link": f"https://x.com/{tweet['core']['user_results']['result'][user_key]['screen_name']}/status/{tweet['id']}",
116            "body": tweet["legacy"]["full_text"],
117            "author": tweet["core"]["user_results"]["result"][user_key]["screen_name"],
118            "author_fullname": tweet["core"]["user_results"]["result"][user_key]["name"],
119            "author_id": tweet["legacy"]["user_id_str"],
120            "author_avatar_url": tweet["core"]["user_results"]["result"]["avatar"]["image_url"] if "avatar" in tweet["core"]["user_results"]["result"] else tweet["core"]["user_results"]["result"]["legacy"].get("profile_image_url_https", ""),
121            "author_banner_url": tweet["core"]["user_results"]["result"]["legacy"].get("profile_banner_url", ""), # key does not exist when author does not have a banner
122            "verified": tweet["core"]["user_results"]["result"].get("is_blue_verified", ""),
123            "source": strip_tags(tweet["source"]),
124            "language_guess": tweet["legacy"].get("lang"),
125            "possibly_sensitive": "yes" if tweet.get("possibly_sensitive", False) or tweet["legacy"].get("possibly_sensitive", False) else "no",
126            "retweet_count": tweet["legacy"]["retweet_count"],
127            "reply_count": tweet["legacy"]["reply_count"],
128            "like_count": tweet["legacy"]["favorite_count"],
129            "quote_count": tweet["legacy"]["quote_count"],
130            "impression_count": tweet.get("views", {}).get("count", ""),
131            "is_retweet": "yes" if retweet else "no",
132            "retweeted_user": retweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if retweet else "",
133            "is_quote_tweet": "yes" if quote_tweet else "no",
134            "quote_tweet_id": quote_tweet["result"].get("rest_id", "") if quote_tweet else "",
135            "quote_author": quote_tweet["result"]["core"]["user_results"]["result"].get(user_key, {}).get("screen_name", "") if
136                        (quote_tweet and not quote_withheld) else "",
137            "quote_body": quote_tweet["result"]["legacy"].get("full_text", "") if quote_tweet and not quote_withheld else "",
138            "quote_images": ",".join(
139                [media["media_url_https"] for media in quote_tweet["result"]["legacy"].get("entities", {}).get("media", [])
140                 if media["type"] == "photo"]) if quote_tweet and not quote_withheld else "",
141            "quote_videos": ",".join(
142                [media["media_url_https"] for media in quote_tweet["result"]["legacy"].get("entities", {}).get("media", [])
143                 if media["type"] == "video"]) if quote_tweet and not quote_withheld else "",
144            "is_quote_withheld": "yes" if quote_withheld else "no",
145            "is_reply": "yes" if str(tweet["legacy"]["conversation_id_str"]) != str(tweet["rest_id"]) else "no",
146            "replied_author": tweet["legacy"].get("in_reply_to_screen_name", ""),
147            "is_withheld": "yes" if withheld else "no",
148            "hashtags": ",".join([hashtag["text"] for hashtag in tweet["legacy"]["entities"]["hashtags"]]),
149            "urls": ",".join([url.get("expanded_url", url["display_url"]) for url in tweet["legacy"]["entities"]["urls"]]),
150            "images": ",".join(images),
151            "videos": ",".join(videos),
152            "mentions": ",".join([media["screen_name"] for media in tweet["legacy"]["entities"]["user_mentions"]]),
153            "long_lat": SearchTwitterViaZeeschuimer.get_centroid(
154                tweet["legacy"]["place"]["bounding_box"]["coordinates"]) if tweet["legacy"].get("place") else "",
155            "place_name": tweet["legacy"].get("place", {}).get("full_name", "") if tweet["legacy"].get("place") else "",
156        }
157
158    @staticmethod
159    def map_item_legacy(tweet):
160        timestamp = datetime.strptime(tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
161        tweet_id = tweet["legacy"]["id_str"]
162        withheld = False
163
164        retweet = tweet["legacy"].get("retweeted_status_result")
165        if retweet:
166            # make sure the full RT is included, by default this is shortened
167            if retweet["result"].get("legacy", {}).get("withheld_status"):
168                withheld = True
169                tweet["legacy"]["full_text"] = retweet["result"]["legacy"]["full_text"]
170            else:
171                t_text = "RT @" + retweet["result"]["core"]["user_results"]["result"]["legacy"]["screen_name"] + \
172                     " " + retweet["result"]["legacy"]["full_text"]
173                tweet["legacy"]["full_text"] = t_text
174
175        quote_tweet = tweet.get("quoted_status_result")
176
177        if quote_tweet and "tweet" in quote_tweet.get("result", {}):
178            # sometimes this is one level deeper, sometimes not...
179            quote_tweet["result"] = quote_tweet["result"]["tweet"]
180
181        return {
182            "id": tweet_id,
183            "thread_id": tweet["legacy"]["conversation_id_str"],
184            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
185            "unix_timestamp": int(timestamp.timestamp()),
186            "link": f"https://x.com/{tweet['user']['screen_name']}/status/{tweet_id}",
187            "body": tweet["legacy"]["full_text"],
188            "author": tweet["user"]["screen_name"],
189            "author_fullname": tweet["user"]["name"],
190            "author_id": tweet["user"]["id_str"],
191            "author_avatar_url": "", # todo: add
192            "author_banner_url": "", # todo: add
193            "verified": "", # todo: add
194            "source": strip_tags(tweet["legacy"]["source"]),
195            "language_guess": tweet["legacy"].get("lang"),
196            "possibly_sensitive": "yes" if tweet["legacy"].get("possibly_sensitive") else "no",
197            "retweet_count": tweet["legacy"]["retweet_count"],
198            "reply_count": tweet["legacy"]["reply_count"],
199            "like_count": tweet["legacy"]["favorite_count"],
200            "quote_count": tweet["legacy"]["quote_count"],
201            "impression_count": tweet.get("ext_views", {}).get("count", ""),
202            "is_retweet": "yes" if retweet else "no",
203            "retweeted_user": retweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if retweet else "",
204            "is_quote_tweet": "yes" if quote_tweet else "no",
205            "quote_tweet_id": "", # todo: add
206            "quote_author": quote_tweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if quote_tweet else "",
207            "quote_body": "", # todo: add
208            "quote_images": "", # todo: add
209            "quote_videos": "",  # todo: add
210            "is_quote_withheld": "", # todo: add
211            "is_reply": "yes" if str(tweet["legacy"]["conversation_id_str"]) != tweet_id else "no",
212            "replied_author": tweet["legacy"].get("in_reply_to_screen_name", "") if tweet["legacy"].get(
213                "in_reply_to_screen_name") else "",
214            "is_withheld": "yes" if withheld else "no",
215            "hashtags": ",".join([hashtag["text"] for hashtag in tweet["legacy"]["entities"]["hashtags"]]),
216            "urls": ",".join([url.get("expanded_url", url["display_url"]) for url in tweet["legacy"]["entities"]["urls"]]),
217            "images": ",".join(
218                [media["media_url_https"] for media in tweet["legacy"].get("extended_entities", {}).get("media", []) if
219                 media["type"] == "photo"]),
220            "videos": ",".join([media["video_info"]["variants"][0]["url"] for media in
221                                tweet["legacy"].get("extended_entities", {}).get("media", []) if
222                                media["type"] == "video"]),
223            "mentions": ",".join([media["screen_name"] for media in tweet["legacy"]["entities"]["user_mentions"]]),
224            "long_lat": SearchTwitterViaZeeschuimer.get_centroid(
225                tweet["legacy"]["place"]["bounding_box"]["coordinates"]) if tweet["legacy"].get("place") else "",
226            "place_name": tweet["legacy"].get("place", {}).get("full_name", "") if tweet["legacy"].get("place") else "",
227        }
228
229    @staticmethod
230    def get_centroid(box):
231        """
232        Get centre of a rectangular box
233
234        Convenience function for converting X/Twitter's bounding box coordinates
235        to a singular coordinate - simply the centre of the box - because that
236        is what is expected for mapped output.
237
238        :param list box:  The box as part of X/Twitter's response
239        :return str:  Coordinate, as longitude,latitude.
240        """
241        box = box[0]
242        return ",".join((
243            str(round((box[0][0] + box[1][0]) / 2, 6)),
244            str(round((box[0][1] + box[1][1]) / 2, 6)),
245        ))
class SearchTwitterViaZeeschuimer(backend.lib.search.Search):
class SearchTwitterViaZeeschuimer(Search):
    """
    Import scraped X/Twitter data

    Data source for X/Twitter items that were collected with an external
    tool and are imported as NDJSON. `get_items` is deliberately not
    implemented; `map_item` flattens an imported tweet for further use.
    """
    type = "twitter-import"  # job ID
    category = "Search"  # category
    title = "Import scraped X/Twitter data"  # title displayed in UI
    description = "Import X/Twitter data collected with an external tool such as Zeeschuimer."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    # NOTE(review): presumably marks this data source as a target for
    # Zeeschuimer uploads - confirm against the code that reads this flag
    is_from_zeeschuimer = True

    # not available as a processor for existing datasets
    accepts = []
    # markdown-formatted links, presumably shown alongside the data source
    # NOTE(review): the worksheet below is about TikTok - confirm it is the
    # intended reference for this X/Twitter data source
    references = [
        "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)",
        "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)"
    ]
 32    
 33    def get_items(self, query):
 34        """
 35        Run custom search
 36
 37        Not available for Twitter
 38        """
 39        raise NotImplementedError("Twitter datasets can only be created by importing data from elsewhere")
 40
 41    @staticmethod
 42    def map_item(item):
 43
 44        if item.get("rest_id"):
 45            return MappedItem(SearchTwitterViaZeeschuimer.map_item_modern(item))
 46        elif item.get("type") == "adaptive":
 47            return MappedItem(SearchTwitterViaZeeschuimer.map_item_legacy(item))
 48        else:
 49            raise NotImplementedError
 50
 51    @staticmethod
 52    def map_item_modern(tweet):
 53
 54        # Sometimes a "core" key appears in user_results, sometimes not.
 55        # This has effect on where to get user data.
 56        has_core = tweet.get("core", {}).get("user_results", {}).get("result", {}).get("core", False)
 57        user_key = "core" if has_core else "legacy"
 58
 59        timestamp = datetime.strptime(tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
 60        withheld = False
 61
 62        retweet = tweet["legacy"].get("retweeted_status_result")
 63        if retweet:
 64            # make sure the full RT is included, by default this is shortened
 65            if "tweet" in retweet["result"]:
 66                retweet["result"] = retweet["result"]["tweet"]
 67
 68            if retweet["result"].get("legacy", {}).get("withheld_scope"):
 69                withheld = True
 70                tweet["legacy"]["full_text"] = retweet["result"]["legacy"]["full_text"]
 71            else:
 72                t_text = "RT @" + retweet["result"]["core"]["user_results"]["result"][user_key]["screen_name"] + \
 73                      ": " + retweet["result"]["legacy"]["full_text"]
 74                tweet["legacy"]["full_text"] = t_text
 75
 76        quote_tweet = tweet.get("quoted_status_result")
 77        if quote_tweet and "tweet" in quote_tweet.get("result", {}):
 78            # sometimes this is one level deeper, sometimes not...
 79            quote_tweet["result"] = quote_tweet["result"]["tweet"]
 80        # check if the quote tweet is available or not
 81        quote_withheld = True if (quote_tweet and "tombstone" in quote_tweet["result"]) else False
 82
 83        # extract media from tweet; if video, add thumbnail to images and video link to videos
 84        images = set()
 85        videos = set()
 86        
 87        # Process media from extended_entities for videos and photos
 88        for media in tweet["legacy"].get("extended_entities", {}).get("media", []):
 89            if media["type"] == "photo":
 90                images.add(media["media_url_https"])
 91            elif media["type"] == "video":
 92                # Add video thumbnail to images
 93                images.add(media["media_url_https"])
 94                # Add actual video URL to videos if available
 95                if media.get("video_info", {}).get("variants"):
 96                    # Filter variants to get video files (not streaming playlists)
 97                    video_variants = [
 98                        variant for variant in media["video_info"]["variants"]
 99                        if variant.get("content_type", "").startswith("video/")
100                    ]
101                    if video_variants:
102                        # Sort by bitrate (highest first) to get best quality
103                        video_variants.sort(key=lambda x: x.get("bitrate", 0), reverse=True)
104                        videos.add(video_variants[0]["url"])
105        
106        # Also check entities.media for any additional photos not in extended_entities
107        for media in tweet["legacy"]["entities"].get("media", []):
108            if media["type"] == "photo":
109                images.add(media["media_url_https"])
110
111        return {
112            "id": tweet["rest_id"],
113            "thread_id": tweet["legacy"]["conversation_id_str"],
114            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
115            "unix_timestamp": int(timestamp.timestamp()),
116            "link": f"https://x.com/{tweet['core']['user_results']['result'][user_key]['screen_name']}/status/{tweet['id']}",
117            "body": tweet["legacy"]["full_text"],
118            "author": tweet["core"]["user_results"]["result"][user_key]["screen_name"],
119            "author_fullname": tweet["core"]["user_results"]["result"][user_key]["name"],
120            "author_id": tweet["legacy"]["user_id_str"],
121            "author_avatar_url": tweet["core"]["user_results"]["result"]["avatar"]["image_url"] if "avatar" in tweet["core"]["user_results"]["result"] else tweet["core"]["user_results"]["result"]["legacy"].get("profile_image_url_https", ""),
122            "author_banner_url": tweet["core"]["user_results"]["result"]["legacy"].get("profile_banner_url", ""), # key does not exist when author does not have a banner
123            "verified": tweet["core"]["user_results"]["result"].get("is_blue_verified", ""),
124            "source": strip_tags(tweet["source"]),
125            "language_guess": tweet["legacy"].get("lang"),
126            "possibly_sensitive": "yes" if tweet.get("possibly_sensitive", False) or tweet["legacy"].get("possibly_sensitive", False) else "no",
127            "retweet_count": tweet["legacy"]["retweet_count"],
128            "reply_count": tweet["legacy"]["reply_count"],
129            "like_count": tweet["legacy"]["favorite_count"],
130            "quote_count": tweet["legacy"]["quote_count"],
131            "impression_count": tweet.get("views", {}).get("count", ""),
132            "is_retweet": "yes" if retweet else "no",
133            "retweeted_user": retweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if retweet else "",
134            "is_quote_tweet": "yes" if quote_tweet else "no",
135            "quote_tweet_id": quote_tweet["result"].get("rest_id", "") if quote_tweet else "",
136            "quote_author": quote_tweet["result"]["core"]["user_results"]["result"].get(user_key, {}).get("screen_name", "") if
137                        (quote_tweet and not quote_withheld) else "",
138            "quote_body": quote_tweet["result"]["legacy"].get("full_text", "") if quote_tweet and not quote_withheld else "",
139            "quote_images": ",".join(
140                [media["media_url_https"] for media in quote_tweet["result"]["legacy"].get("entities", {}).get("media", [])
141                 if media["type"] == "photo"]) if quote_tweet and not quote_withheld else "",
142            "quote_videos": ",".join(
143                [media["media_url_https"] for media in quote_tweet["result"]["legacy"].get("entities", {}).get("media", [])
144                 if media["type"] == "video"]) if quote_tweet and not quote_withheld else "",
145            "is_quote_withheld": "yes" if quote_withheld else "no",
146            "is_reply": "yes" if str(tweet["legacy"]["conversation_id_str"]) != str(tweet["rest_id"]) else "no",
147            "replied_author": tweet["legacy"].get("in_reply_to_screen_name", ""),
148            "is_withheld": "yes" if withheld else "no",
149            "hashtags": ",".join([hashtag["text"] for hashtag in tweet["legacy"]["entities"]["hashtags"]]),
150            "urls": ",".join([url.get("expanded_url", url["display_url"]) for url in tweet["legacy"]["entities"]["urls"]]),
151            "images": ",".join(images),
152            "videos": ",".join(videos),
153            "mentions": ",".join([media["screen_name"] for media in tweet["legacy"]["entities"]["user_mentions"]]),
154            "long_lat": SearchTwitterViaZeeschuimer.get_centroid(
155                tweet["legacy"]["place"]["bounding_box"]["coordinates"]) if tweet["legacy"].get("place") else "",
156            "place_name": tweet["legacy"].get("place", {}).get("full_name", "") if tweet["legacy"].get("place") else "",
157        }
158
159    @staticmethod
160    def map_item_legacy(tweet):
161        timestamp = datetime.strptime(tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
162        tweet_id = tweet["legacy"]["id_str"]
163        withheld = False
164
165        retweet = tweet["legacy"].get("retweeted_status_result")
166        if retweet:
167            # make sure the full RT is included, by default this is shortened
168            if retweet["result"].get("legacy", {}).get("withheld_status"):
169                withheld = True
170                tweet["legacy"]["full_text"] = retweet["result"]["legacy"]["full_text"]
171            else:
172                t_text = "RT @" + retweet["result"]["core"]["user_results"]["result"]["legacy"]["screen_name"] + \
173                     " " + retweet["result"]["legacy"]["full_text"]
174                tweet["legacy"]["full_text"] = t_text
175
176        quote_tweet = tweet.get("quoted_status_result")
177
178        if quote_tweet and "tweet" in quote_tweet.get("result", {}):
179            # sometimes this is one level deeper, sometimes not...
180            quote_tweet["result"] = quote_tweet["result"]["tweet"]
181
182        return {
183            "id": tweet_id,
184            "thread_id": tweet["legacy"]["conversation_id_str"],
185            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
186            "unix_timestamp": int(timestamp.timestamp()),
187            "link": f"https://x.com/{tweet['user']['screen_name']}/status/{tweet_id}",
188            "body": tweet["legacy"]["full_text"],
189            "author": tweet["user"]["screen_name"],
190            "author_fullname": tweet["user"]["name"],
191            "author_id": tweet["user"]["id_str"],
192            "author_avatar_url": "", # todo: add
193            "author_banner_url": "", # todo: add
194            "verified": "", # todo: add
195            "source": strip_tags(tweet["legacy"]["source"]),
196            "language_guess": tweet["legacy"].get("lang"),
197            "possibly_sensitive": "yes" if tweet["legacy"].get("possibly_sensitive") else "no",
198            "retweet_count": tweet["legacy"]["retweet_count"],
199            "reply_count": tweet["legacy"]["reply_count"],
200            "like_count": tweet["legacy"]["favorite_count"],
201            "quote_count": tweet["legacy"]["quote_count"],
202            "impression_count": tweet.get("ext_views", {}).get("count", ""),
203            "is_retweet": "yes" if retweet else "no",
204            "retweeted_user": retweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if retweet else "",
205            "is_quote_tweet": "yes" if quote_tweet else "no",
206            "quote_tweet_id": "", # todo: add
207            "quote_author": quote_tweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if quote_tweet else "",
208            "quote_body": "", # todo: add
209            "quote_images": "", # todo: add
210            "quote_videos": "",  # todo: add
211            "is_quote_withheld": "", # todo: add
212            "is_reply": "yes" if str(tweet["legacy"]["conversation_id_str"]) != tweet_id else "no",
213            "replied_author": tweet["legacy"].get("in_reply_to_screen_name", "") if tweet["legacy"].get(
214                "in_reply_to_screen_name") else "",
215            "is_withheld": "yes" if withheld else "no",
216            "hashtags": ",".join([hashtag["text"] for hashtag in tweet["legacy"]["entities"]["hashtags"]]),
217            "urls": ",".join([url.get("expanded_url", url["display_url"]) for url in tweet["legacy"]["entities"]["urls"]]),
218            "images": ",".join(
219                [media["media_url_https"] for media in tweet["legacy"].get("extended_entities", {}).get("media", []) if
220                 media["type"] == "photo"]),
221            "videos": ",".join([media["video_info"]["variants"][0]["url"] for media in
222                                tweet["legacy"].get("extended_entities", {}).get("media", []) if
223                                media["type"] == "video"]),
224            "mentions": ",".join([media["screen_name"] for media in tweet["legacy"]["entities"]["user_mentions"]]),
225            "long_lat": SearchTwitterViaZeeschuimer.get_centroid(
226                tweet["legacy"]["place"]["bounding_box"]["coordinates"]) if tweet["legacy"].get("place") else "",
227            "place_name": tweet["legacy"].get("place", {}).get("full_name", "") if tweet["legacy"].get("place") else "",
228        }
229
230    @staticmethod
231    def get_centroid(box):
232        """
233        Get centre of a rectangular box
234
235        Convenience function for converting X/Twitter's bounding box coordinates
236        to a singular coordinate - simply the centre of the box - because that
237        is what is expected for mapped output.
238
239        :param list box:  The box as part of X/Twitter's response
240        :return str:  Coordinate, as longitude,latitude.
241        """
242        box = box[0]
243        return ",".join((
244            str(round((box[0][0] + box[1][0]) / 2, 6)),
245            str(round((box[0][1] + box[1][1]) / 2, 6)),
246        ))

Import scraped X/Twitter data

type = 'twitter-import'
category = 'Search'
title = 'Import scraped X/Twitter data'
description = 'Import X/Twitter data collected with an external tool such as Zeeschuimer.'
extension = 'ndjson'
is_from_zeeschuimer = True
accepts = []
references = ['[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)', '[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)']
def get_items(self, query):
33    def get_items(self, query):
34        """
35        Run custom search
36
37        Not available for Twitter
38        """
39        raise NotImplementedError("Twitter datasets can only be created by importing data from elsewhere")

Run custom search

Not available for Twitter

@staticmethod
def map_item(item):
41    @staticmethod
42    def map_item(item):
43
44        if item.get("rest_id"):
45            return MappedItem(SearchTwitterViaZeeschuimer.map_item_modern(item))
46        elif item.get("type") == "adaptive":
47            return MappedItem(SearchTwitterViaZeeschuimer.map_item_legacy(item))
48        else:
49            raise NotImplementedError
@staticmethod
def map_item_modern(tweet):
 51    @staticmethod
 52    def map_item_modern(tweet):
 53
 54        # Sometimes a "core" key appears in user_results, sometimes not.
 55        # This has effect on where to get user data.
 56        has_core = tweet.get("core", {}).get("user_results", {}).get("result", {}).get("core", False)
 57        user_key = "core" if has_core else "legacy"
 58
 59        timestamp = datetime.strptime(tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
 60        withheld = False
 61
 62        retweet = tweet["legacy"].get("retweeted_status_result")
 63        if retweet:
 64            # make sure the full RT is included, by default this is shortened
 65            if "tweet" in retweet["result"]:
 66                retweet["result"] = retweet["result"]["tweet"]
 67
 68            if retweet["result"].get("legacy", {}).get("withheld_scope"):
 69                withheld = True
 70                tweet["legacy"]["full_text"] = retweet["result"]["legacy"]["full_text"]
 71            else:
 72                t_text = "RT @" + retweet["result"]["core"]["user_results"]["result"][user_key]["screen_name"] + \
 73                      ": " + retweet["result"]["legacy"]["full_text"]
 74                tweet["legacy"]["full_text"] = t_text
 75
 76        quote_tweet = tweet.get("quoted_status_result")
 77        if quote_tweet and "tweet" in quote_tweet.get("result", {}):
 78            # sometimes this is one level deeper, sometimes not...
 79            quote_tweet["result"] = quote_tweet["result"]["tweet"]
 80        # check if the quote tweet is available or not
 81        quote_withheld = True if (quote_tweet and "tombstone" in quote_tweet["result"]) else False
 82
 83        # extract media from tweet; if video, add thumbnail to images and video link to videos
 84        images = set()
 85        videos = set()
 86        
 87        # Process media from extended_entities for videos and photos
 88        for media in tweet["legacy"].get("extended_entities", {}).get("media", []):
 89            if media["type"] == "photo":
 90                images.add(media["media_url_https"])
 91            elif media["type"] == "video":
 92                # Add video thumbnail to images
 93                images.add(media["media_url_https"])
 94                # Add actual video URL to videos if available
 95                if media.get("video_info", {}).get("variants"):
 96                    # Filter variants to get video files (not streaming playlists)
 97                    video_variants = [
 98                        variant for variant in media["video_info"]["variants"]
 99                        if variant.get("content_type", "").startswith("video/")
100                    ]
101                    if video_variants:
102                        # Sort by bitrate (highest first) to get best quality
103                        video_variants.sort(key=lambda x: x.get("bitrate", 0), reverse=True)
104                        videos.add(video_variants[0]["url"])
105        
106        # Also check entities.media for any additional photos not in extended_entities
107        for media in tweet["legacy"]["entities"].get("media", []):
108            if media["type"] == "photo":
109                images.add(media["media_url_https"])
110
111        return {
112            "id": tweet["rest_id"],
113            "thread_id": tweet["legacy"]["conversation_id_str"],
114            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
115            "unix_timestamp": int(timestamp.timestamp()),
116            "link": f"https://x.com/{tweet['core']['user_results']['result'][user_key]['screen_name']}/status/{tweet['id']}",
117            "body": tweet["legacy"]["full_text"],
118            "author": tweet["core"]["user_results"]["result"][user_key]["screen_name"],
119            "author_fullname": tweet["core"]["user_results"]["result"][user_key]["name"],
120            "author_id": tweet["legacy"]["user_id_str"],
121            "author_avatar_url": tweet["core"]["user_results"]["result"]["avatar"]["image_url"] if "avatar" in tweet["core"]["user_results"]["result"] else tweet["core"]["user_results"]["result"]["legacy"].get("profile_image_url_https", ""),
122            "author_banner_url": tweet["core"]["user_results"]["result"]["legacy"].get("profile_banner_url", ""), # key does not exist when author does not have a banner
123            "verified": tweet["core"]["user_results"]["result"].get("is_blue_verified", ""),
124            "source": strip_tags(tweet["source"]),
125            "language_guess": tweet["legacy"].get("lang"),
126            "possibly_sensitive": "yes" if tweet.get("possibly_sensitive", False) or tweet["legacy"].get("possibly_sensitive", False) else "no",
127            "retweet_count": tweet["legacy"]["retweet_count"],
128            "reply_count": tweet["legacy"]["reply_count"],
129            "like_count": tweet["legacy"]["favorite_count"],
130            "quote_count": tweet["legacy"]["quote_count"],
131            "impression_count": tweet.get("views", {}).get("count", ""),
132            "is_retweet": "yes" if retweet else "no",
133            "retweeted_user": retweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if retweet else "",
134            "is_quote_tweet": "yes" if quote_tweet else "no",
135            "quote_tweet_id": quote_tweet["result"].get("rest_id", "") if quote_tweet else "",
136            "quote_author": quote_tweet["result"]["core"]["user_results"]["result"].get(user_key, {}).get("screen_name", "") if
137                        (quote_tweet and not quote_withheld) else "",
138            "quote_body": quote_tweet["result"]["legacy"].get("full_text", "") if quote_tweet and not quote_withheld else "",
139            "quote_images": ",".join(
140                [media["media_url_https"] for media in quote_tweet["result"]["legacy"].get("entities", {}).get("media", [])
141                 if media["type"] == "photo"]) if quote_tweet and not quote_withheld else "",
142            "quote_videos": ",".join(
143                [media["media_url_https"] for media in quote_tweet["result"]["legacy"].get("entities", {}).get("media", [])
144                 if media["type"] == "video"]) if quote_tweet and not quote_withheld else "",
145            "is_quote_withheld": "yes" if quote_withheld else "no",
146            "is_reply": "yes" if str(tweet["legacy"]["conversation_id_str"]) != str(tweet["rest_id"]) else "no",
147            "replied_author": tweet["legacy"].get("in_reply_to_screen_name", ""),
148            "is_withheld": "yes" if withheld else "no",
149            "hashtags": ",".join([hashtag["text"] for hashtag in tweet["legacy"]["entities"]["hashtags"]]),
150            "urls": ",".join([url.get("expanded_url", url["display_url"]) for url in tweet["legacy"]["entities"]["urls"]]),
151            "images": ",".join(images),
152            "videos": ",".join(videos),
153            "mentions": ",".join([media["screen_name"] for media in tweet["legacy"]["entities"]["user_mentions"]]),
154            "long_lat": SearchTwitterViaZeeschuimer.get_centroid(
155                tweet["legacy"]["place"]["bounding_box"]["coordinates"]) if tweet["legacy"].get("place") else "",
156            "place_name": tweet["legacy"].get("place", {}).get("full_name", "") if tweet["legacy"].get("place") else "",
157        }
@staticmethod
def map_item_legacy(tweet):
159    @staticmethod
160    def map_item_legacy(tweet):
161        timestamp = datetime.strptime(tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y")
162        tweet_id = tweet["legacy"]["id_str"]
163        withheld = False
164
165        retweet = tweet["legacy"].get("retweeted_status_result")
166        if retweet:
167            # make sure the full RT is included, by default this is shortened
168            if retweet["result"].get("legacy", {}).get("withheld_status"):
169                withheld = True
170                tweet["legacy"]["full_text"] = retweet["result"]["legacy"]["full_text"]
171            else:
172                t_text = "RT @" + retweet["result"]["core"]["user_results"]["result"]["legacy"]["screen_name"] + \
173                     " " + retweet["result"]["legacy"]["full_text"]
174                tweet["legacy"]["full_text"] = t_text
175
176        quote_tweet = tweet.get("quoted_status_result")
177
178        if quote_tweet and "tweet" in quote_tweet.get("result", {}):
179            # sometimes this is one level deeper, sometimes not...
180            quote_tweet["result"] = quote_tweet["result"]["tweet"]
181
182        return {
183            "id": tweet_id,
184            "thread_id": tweet["legacy"]["conversation_id_str"],
185            "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
186            "unix_timestamp": int(timestamp.timestamp()),
187            "link": f"https://x.com/{tweet['user']['screen_name']}/status/{tweet_id}",
188            "body": tweet["legacy"]["full_text"],
189            "author": tweet["user"]["screen_name"],
190            "author_fullname": tweet["user"]["name"],
191            "author_id": tweet["user"]["id_str"],
192            "author_avatar_url": "", # todo: add
193            "author_banner_url": "", # todo: add
194            "verified": "", # todo: add
195            "source": strip_tags(tweet["legacy"]["source"]),
196            "language_guess": tweet["legacy"].get("lang"),
197            "possibly_sensitive": "yes" if tweet["legacy"].get("possibly_sensitive") else "no",
198            "retweet_count": tweet["legacy"]["retweet_count"],
199            "reply_count": tweet["legacy"]["reply_count"],
200            "like_count": tweet["legacy"]["favorite_count"],
201            "quote_count": tweet["legacy"]["quote_count"],
202            "impression_count": tweet.get("ext_views", {}).get("count", ""),
203            "is_retweet": "yes" if retweet else "no",
204            "retweeted_user": retweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if retweet else "",
205            "is_quote_tweet": "yes" if quote_tweet else "no",
206            "quote_tweet_id": "", # todo: add
207            "quote_author": quote_tweet["result"]["core"]["user_results"]["result"].get("legacy", {}).get("screen_name", "") if quote_tweet else "",
208            "quote_body": "", # todo: add
209            "quote_images": "", # todo: add
210            "quote_videos": "",  # todo: add
211            "is_quote_withheld": "", # todo: add
212            "is_reply": "yes" if str(tweet["legacy"]["conversation_id_str"]) != tweet_id else "no",
213            "replied_author": tweet["legacy"].get("in_reply_to_screen_name", "") if tweet["legacy"].get(
214                "in_reply_to_screen_name") else "",
215            "is_withheld": "yes" if withheld else "no",
216            "hashtags": ",".join([hashtag["text"] for hashtag in tweet["legacy"]["entities"]["hashtags"]]),
217            "urls": ",".join([url.get("expanded_url", url["display_url"]) for url in tweet["legacy"]["entities"]["urls"]]),
218            "images": ",".join(
219                [media["media_url_https"] for media in tweet["legacy"].get("extended_entities", {}).get("media", []) if
220                 media["type"] == "photo"]),
221            "videos": ",".join([media["video_info"]["variants"][0]["url"] for media in
222                                tweet["legacy"].get("extended_entities", {}).get("media", []) if
223                                media["type"] == "video"]),
224            "mentions": ",".join([media["screen_name"] for media in tweet["legacy"]["entities"]["user_mentions"]]),
225            "long_lat": SearchTwitterViaZeeschuimer.get_centroid(
226                tweet["legacy"]["place"]["bounding_box"]["coordinates"]) if tweet["legacy"].get("place") else "",
227            "place_name": tweet["legacy"].get("place", {}).get("full_name", "") if tweet["legacy"].get("place") else "",
228        }
@staticmethod
def get_centroid(box):
230    @staticmethod
231    def get_centroid(box):
232        """
233        Get centre of a rectangular box
234
235        Convenience function for converting X/Twitter's bounding box coordinates
236        to a singular coordinate - simply the centre of the box - because that
237        is what is expected for mapped output.
238
239        :param list box:  The box as part of X/Twitter's response
240        :return str:  Coordinate, as longitude,latitude.
241        """
242        box = box[0]
243        return ",".join((
244            str(round((box[0][0] + box[1][0]) / 2, 6)),
245            str(round((box[0][1] + box[1][1]) / 2, 6)),
246        ))

Get centre of a rectangular box

Convenience function for converting X/Twitter's bounding box coordinates to a single coordinate - simply the centre of the box - because that is what is expected for mapped output.

Parameters
  • list box: The box as part of X/Twitter's response
Returns

Coordinate, as longitude,latitude.