Edit on GitHub

datasources.tiktok.search_tiktok

Import scraped TikTok data

It's prohibitively difficult to scrape data from TikTok within 4CAT itself due to TikTok's aggressive rate limiting. Instead, import data that was collected elsewhere.

  1"""
  2Import scraped TikTok data
  3
  4It's prohibitively difficult to scrape data from TikTok within 4CAT itself due
  5to its aggressive rate limiting. Instead, import data collected elsewhere.
  6"""
  7from datetime import datetime, timezone
  8from urllib.parse import urlparse, parse_qs
  9
 10from backend.lib.search import Search
 11from common.lib.item_mapping import MappedItem
 12from common.lib.helpers import normalize_url_encoding
 13
 14
 15class SearchTikTok(Search):
 16    """
 17    Import scraped TikTok data
 18    """
 19    type = "tiktok-search"  # job ID
 20    category = "Search"  # category
 21    title = "Import scraped Tiktok data"  # title displayed in UI
 22    description = "Import Tiktok data collected with an external tool such as Zeeschuimer."  # description displayed in UI
 23    extension = "ndjson"  # extension of result file, used internally and in UI
 24    is_from_zeeschuimer = True
 25
 26    # not available as a processor for existing datasets
 27    accepts = [None]
 28    references = [
 29        "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)",
 30        "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)"
 31    ]
 32
 33    def get_items(self, query):
 34        """
 35        Run custom search
 36
 37        Not available for TikTok
 38        """
 39        raise NotImplementedError("TikTok datasets can only be created by importing data from elsewhere")
 40
 41    @staticmethod
 42    def map_item(post):
 43        # Zeeschuimer metadata
 44        metadata = post.get("__import_meta", {})
 45
 46        challenges = [challenge["title"] for challenge in post.get("challenges", [])]
 47
 48        hashtags = [extra["hashtagName"] for extra in post.get("textExtra", []) if
 49                    "hashtagName" in extra and extra["hashtagName"]]
 50
 51        labels = ",".join(post["diversificationLabels"]) if type(post.get("diversificationLabels")) is list else ""
 52
 53        if type(post.get("author")) is dict:
 54            # from intercepted API response
 55            user_nickname = post["author"]["uniqueId"]
 56            user_fullname = post["author"]["nickname"]
 57            user_thumbnail = post["author"].get("avatarThumb", "")
 58        elif post.get("author"):
 59            # from embedded JSON object
 60            user_nickname = post["author"]
 61            user_fullname = post["nickname"]
 62            user_thumbnail = ""
 63        else:
 64            user_nickname = ""
 65            user_fullname = ""
 66            user_thumbnail = ""
 67
 68        # there are various thumbnail URLs, some of them expire later than
 69        # others. Try to get the highest-resolution one that hasn't expired
 70        # yet
 71        thumbnail_options = []
 72
 73        if post["video"].get("shareCover"):
 74            thumbnail_options.append(post["video"]["shareCover"].pop())
 75
 76        if post["video"].get("cover"):
 77            thumbnail_options.append(post["video"]["cover"])
 78
 79        now = int(datetime.now(tz=timezone.utc).timestamp())
 80        thumbnail_url = [url for url in thumbnail_options if int(parse_qs(urlparse(url).query).get("x-expires", [now])[0]) >= now]
 81        thumbnail_url = thumbnail_url.pop() if thumbnail_url else ""
 82
 83        return MappedItem({
 84            "collected_from_url": normalize_url_encoding(metadata.get("source_platform_url")) if metadata.get("source_platform_url") else "",
 85            "id": post["id"],
 86            "thread_id": post["id"],
 87            "author": user_nickname,
 88            "author_full": user_fullname,
 89            "author_followers": post.get("authorStats", {}).get("followerCount", ""),
 90            "author_likes": post.get("authorStats", {}).get("diggCount", ""),
 91            "author_videos": post.get("authorStats", {}).get("videoCount", ""),
 92            "author_avatar": user_thumbnail,
 93            "body": post["desc"],
 94            "stickers": "\n".join(" ".join(s["stickerText"]) for s in post.get("stickersOnItem", [])),
 95            "timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
 96            "unix_timestamp": int(post["createTime"]),
 97            "is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
 98            "is_ad": "yes" if post.get("isAd", False) else "no",
 99            "is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
100            "is_sensitive": "yes" if post.get("maskType") == 3 else "no",
101            "is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
102            "music_name": post["music"]["title"],
103            "music_id": post["music"]["id"],
104            "music_url": post["music"].get("playUrl", ""),
105            "music_thumbnail": post["music"].get("coverLarge", ""),
106            "music_author": post["music"].get("authorName", ""),
107            "video_url": post["video"].get("downloadAddr", ""),
108            "tiktok_url": "https://www.tiktok.com/@%s/video/%s" % (user_nickname, post["id"]),
109            "thumbnail_url": thumbnail_url,
110            "likes": post["stats"]["diggCount"],
111            "comments": post["stats"]["commentCount"],
112            "shares": post["stats"]["shareCount"],
113            "plays": post["stats"]["playCount"],
114            "hashtags": ",".join(hashtags),
115            "challenges": ",".join(challenges),
116            "diversification_labels": labels,
117            "location_created": post.get("locationCreated", ""),
118            "effects": ",".join([e["name"] for e in post.get("effectStickers", [])]),
119            "warning": ",".join([w["text"] for w in post.get("warnInfo", [])])
120        })
class SearchTikTok(backend.lib.search.Search):
 16class SearchTikTok(Search):
 17    """
 18    Import scraped TikTok data
 19    """
 20    type = "tiktok-search"  # job ID
 21    category = "Search"  # category
 22    title = "Import scraped Tiktok data"  # title displayed in UI
 23    description = "Import Tiktok data collected with an external tool such as Zeeschuimer."  # description displayed in UI
 24    extension = "ndjson"  # extension of result file, used internally and in UI
 25    is_from_zeeschuimer = True
 26
 27    # not available as a processor for existing datasets
 28    accepts = [None]
 29    references = [
 30        "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)",
 31        "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)"
 32    ]
 33
 34    def get_items(self, query):
 35        """
 36        Run custom search
 37
 38        Not available for TikTok
 39        """
 40        raise NotImplementedError("TikTok datasets can only be created by importing data from elsewhere")
 41
 42    @staticmethod
 43    def map_item(post):
 44        # Zeeschuimer metadata
 45        metadata = post.get("__import_meta", {})
 46
 47        challenges = [challenge["title"] for challenge in post.get("challenges", [])]
 48
 49        hashtags = [extra["hashtagName"] for extra in post.get("textExtra", []) if
 50                    "hashtagName" in extra and extra["hashtagName"]]
 51
 52        labels = ",".join(post["diversificationLabels"]) if type(post.get("diversificationLabels")) is list else ""
 53
 54        if type(post.get("author")) is dict:
 55            # from intercepted API response
 56            user_nickname = post["author"]["uniqueId"]
 57            user_fullname = post["author"]["nickname"]
 58            user_thumbnail = post["author"].get("avatarThumb", "")
 59        elif post.get("author"):
 60            # from embedded JSON object
 61            user_nickname = post["author"]
 62            user_fullname = post["nickname"]
 63            user_thumbnail = ""
 64        else:
 65            user_nickname = ""
 66            user_fullname = ""
 67            user_thumbnail = ""
 68
 69        # there are various thumbnail URLs, some of them expire later than
 70        # others. Try to get the highest-resolution one that hasn't expired
 71        # yet
 72        thumbnail_options = []
 73
 74        if post["video"].get("shareCover"):
 75            thumbnail_options.append(post["video"]["shareCover"].pop())
 76
 77        if post["video"].get("cover"):
 78            thumbnail_options.append(post["video"]["cover"])
 79
 80        now = int(datetime.now(tz=timezone.utc).timestamp())
 81        thumbnail_url = [url for url in thumbnail_options if int(parse_qs(urlparse(url).query).get("x-expires", [now])[0]) >= now]
 82        thumbnail_url = thumbnail_url.pop() if thumbnail_url else ""
 83
 84        return MappedItem({
 85            "collected_from_url": normalize_url_encoding(metadata.get("source_platform_url")) if metadata.get("source_platform_url") else "",
 86            "id": post["id"],
 87            "thread_id": post["id"],
 88            "author": user_nickname,
 89            "author_full": user_fullname,
 90            "author_followers": post.get("authorStats", {}).get("followerCount", ""),
 91            "author_likes": post.get("authorStats", {}).get("diggCount", ""),
 92            "author_videos": post.get("authorStats", {}).get("videoCount", ""),
 93            "author_avatar": user_thumbnail,
 94            "body": post["desc"],
 95            "stickers": "\n".join(" ".join(s["stickerText"]) for s in post.get("stickersOnItem", [])),
 96            "timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
 97            "unix_timestamp": int(post["createTime"]),
 98            "is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
 99            "is_ad": "yes" if post.get("isAd", False) else "no",
100            "is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
101            "is_sensitive": "yes" if post.get("maskType") == 3 else "no",
102            "is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
103            "music_name": post["music"]["title"],
104            "music_id": post["music"]["id"],
105            "music_url": post["music"].get("playUrl", ""),
106            "music_thumbnail": post["music"].get("coverLarge", ""),
107            "music_author": post["music"].get("authorName", ""),
108            "video_url": post["video"].get("downloadAddr", ""),
109            "tiktok_url": "https://www.tiktok.com/@%s/video/%s" % (user_nickname, post["id"]),
110            "thumbnail_url": thumbnail_url,
111            "likes": post["stats"]["diggCount"],
112            "comments": post["stats"]["commentCount"],
113            "shares": post["stats"]["shareCount"],
114            "plays": post["stats"]["playCount"],
115            "hashtags": ",".join(hashtags),
116            "challenges": ",".join(challenges),
117            "diversification_labels": labels,
118            "location_created": post.get("locationCreated", ""),
119            "effects": ",".join([e["name"] for e in post.get("effectStickers", [])]),
120            "warning": ",".join([w["text"] for w in post.get("warnInfo", [])])
121        })

