Edit on GitHub

datasources.tiktok.search_tiktok

Import scraped TikTok data

It's prohibitively difficult to scrape data from TikTok within 4CAT itself due to its aggressive rate limiting. Instead, import data collected elsewhere.

  1"""
  2Import scraped TikTok data
  3
  4It's prohibitively difficult to scrape data from TikTok within 4CAT itself due
  5to its aggressive rate limiting. Instead, import data collected elsewhere.
  6"""
  7from datetime import datetime, timezone
  8from urllib.parse import urlparse, parse_qs
  9
 10from backend.lib.search import Search
 11from common.lib.item_mapping import MappedItem
 12
 13
 14class SearchTikTok(Search):
 15    """
 16    Import scraped TikTok data
 17    """
 18    type = "tiktok-search"  # job ID
 19    category = "Search"  # category
 20    title = "Import scraped Tiktok data"  # title displayed in UI
 21    description = "Import Tiktok data collected with an external tool such as Zeeschuimer."  # description displayed in UI
 22    extension = "ndjson"  # extension of result file, used internally and in UI
 23    is_from_zeeschuimer = True
 24
 25    # not available as a processor for existing datasets
 26    accepts = [None]
 27    references = [
 28        "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)",
 29        "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)"
 30    ]
 31
 32    def get_items(self, query):
 33        """
 34        Run custom search
 35
 36        Not available for TikTok
 37        """
 38        raise NotImplementedError("TikTok datasets can only be created by importing data from elsewhere")
 39
 40    @staticmethod
 41    def map_item(post):
 42        challenges = [challenge["title"] for challenge in post.get("challenges", [])]
 43
 44        hashtags = [extra["hashtagName"] for extra in post.get("textExtra", []) if
 45                    "hashtagName" in extra and extra["hashtagName"]]
 46
 47        labels = ",".join(post["diversificationLabels"]) if type(post.get("diversificationLabels")) is list else ""
 48
 49        if type(post.get("author")) is dict:
 50            # from intercepted API response
 51            user_nickname = post["author"]["uniqueId"]
 52            user_fullname = post["author"]["nickname"]
 53            user_thumbnail = post["author"].get("avatarThumb", "")
 54        elif post.get("author"):
 55            # from embedded JSON object
 56            user_nickname = post["author"]
 57            user_fullname = post["nickname"]
 58            user_thumbnail = ""
 59        else:
 60            user_nickname = ""
 61            user_fullname = ""
 62            user_thumbnail = ""
 63
 64        # there are various thumbnail URLs, some of them expire later than
 65        # others. Try to get the highest-resolution one that hasn't expired
 66        # yet
 67        thumbnail_options = []
 68
 69        if post["video"].get("shareCover"):
 70            thumbnail_options.append(post["video"]["shareCover"].pop())
 71
 72        if post["video"].get("cover"):
 73            thumbnail_options.append(post["video"]["cover"])
 74
 75        now = int(datetime.now(tz=timezone.utc).timestamp())
 76        thumbnail_url = [url for url in thumbnail_options if int(parse_qs(urlparse(url).query).get("x-expires", [now])[0]) >= now]
 77        thumbnail_url = thumbnail_url.pop() if thumbnail_url else ""
 78
 79        return MappedItem({
 80            "id": post["id"],
 81            "thread_id": post["id"],
 82            "author": user_nickname,
 83            "author_full": user_fullname,
 84            "author_followers": post.get("authorStats", {}).get("followerCount", ""),
 85            "author_likes": post.get("authorStats", {}).get("diggCount", ""),
 86            "author_videos": post.get("authorStats", {}).get("videoCount", ""),
 87            "author_avatar": user_thumbnail,
 88            "body": post["desc"],
 89            "timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
 90            "unix_timestamp": int(post["createTime"]),
 91            "is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
 92            "is_ad": "yes" if post.get("isAd", False) else "no",
 93            "is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
 94            "is_sensitive": "yes" if post.get("maskType") == 3 else "no",
 95            "is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
 96            "music_name": post["music"]["title"],
 97            "music_id": post["music"]["id"],
 98            "music_url": post["music"].get("playUrl", ""),
 99            "music_thumbnail": post["music"].get("coverLarge", ""),
100            "music_author": post["music"].get("authorName", ""),
101            "video_url": post["video"].get("downloadAddr", ""),
102            "tiktok_url": "https://www.tiktok.com/@%s/video/%s" % (user_nickname, post["id"]),
103            "thumbnail_url": thumbnail_url,
104            "likes": post["stats"]["diggCount"],
105            "comments": post["stats"]["commentCount"],
106            "shares": post["stats"]["shareCount"],
107            "plays": post["stats"]["playCount"],
108            "hashtags": ",".join(hashtags),
109            "challenges": ",".join(challenges),
110            "diversification_labels": labels,
111            "location_created": post.get("locationCreated", ""),
112            "stickers": "\n".join(" ".join(s["stickerText"]) for s in post.get("stickersOnItem", [])),
113            "effects": ",".join([e["name"] for e in post.get("effectStickers", [])]),
114            "warning": ",".join([w["text"] for w in post.get("warnInfo", [])])
115        })
class SearchTikTok(backend.lib.search.Search):
 15class SearchTikTok(Search):
 16    """
 17    Import scraped TikTok data
 18    """
 19    type = "tiktok-search"  # job ID
 20    category = "Search"  # category
 21    title = "Import scraped Tiktok data"  # title displayed in UI
 22    description = "Import Tiktok data collected with an external tool such as Zeeschuimer."  # description displayed in UI
 23    extension = "ndjson"  # extension of result file, used internally and in UI
 24    is_from_zeeschuimer = True
 25
 26    # not available as a processor for existing datasets
 27    accepts = [None]
 28    references = [
 29        "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)",
 30        "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)"
 31    ]
 32
 33    def get_items(self, query):
 34        """
 35        Run custom search
 36
 37        Not available for TikTok
 38        """
 39        raise NotImplementedError("TikTok datasets can only be created by importing data from elsewhere")
 40
 41    @staticmethod
 42    def map_item(post):
 43        challenges = [challenge["title"] for challenge in post.get("challenges", [])]
 44
 45        hashtags = [extra["hashtagName"] for extra in post.get("textExtra", []) if
 46                    "hashtagName" in extra and extra["hashtagName"]]
 47
 48        labels = ",".join(post["diversificationLabels"]) if type(post.get("diversificationLabels")) is list else ""
 49
 50        if type(post.get("author")) is dict:
 51            # from intercepted API response
 52            user_nickname = post["author"]["uniqueId"]
 53            user_fullname = post["author"]["nickname"]
 54            user_thumbnail = post["author"].get("avatarThumb", "")
 55        elif post.get("author"):
 56            # from embedded JSON object
 57            user_nickname = post["author"]
 58            user_fullname = post["nickname"]
 59            user_thumbnail = ""
 60        else:
 61            user_nickname = ""
 62            user_fullname = ""
 63            user_thumbnail = ""
 64
 65        # there are various thumbnail URLs, some of them expire later than
 66        # others. Try to get the highest-resolution one that hasn't expired
 67        # yet
 68        thumbnail_options = []
 69
 70        if post["video"].get("shareCover"):
 71            thumbnail_options.append(post["video"]["shareCover"].pop())
 72
 73        if post["video"].get("cover"):
 74            thumbnail_options.append(post["video"]["cover"])
 75
 76        now = int(datetime.now(tz=timezone.utc).timestamp())
 77        thumbnail_url = [url for url in thumbnail_options if int(parse_qs(urlparse(url).query).get("x-expires", [now])[0]) >= now]
 78        thumbnail_url = thumbnail_url.pop() if thumbnail_url else ""
 79
 80        return MappedItem({
 81            "id": post["id"],
 82            "thread_id": post["id"],
 83            "author": user_nickname,
 84            "author_full": user_fullname,
 85            "author_followers": post.get("authorStats", {}).get("followerCount", ""),
 86            "author_likes": post.get("authorStats", {}).get("diggCount", ""),
 87            "author_videos": post.get("authorStats", {}).get("videoCount", ""),
 88            "author_avatar": user_thumbnail,
 89            "body": post["desc"],
 90            "timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
 91            "unix_timestamp": int(post["createTime"]),
 92            "is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
 93            "is_ad": "yes" if post.get("isAd", False) else "no",
 94            "is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
 95            "is_sensitive": "yes" if post.get("maskType") == 3 else "no",
 96            "is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
 97            "music_name": post["music"]["title"],
 98            "music_id": post["music"]["id"],
 99            "music_url": post["music"].get("playUrl", ""),
100            "music_thumbnail": post["music"].get("coverLarge", ""),
101            "music_author": post["music"].get("authorName", ""),
102            "video_url": post["video"].get("downloadAddr", ""),
103            "tiktok_url": "https://www.tiktok.com/@%s/video/%s" % (user_nickname, post["id"]),
104            "thumbnail_url": thumbnail_url,
105            "likes": post["stats"]["diggCount"],
106            "comments": post["stats"]["commentCount"],
107            "shares": post["stats"]["shareCount"],
108            "plays": post["stats"]["playCount"],
109            "hashtags": ",".join(hashtags),
110            "challenges": ",".join(challenges),
111            "diversification_labels": labels,
112            "location_created": post.get("locationCreated", ""),
113            "stickers": "\n".join(" ".join(s["stickerText"]) for s in post.get("stickersOnItem", [])),
114            "effects": ",".join([e["name"] for e in post.get("effectStickers", [])]),
115            "warning": ",".join([w["text"] for w in post.get("warnInfo", [])])
116        })

Import scraped TikTok data

type = 'tiktok-search'
category = 'Search'
title = 'Import scraped Tiktok data'
description = 'Import Tiktok data collected with an external tool such as Zeeschuimer.'
extension = 'ndjson'
is_from_zeeschuimer = True
accepts = [None]
references = ['[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)', '[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)']
def get_items(self, query):
33    def get_items(self, query):
34        """
35        Run custom search
36
37        Not available for TikTok
38        """
39        raise NotImplementedError("TikTok datasets can only be created by importing data from elsewhere")

Run custom search

Not available for TikTok

@staticmethod
def map_item(post):
 41    @staticmethod
 42    def map_item(post):
 43        challenges = [challenge["title"] for challenge in post.get("challenges", [])]
 44
 45        hashtags = [extra["hashtagName"] for extra in post.get("textExtra", []) if
 46                    "hashtagName" in extra and extra["hashtagName"]]
 47
 48        labels = ",".join(post["diversificationLabels"]) if type(post.get("diversificationLabels")) is list else ""
 49
 50        if type(post.get("author")) is dict:
 51            # from intercepted API response
 52            user_nickname = post["author"]["uniqueId"]
 53            user_fullname = post["author"]["nickname"]
 54            user_thumbnail = post["author"].get("avatarThumb", "")
 55        elif post.get("author"):
 56            # from embedded JSON object
 57            user_nickname = post["author"]
 58            user_fullname = post["nickname"]
 59            user_thumbnail = ""
 60        else:
 61            user_nickname = ""
 62            user_fullname = ""
 63            user_thumbnail = ""
 64
 65        # there are various thumbnail URLs, some of them expire later than
 66        # others. Try to get the highest-resolution one that hasn't expired
 67        # yet
 68        thumbnail_options = []
 69
 70        if post["video"].get("shareCover"):
 71            thumbnail_options.append(post["video"]["shareCover"].pop())
 72
 73        if post["video"].get("cover"):
 74            thumbnail_options.append(post["video"]["cover"])
 75
 76        now = int(datetime.now(tz=timezone.utc).timestamp())
 77        thumbnail_url = [url for url in thumbnail_options if int(parse_qs(urlparse(url).query).get("x-expires", [now])[0]) >= now]
 78        thumbnail_url = thumbnail_url.pop() if thumbnail_url else ""
 79
 80        return MappedItem({
 81            "id": post["id"],
 82            "thread_id": post["id"],
 83            "author": user_nickname,
 84            "author_full": user_fullname,
 85            "author_followers": post.get("authorStats", {}).get("followerCount", ""),
 86            "author_likes": post.get("authorStats", {}).get("diggCount", ""),
 87            "author_videos": post.get("authorStats", {}).get("videoCount", ""),
 88            "author_avatar": user_thumbnail,
 89            "body": post["desc"],
 90            "timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
 91            "unix_timestamp": int(post["createTime"]),
 92            "is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
 93            "is_ad": "yes" if post.get("isAd", False) else "no",
 94            "is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
 95            "is_sensitive": "yes" if post.get("maskType") == 3 else "no",
 96            "is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
 97            "music_name": post["music"]["title"],
 98            "music_id": post["music"]["id"],
 99            "music_url": post["music"].get("playUrl", ""),
100            "music_thumbnail": post["music"].get("coverLarge", ""),
101            "music_author": post["music"].get("authorName", ""),
102            "video_url": post["video"].get("downloadAddr", ""),
103            "tiktok_url": "https://www.tiktok.com/@%s/video/%s" % (user_nickname, post["id"]),
104            "thumbnail_url": thumbnail_url,
105            "likes": post["stats"]["diggCount"],
106            "comments": post["stats"]["commentCount"],
107            "shares": post["stats"]["shareCount"],
108            "plays": post["stats"]["playCount"],
109            "hashtags": ",".join(hashtags),
110            "challenges": ",".join(challenges),
111            "diversification_labels": labels,
112            "location_created": post.get("locationCreated", ""),
113            "stickers": "\n".join(" ".join(s["stickerText"]) for s in post.get("stickersOnItem", [])),
114            "effects": ",".join([e["name"] for e in post.get("effectStickers", [])]),
115            "warning": ",".join([w["text"] for w in post.get("warnInfo", [])])
116        })