Import scraped TikTok data

type = 'tiktok-search'
category = 'Search'
title = 'Import scraped Tiktok data'
description = 'Import Tiktok data collected with an external tool such as Zeeschuimer.'
extension = 'ndjson'
is_from_zeeschuimer = True
accepts = [None]
references = ['[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)', '[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)']
def get_items(self, query):
34    def get_items(self, query):
35        """
36        Run custom search
37
38        Not available for TikTok
39        """
40        raise NotImplementedError("TikTok datasets can only be created by importing data from elsewhere")

Run custom search

Not available for TikTok

@staticmethod
def map_item(post):
 42    @staticmethod
 43    def map_item(post):
 44        # Zeeschuimer metadata
 45        metadata = post.get("__import_meta", {})
 46
 47        challenges = [challenge["title"] for challenge in post.get("challenges", [])]
 48
 49        hashtags = [extra["hashtagName"] for extra in post.get("textExtra", []) if
 50                    "hashtagName" in extra and extra["hashtagName"]]
 51
 52        labels = ",".join(post["diversificationLabels"]) if type(post.get("diversificationLabels")) is list else ""
 53
 54        if type(post.get("author")) is dict:
 55            # from intercepted API response
 56            user_nickname = post["author"]["uniqueId"]
 57            user_fullname = post["author"]["nickname"]
 58            user_thumbnail = post["author"].get("avatarThumb", "")
 59        elif post.get("author"):
 60            # from embedded JSON object
 61            user_nickname = post["author"]
 62            user_fullname = post["nickname"]
 63            user_thumbnail = ""
 64        else:
 65            user_nickname = ""
 66            user_fullname = ""
 67            user_thumbnail = ""
 68
 69        # there are various thumbnail URLs, some of them expire later than
 70        # others. Try to get the highest-resolution one that hasn't expired
 71        # yet
 72        thumbnail_options = []
 73
 74        if post["video"].get("shareCover"):
 75            thumbnail_options.append(post["video"]["shareCover"].pop())
 76
 77        if post["video"].get("cover"):
 78            thumbnail_options.append(post["video"]["cover"])
 79
 80        now = int(datetime.now(tz=timezone.utc).timestamp())
 81        thumbnail_url = [url for url in thumbnail_options if int(parse_qs(urlparse(url).query).get("x-expires", [now])[0]) >= now]
 82        thumbnail_url = thumbnail_url.pop() if thumbnail_url else ""
 83
 84        return MappedItem({
 85            "collected_from_url": normalize_url_encoding(metadata.get("source_platform_url")) if metadata.get("source_platform_url") else "",
 86            "id": post["id"],
 87            "thread_id": post["id"],
 88            "author": user_nickname,
 89            "author_full": user_fullname,
 90            "author_followers": post.get("authorStats", {}).get("followerCount", ""),
 91            "author_likes": post.get("authorStats", {}).get("diggCount", ""),
 92            "author_videos": post.get("authorStats", {}).get("videoCount", ""),
 93            "author_avatar": user_thumbnail,
 94            "body": post["desc"],
 95            "stickers": "\n".join(" ".join(s["stickerText"]) for s in post.get("stickersOnItem", [])),
 96            "timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
 97            "unix_timestamp": int(post["createTime"]),
 98            "is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
 99            "is_ad": "yes" if post.get("isAd", False) else "no",
100            "is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
101            "is_sensitive": "yes" if post.get("maskType") == 3 else "no",
102            "is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
103            "music_name": post["music"]["title"],
104            "music_id": post["music"]["id"],
105            "music_url": post["music"].get("playUrl", ""),
106            "music_thumbnail": post["music"].get("coverLarge", ""),
107            "music_author": post["music"].get("authorName", ""),
108            "video_url": post["video"].get("downloadAddr", ""),
109            "tiktok_url": "https://www.tiktok.com/@%s/video/%s" % (user_nickname, post["id"]),
110            "thumbnail_url": thumbnail_url,
111            "likes": post["stats"]["diggCount"],
112            "comments": post["stats"]["commentCount"],
113            "shares": post["stats"]["shareCount"],
114            "plays": post["stats"]["playCount"],
115            "hashtags": ",".join(hashtags),
116            "challenges": ",".join(challenges),
117            "diversification_labels": labels,
118            "location_created": post.get("locationCreated", ""),
119            "effects": ",".join([e["name"] for e in post.get("effectStickers", [])]),
120            "warning": ",".join([w["text"] for w in post.get("warnInfo", [])])
121        })