Edit on GitHub

datasources.tiktok_urls.search_tiktok_urls

Import scraped TikTok data

It's prohibitively difficult to scrape data from TikTok within 4CAT itself due to its aggressive rate limiting. Instead, import data collected elsewhere.

  1"""
  2Import scraped TikTok data
  3
  4It's prohibitively difficult to scrape data from TikTok within 4CAT itself due
  5to its aggressive rate limiting. Instead, import data collected elsewhere.
  6"""
  7import requests
  8import asyncio
  9import time
 10import json
 11import re
 12
 13from requests_futures.sessions import FuturesSession
 14from bs4 import BeautifulSoup
 15
 16from backend.lib.search import Search
 17from common.lib.helpers import UserInput
 18from common.lib.exceptions import WorkerInterruptedException, QueryParametersException, ProcessorException
 19from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport
 20from common.config_manager import config
 21
 22class SearchTikTokByID(Search):
 23    """
 24    Import scraped TikTok data
 25    """
 26    type = "tiktok-urls-search"  # job ID
 27    category = "Search"  # category
 28    title = "Search TikTok by post URL"  # title displayed in UI
 29    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
 30    extension = "ndjson"  # extension of result file, used internally and in UI
 31    is_local = False  # Whether this datasource is locally scraped
 32    is_static = False  # Whether this datasource is still updated
 33
 34    # not available as a processor for existing datasets
 35    accepts = [None]
 36
 37    config = {
 38        "tiktok-urls-search.proxies": {
 39            "type": UserInput.OPTION_TEXT_JSON,
 40            "default": [],
 41            "help": "Proxies for TikTok data collection"
 42        },
 43        "tiktok-urls-search.proxies.wait": {
 44            "type": UserInput.OPTION_TEXT,
 45            "coerce_type": float,
 46            "default": 1.0,
 47            "help": "Request wait",
 48            "tooltip": "Time to wait before sending a new request from the same IP"
 49        }
 50    }
 51
 52    options = {
 53        "intro": {
 54            "type": UserInput.OPTION_INFO,
 55            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
 56                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
 57                    "each post's page in the browser interface "
 58                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
 59                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
 60                    "only directly available when downloading the results as an .ndjson file."
 61        },
 62        "urls": {
 63            "type": UserInput.OPTION_TEXT_LARGE,
 64            "help": "Post URLs",
 65            "tooltip": "Separate by commas or new lines."
 66        }
 67    }
 68
 69    def get_items(self, query):
 70        """
 71        Retrieve metadata for TikTok URLs
 72
 73        :param dict query:  Search query parameters
 74        """
 75        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
 76        loop = asyncio.new_event_loop()
 77        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
 78
 79    @staticmethod
 80    def validate_query(query, request, user):
 81        """
 82        Validate TikTok query
 83
 84        :param dict query:  Query parameters, from client-side.
 85        :param request:  Flask request
 86        :param User user:  User object of user who has submitted the query
 87        :return dict:  Safe query parameters
 88        """
 89        # reformat queries to be a comma-separated list with no wrapping
 90        # whitespace
 91        whitespace = re.compile(r"\s+")
 92        items = whitespace.sub("", query.get("urls").replace("\n", ","))
 93
 94        sanitized_items = []
 95        # handle telegram URLs
 96        for item in str(items).split(","):
 97            if not item.strip():
 98                continue
 99
100            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
101                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
102                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
103                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)
104
105            sanitized_items.append(item)
106
107        # no query 4 u
108        if not sanitized_items:
109            raise QueryParametersException("You must provide at least one valid TikTok video URL.")
110
111        # simple!
112        return {
113            "urls": ",".join(sanitized_items)
114        }
115
116    @staticmethod
117    def map_item(item):
118        """
119        Analogous to the other TikTok data source
120
121        :param item:
122        :return:
123        """
124        return SearchTikTokByImport.map_item(item)
125
126
class TikTokScraper:
    """
    Scraper for TikTok post metadata and videos

    Requests TikTok pages concurrently through a configurable pool of
    proxies, respecting a per-proxy cooldown between requests. Used by the
    TikTok URL datasource and the video downloader.
    """
    # proxy URL -> {"busy": bool, "url": current request URL, "next_request": timestamp}
    proxy_map = None
    # seconds to wait before re-using the same proxy
    proxy_sleep = 1
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    # timestamp after which the proxy list should be refreshed from config
    last_proxy_update = 0
    last_time_proxy_available = None
    # seconds without any available proxy before giving up entirely
    no_available_proxy_timeout = 600

    # sentinel yielded by reformat_metadata when TikTok reports no video
    VIDEO_NOT_FOUND = "oh no, sire, no video was found"

    def __init__(self, processor, config):
        """
        :param Processor processor:  The processor using this scraper; used
            for logging, progress updates and configuration reads
        :param config:  Configuration reader (currently unused; configuration
            is read through the processor — kept for interface compatibility)
        """
        self.proxy_map = {}
        self.processor = processor

    def update_proxies(self):
        """
        Refresh the proxy map from the 4CAT configuration

        Adds newly configured proxies and drops proxies that were removed
        from the configuration. If no proxies are configured, requests are
        sent directly, represented by the "__localhost__" sentinel.
        """
        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
        if not all_proxies:
            # no proxies? just request directly
            all_proxies = ["__localhost__"]

        # register proxies that are new since the last update
        for proxy in all_proxies:
            if proxy not in self.proxy_map:
                self.proxy_map[proxy] = {
                    "busy": False,
                    "url": None,
                    "next_request": 0
                }

        # forget proxies that are no longer configured
        for proxy in list(self.proxy_map.keys()):
            if proxy not in all_proxies:
                del self.proxy_map[proxy]

    def get_available_proxies(self):
        """
        Collect proxies from proxy_map that are ready for new requests

        :return list:  Proxy identifiers that are neither busy nor cooling down
        :raises ProcessorException:  If no proxy has been available for longer
            than `no_available_proxy_timeout` seconds
        """
        # refresh the proxy list at most every 5 seconds so configuration
        # changes are picked up while the scrape is running
        if self.last_proxy_update < time.time():
            self.update_proxies()
            self.last_proxy_update = time.time() + 5

        # a proxy is usable if it is not waiting on a response and its
        # cooldown period has passed
        available_proxies = [proxy for proxy in self.proxy_map if
                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]

        if not available_proxies:
            # No proxy available
            if self.last_time_proxy_available is None:
                # First run, possibly an issue, but this will allow it to time out
                self.processor.dataset.log("No available proxy found at start of request_metadata")
                self.last_time_proxy_available = time.time()

            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
                # No available proxies within the timeout period
                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout} seconds")
        else:
            self.last_time_proxy_available = time.time()

        return available_proxies

    def release_proxy(self, url):
        """
        Mark the proxy that handled the given URL as available again

        The proxy only becomes usable once its cooldown (`proxy_sleep`) has
        elapsed.

        :param str url:  URL the proxy was used to request
        """
        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
        if used_proxy:
            self.proxy_map[used_proxy[0]].update({
                "busy": False,
                "next_request": time.time() + self.proxy_sleep
            })
        else:
            # should not happen: every dispatched request records its URL in
            # the proxy map; log so mismatches are visible in the dataset log
            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")

    def _queue_retry(self, url, urls, retries, max_retries=3):
        """
        Requeue a URL for another attempt, up to max_retries attempts

        :param str url:  URL to requeue
        :param list urls:  Queue of URLs still to be requested
        :param dict retries:  Map of URL -> number of attempts so far
        :param int max_retries:  Maximum number of retry attempts
        :return bool:  True if requeued, False if retries are exhausted
        """
        if retries.get(url, 0) < max_retries:
            retries[url] = retries.get(url, 0) + 1
            urls.append(url)
            return True
        return False

    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        :param list urls:  URLs to collect data for
        :return list:  Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        tiktok_requests = {}
        finished = 0
        num_urls = len(urls)
        seen_urls = set()

        results = []
        failed = 0
        dupes = 0
        retries = {}

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            # dispatch one pending URL per available proxy
            for available_proxy in available_proxies:
                url = None
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # skip URLs already requested, unless queued for a retry
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                max_timeout = time.time() + 20
                # NOTE: this previously checked all([r for r in ... if r.done()]),
                # which is vacuously true and never actually waited
                while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # a request-level error is retried a few times before counting
                # as failed; previously such errors were both counted as failed
                # *and* retried, inflating the failure count. Non-request
                # exceptions propagate, as before.
                try:
                    response = request.result()
                except requests.exceptions.RequestException as exception:
                    if not self._queue_retry(url, urls, retries):
                        failed += 1
                        self.processor.dataset.update_status(
                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                    continue
                finally:
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    # no embedded metadata; possibly a block page, so retry
                    if self._queue_retry(url, urls, retries):
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents and len(sigil.contents) > 0:
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall
                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue

                    results.append(video)

                    self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                               ("{:,}".format(finished), "{:,}".format(num_urls)))
                    # retried URLs may push `finished` past the original URL
                    # count, so cap the progress fraction at 1
                    self.processor.dataset.update_progress(min(finished / num_urls, 1))

        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                       "dataset log for details." % ", ".join(notes))

        return results

    def reformat_metadata(self, metadata):
        """
        Take embedded JSON and yield one item per post

        :param dict metadata: Metadata extracted from the TikTok video page
        :return:  Yields one dictionary per video
        """
        # may need some extra parsing to find the item data...
        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
            try:
                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
            except KeyError:
                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
                    # a status code but no item data: video was removed or
                    # never existed
                    yield self.VIDEO_NOT_FOUND
                    return
                else:
                    # re-raise with message and traceback intact (previously
                    # this raised a bare e.__class__, losing the message)
                    raise

            metadata = {"ItemModule": {
                video["id"]: video
            }}

        if "ItemModule" in metadata:
            for video_id, item in metadata["ItemModule"].items():
                if "CommentItem" in metadata:
                    # match comments to this video, and resolve comment author
                    # usernames to full user objects where available
                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
                    if "UserModule" in metadata:
                        for comment_id in list(comments.keys()):
                            username = comments[comment_id]["user"]
                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
                                                                                                       username)
                else:
                    comments = {}

                yield {**item, "comments": list(comments.values())}

    async def download_videos(self, video_ids, staging_area, max_videos):
        """
        Download TikTok Videos

        This is based on the TikTok downloader from https://jdownloader.org/

        :param list video_ids:  TikTok video IDs to download
        :param staging_area:  Path of directory to write .mp4 files to
        :param int max_videos:  Maximum number of videos to download
        :return dict:  Mapping of video ID to download result metadata
        """
        video_download_headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
                "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
                "Accept-Language": "en-US,en;q=0.5",
                # "Range": "bytes=0-",
                "Connection": "keep-alive",
                "Referer": "https://www.tiktok.com/",
                "Sec-Fetch-Dest": "video",
                "Sec-Fetch-Mode": "no-cors",
                "Sec-Fetch-Site": "cross-site",
                "Accept-Encoding": "identity"
            }
        session = FuturesSession()

        download_results = {}
        downloaded_videos = 0
        metadata_collected = 0
        video_requests = {}
        video_download_urls = []

        while video_ids or video_download_urls or video_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                if downloaded_videos >= max_videos:
                    # We're done here; drop the remaining queue (previously
                    # `>` allowed one extra video past the limit)
                    video_ids = []
                    video_download_urls = []
                    break

                # Download videos (if available)
                if video_download_urls:
                    video_id, video_download_url = video_download_urls.pop(0)
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(video_download_headers)
                    video_requests[video_download_url] = {
                        "request": session.get(video_download_url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "download",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": video_download_url
                    })
                # Collect video metadata (to find videos to download)
                elif video_ids:
                    video_id = video_ids.pop(0)
                    url = f"https://www.tiktok.com/embed/v2/{video_id}"

                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(self.headers)
                    video_requests[url] = {
                        "request": session.get(url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "metadata",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in video_requests.values():
                    request["request"].cancel()

                max_timeout = time.time() + 20
                # NOTE: previously this checked truthiness of the futures
                # themselves, which is vacuously true; check done() instead
                while not all(r["request"].done() for r in video_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

            # Extract video download URLs
            for url in list(video_requests.keys()):
                video_id = video_requests[url]["video_id"]
                request = video_requests[url]["request"]
                request_type = video_requests[url]["type"]
                request_metadata = {
                    "success": False,
                    "url": url,
                    "error": None,
                    "from_dataset": self.processor.source_dataset.key,
                    "post_ids": [video_id],
                }
                if not request.done():
                    continue

                # Release proxy
                self.release_proxy(url)

                # Collect response
                try:
                    response = request.result()
                except requests.exceptions.RequestException as e:
                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue
                finally:
                    del video_requests[url]

                if response.status_code != 200:
                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                if request_type == "metadata":
                    # Collect Video Download URL from the embed page
                    soup = BeautifulSoup(response.text, "html.parser")
                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                    video_metadata = None
                    # json_source may be None if TikTok served a block page;
                    # previously this crashed with an AttributeError
                    if json_source is not None:
                        try:
                            if json_source.text:
                                video_metadata = json.loads(json_source.text)
                            elif json_source.contents[0]:
                                video_metadata = json.loads(json_source.contents[0])
                        except json.JSONDecodeError as e:
                            self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                    if not video_metadata:
                        # Failed to collect metadata
                        error_message = f"Failed to find metadata for video {video_id}"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        continue

                    try:
                        url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                    except (KeyError, IndexError):
                        error_message = f"vid: {video_id} - failed to find video download URL"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.processor.dataset.log(video_metadata["source"]["data"].values())
                        continue

                    # Add new download URL to be collected
                    video_download_urls.append((video_id, url))
                    metadata_collected += 1
                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                    (metadata_collected, max_videos))
                    self.processor.dataset.update_progress(metadata_collected / max_videos)

                elif request_type == "download":
                    # Download video in 1 MiB chunks
                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            if chunk:
                                f.write(chunk)
                    request_metadata["success"] = True
                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                    download_results[video_id] = request_metadata

                    downloaded_videos += 1
                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                    (downloaded_videos, max_videos))
                    self.processor.dataset.update_progress(downloaded_videos / max_videos)

        return download_results
class SearchTikTokByID(backend.lib.search.Search):
 23class SearchTikTokByID(Search):
 24    """
 25    Import scraped TikTok data
 26    """
 27    type = "tiktok-urls-search"  # job ID
 28    category = "Search"  # category
 29    title = "Search TikTok by post URL"  # title displayed in UI
 30    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
 31    extension = "ndjson"  # extension of result file, used internally and in UI
 32    is_local = False  # Whether this datasource is locally scraped
 33    is_static = False  # Whether this datasource is still updated
 34
 35    # not available as a processor for existing datasets
 36    accepts = [None]
 37
 38    config = {
 39        "tiktok-urls-search.proxies": {
 40            "type": UserInput.OPTION_TEXT_JSON,
 41            "default": [],
 42            "help": "Proxies for TikTok data collection"
 43        },
 44        "tiktok-urls-search.proxies.wait": {
 45            "type": UserInput.OPTION_TEXT,
 46            "coerce_type": float,
 47            "default": 1.0,
 48            "help": "Request wait",
 49            "tooltip": "Time to wait before sending a new request from the same IP"
 50        }
 51    }
 52
 53    options = {
 54        "intro": {
 55            "type": UserInput.OPTION_INFO,
 56            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
 57                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
 58                    "each post's page in the browser interface "
 59                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
 60                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
 61                    "only directly available when downloading the results as an .ndjson file."
 62        },
 63        "urls": {
 64            "type": UserInput.OPTION_TEXT_LARGE,
 65            "help": "Post URLs",
 66            "tooltip": "Separate by commas or new lines."
 67        }
 68    }
 69
 70    def get_items(self, query):
 71        """
 72        Retrieve metadata for TikTok URLs
 73
 74        :param dict query:  Search query parameters
 75        """
 76        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
 77        loop = asyncio.new_event_loop()
 78        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
 79
 80    @staticmethod
 81    def validate_query(query, request, user):
 82        """
 83        Validate TikTok query
 84
 85        :param dict query:  Query parameters, from client-side.
 86        :param request:  Flask request
 87        :param User user:  User object of user who has submitted the query
 88        :return dict:  Safe query parameters
 89        """
 90        # reformat queries to be a comma-separated list with no wrapping
 91        # whitespace
 92        whitespace = re.compile(r"\s+")
 93        items = whitespace.sub("", query.get("urls").replace("\n", ","))
 94
 95        sanitized_items = []
  96        # validate each TikTok video URL
 97        for item in str(items).split(","):
 98            if not item.strip():
 99                continue
100
101            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
102                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
103                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
104                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)
105
106            sanitized_items.append(item)
107
108        # no query 4 u
109        if not sanitized_items:
110            raise QueryParametersException("You must provide at least one valid TikTok video URL.")
111
112        # simple!
113        return {
114            "urls": ",".join(sanitized_items)
115        }
116
117    @staticmethod
118    def map_item(item):
119        """
120        Analogous to the other TikTok data source
121
122        :param item:
123        :return:
124        """
125        return SearchTikTokByImport.map_item(item)

Import scraped TikTok data

type = 'tiktok-urls-search'
category = 'Search'
title = 'Search TikTok by post URL'
description = 'Retrieve metadata for TikTok post URLs.'
extension = 'ndjson'
is_local = False
is_static = False
accepts = [None]
config = {'tiktok-urls-search.proxies': {'type': 'json', 'default': [], 'help': 'Proxies for TikTok data collection'}, 'tiktok-urls-search.proxies.wait': {'type': 'string', 'coerce_type': <class 'float'>, 'default': 1.0, 'help': 'Request wait', 'tooltip': 'Time to wait before sending a new request from the same IP'}}
options = {'intro': {'type': 'info', 'help': "This data source can retrieve metadata for TikTok posts based on a list of URLs for those posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from each post's page in the browser interface ([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of details about the post itself such as likes, tags and stickers. Note that some of the metadata is only directly available when downloading the results as an .ndjson file."}, 'urls': {'type': 'textarea', 'help': 'Post URLs', 'tooltip': 'Separate by commas or new lines.'}}
def get_items(self, query):
70    def get_items(self, query):
71        """
72        Retrieve metadata for TikTok URLs
73
74        :param dict query:  Search query parameters
75        """
76        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
77        loop = asyncio.new_event_loop()
78        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))

Retrieve metadata for TikTok URLs

Parameters
  • dict query: Search query parameters
@staticmethod
def validate_query(query, request, user):
 80    @staticmethod
 81    def validate_query(query, request, user):
 82        """
 83        Validate TikTok query
 84
 85        :param dict query:  Query parameters, from client-side.
 86        :param request:  Flask request
 87        :param User user:  User object of user who has submitted the query
 88        :return dict:  Safe query parameters
 89        """
 90        # reformat queries to be a comma-separated list with no wrapping
 91        # whitespace
 92        whitespace = re.compile(r"\s+")
 93        items = whitespace.sub("", query.get("urls").replace("\n", ","))
 94
 95        sanitized_items = []
 96        # handle TikTok URLs
 97        for item in str(items).split(","):
 98            if not item.strip():
 99                continue
100
101            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
102                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
103                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
104                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)
105
106            sanitized_items.append(item)
107
108        # no query 4 u
109        if not sanitized_items:
110            raise QueryParametersException("You must provide at least one valid TikTok video URL.")
111
112        # simple!
113        return {
114            "urls": ",".join(sanitized_items)
115        }

Validate TikTok query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters

@staticmethod
def map_item(item):
117    @staticmethod
118    def map_item(item):
119        """
120        Analogous to the other TikTok data source
121
122        :param item:
123        :return:
124        """
125        return SearchTikTokByImport.map_item(item)

Analogous to the other TikTok data source

Parameters
  • item:
Returns
class TikTokScraper:
class TikTokScraper:
    """
    Scrape TikTok post metadata and videos, optionally via proxies
    """
    proxy_map = None  # proxy URL -> {"busy", "url", "next_request"} state
    proxy_sleep = 1  # seconds to wait between requests through the same proxy
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    last_proxy_update = 0  # timestamp after which the proxy map is refreshed
    last_time_proxy_available = None  # last moment at least one proxy was idle
    no_available_proxy_timeout = 600  # seconds without any idle proxy before giving up

    VIDEO_NOT_FOUND = "oh no, sire, no video was found"

    def __init__(self, processor, config):
        """
        :param Processor processor:  The processor using this function and needing updates
        :param config:  Configuration reader; previously accepted but
            discarded - kept as an attribute for reference (proxy settings
            are read through the processor's own config)
        """
        self.proxy_map = {}
        self.processor = processor
        self.config = config
156    def update_proxies(self):
157        """
158        Get proxies that are available
159
160        :return:
161        """
162        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
163        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
164        if not all_proxies:
165            # no proxies? just request directly
166            all_proxies = ["__localhost__"]
167
168        for proxy in all_proxies:
169            if proxy in self.proxy_map:
170                continue
171            else:
172                self.proxy_map[proxy] = {
173                    "busy": False,
174                    "url": None,
175                    "next_request": 0
176                }
177
178        for proxy in list(self.proxy_map.keys()):
179            if proxy not in all_proxies:
180                del self.proxy_map[proxy]
181
182    def get_available_proxies(self):
183        """
184        Collect proxies from proxy_map that are ready for new requests
185        """
186        # update proxies every 5 seconds so we can potentially update them
187        # while the scrape is running
188        if self.last_proxy_update < time.time():
189            self.update_proxies()
190            self.last_proxy_update = time.time() + 5
191
192        # find out whether there is any connection we can use to send the
193        # next request
194        available_proxies = [proxy for proxy in self.proxy_map if
195                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]
196
197        if not available_proxies:
198            # No proxy available
199            if self.last_time_proxy_available is None:
200                # First run, possibly issue, but this will allow it to time out
201                self.processor.dataset.log("No available proxy found at start of request_metadata")
202                self.last_time_proxy_available = time.time()
203
204            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
205                # No available proxies in timeout period
206                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
207        else:
208            self.last_time_proxy_available = time.time()
209
210        return available_proxies
211
212    def release_proxy(self, url):
213        """
214        Release a proxy to be used later
215        """
216        # Release proxy
217        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
218        if used_proxy:
219            used_proxy = used_proxy[0]
220            self.proxy_map[used_proxy].update({
221                "busy": False,
222                "next_request": time.time() + self.proxy_sleep
223            })
224        else:
225            # TODO: why are we releasing a proxy without a URL?
226            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")
227            pass
228
229    async def request_metadata(self, urls):
230        """
231        Request TikTok metadata for a list of URLs
232
233        Uses asyncio to request URLs concurrently if proxy servers are
234        available. Returns a list of metadata, one object per video.
235
236        :param list urls:  URLs to collect data for
237        :return list:  Metadata
238        """
239        session = FuturesSession()
240        session.headers.update(self.headers)
241        tiktok_requests = {}
242        finished = 0
243        num_urls = len(urls)
244        seen_urls = set()
245
246        results = []
247        failed = 0
248        dupes = 0
249        retries = {}
250
251        while urls or tiktok_requests:
252            # give tasks time to run
253            await asyncio.sleep(0.1)
254
255            available_proxies = self.get_available_proxies()
256
257            for available_proxy in available_proxies:
258                url = None
259                while urls and url is None:
260                    url = urls.pop(0)
261                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks
262
263                    # Check if url already collected or should be retried
264                    if url in seen_urls and url not in retries:
265                        finished += 1
266                        dupes += 1
267                        self.processor.dataset.log("Skipping duplicate of %s" % url)
268                        url = None
269                        continue
270
271                    # Add url to be collected
272                    self.processor.dataset.log(f"Requesting: {url}")
273                    proxy = {"http": available_proxy,
274                             "https": available_proxy} if available_proxy != "__localhost__" else None
275                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
276                    seen_urls.add(url)
277                    self.proxy_map[available_proxy].update({
278                        "busy": True,
279                        "url": url
280                    })
281
282            # wait for async requests to end (after cancelling) before quitting
283            # the worker
284            if self.processor.interrupted:
285                for request in tiktok_requests.values():
286                    request.cancel()
287
288                max_timeout = time.time() + 20
289                while not all([r for r in tiktok_requests.values() if r.done()]) and time.time() < max_timeout:
290                    await asyncio.sleep(0.5)
291
292                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")
293
294            # handle received data
295            for url in list(tiktok_requests.keys()):
296                request = tiktok_requests[url]
297                if not request.done():
298                    continue
299
300                finished += 1
301                self.release_proxy(url)
302
303                # handle the exceptions we know to expect - else just raise and
304                # log
305                exception = request.exception()
306                if exception:
307                    failed += 1
308                    if isinstance(exception, requests.exceptions.RequestException):
309                        self.processor.dataset.update_status(
310                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
311                    else:
312                        raise exception
313
314                # retry on requestexceptions
315                try:
316                    response = request.result()
317                except requests.exceptions.RequestException:
318                    if url not in retries or retries[url] < 3:
319                        if url not in retries:
320                            retries[url] = 0
321                        retries[url] += 1
322                        urls.append(url)
323                    continue
324                finally:
325                    del tiktok_requests[url]
326
327                # video may not exist
328                if response.status_code == 404:
329                    failed += 1
330                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
331                    skip_to_next = True
332                    continue
333
334                # haven't seen these in the wild - 403 or 429 might happen?
335                elif response.status_code != 200:
336                    failed += 1
337                    self.processor.dataset.update_status(
338                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
339                    continue
340
341                # now! try to extract the JSON from the page
342                soup = BeautifulSoup(response.text, "html.parser")
343                sigil = soup.select_one("script#SIGI_STATE")
344
345                if not sigil:
346                    # alternatively, the JSON is here
347                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")
348
349                if not sigil:
350                    if url not in retries or retries[url] < 3:
351                        if url not in retries:
352                            retries[url] = 0
353                        retries[url] += 1
354                        urls.append(url)
355                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
356                    else:
357                        failed += 1
358                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
359                    continue
360
361                try:
362                    if sigil.text:
363                        metadata = json.loads(sigil.text)
364                    elif sigil.contents and len(sigil.contents) > 0:
365                        metadata = json.loads(sigil.contents[0])
366                    else:
367                        failed += 1
368                        self.processor.dataset.log(
369                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
370                        continue
371                except json.JSONDecodeError:
372                    failed += 1
373                    self.processor.dataset.log(
374                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
375                    continue
376
377                for video in self.reformat_metadata(metadata):
378                    if video == self.VIDEO_NOT_FOUND:
379                        failed += 1
380                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
381                        continue
382
383                    if not video.get("stats") or video.get("createTime") == "0":
384                        # sometimes there are empty videos? which seems to
385                        # indicate a login wall
386
387                        self.processor.dataset.log(
388                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
389                        continue
390                    else:
391                        results.append(video)
392
393                    self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
394                                               ("{:,}".format(finished), "{:,}".format(num_urls)))
395                    self.processor.dataset.update_progress(finished / num_urls)
396
397        notes = []
398        if failed:
399            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
400        if dupes:
401            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))
402
403        if notes:
404            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
405                                       "dataset log for details." % ", ".join(notes))
406
407        return results
408
409    def reformat_metadata(self, metadata):
410        """
411        Take embedded JSON and yield one item per post
412
413        :param dict metadata: Metadata extracted from the TikTok video page
414        :return:  Yields one dictionary per video
415        """
416        # may need some extra parsing to find the item data...
417        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
418            try:
419                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
420            except KeyError as e:
421                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
422                    yield self.VIDEO_NOT_FOUND
423                    return
424                else:
425                    raise e.__class__ from e
426
427            metadata = {"ItemModule": {
428                video["id"]: video
429            }}
430
431        if "ItemModule" in metadata:
432            for video_id, item in metadata["ItemModule"].items():
433                if "CommentItem" in metadata:
434                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
435                    if "UserModule" in metadata:
436                        for comment_id in list(comments.keys()):
437                            username = comments[comment_id]["user"]
438                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
439                                                                                                       username)
440                else:
441                    comments = {}
442
443                yield {**item, "comments": list(comments.values())}
444
445    async def download_videos(self, video_ids, staging_area, max_videos):
446        """
447        Download TikTok Videos
448
449        This is based on the TikTok downloader from https://jdownloader.org/
450        """
451        video_download_headers = {
452                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
453                "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
454                "Accept-Language": "en-US,en;q=0.5",
455                # "Range": "bytes=0-",
456                "Connection": "keep-alive",
457                "Referer": "https://www.tiktok.com/",
458                "Sec-Fetch-Dest": "video",
459                "Sec-Fetch-Mode": "no-cors",
460                "Sec-Fetch-Site": "cross-site",
461                "Accept-Encoding": "identity"
462            }
463        session = FuturesSession()
464
465        download_results = {}
466        downloaded_videos = 0
467        metadata_collected = 0
468        video_requests = {}
469        video_download_urls = []
470
471        while video_ids or video_download_urls or video_requests:
472            # give tasks time to run
473            await asyncio.sleep(0.1)
474
475            available_proxies = self.get_available_proxies()
476
477            for available_proxy in available_proxies:
478                if downloaded_videos > max_videos:
479                    # We're done here
480                    video_ids = []
481                    video_download_urls = []
482                    break
483
484                # Download videos (if available)
485                if video_download_urls:
486                    video_id, video_download_url = video_download_urls.pop(0)
487                    proxy = {"http": available_proxy,
488                             "https": available_proxy} if available_proxy != "__localhost__" else None
489                    session.headers.update(video_download_headers)
490                    video_requests[video_download_url] = {
491                        "request": session.get(video_download_url, proxies=proxy, timeout=30),
492                        "video_id": video_id,
493                        "type": "download",
494                    }
495                    self.proxy_map[available_proxy].update({
496                        "busy": True,
497                        "url": video_download_url
498                    })
499                # Collect video metadata (to find videos to download)
500                elif video_ids:
501                    video_id = video_ids.pop(0)
502                    url = f"https://www.tiktok.com/embed/v2/{video_id}"
503
504                    proxy = {"http": available_proxy,
505                             "https": available_proxy} if available_proxy != "__localhost__" else None
506                    session.headers.update(self.headers)
507                    video_requests[url] = {
508                        "request": session.get(url, proxies=proxy, timeout=30),
509                        "video_id": video_id,
510                        "type": "metadata",
511                    }
512                    self.proxy_map[available_proxy].update({
513                        "busy": True,
514                        "url": url
515                    })
516
517            # wait for async requests to end (after cancelling) before quitting
518            # the worker
519            if self.processor.interrupted:
520                for request in video_requests.values():
521                    request["request"].cancel()
522
523                max_timeout = time.time() + 20
524                while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout:
525                    await asyncio.sleep(0.5)
526
527                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")
528
529            # Extract video download URLs
530            for url in list(video_requests.keys()):
531                video_id = video_requests[url]["video_id"]
532                request = video_requests[url]["request"]
533                request_type = video_requests[url]["type"]
534                request_metadata = {
535                    "success": False,
536                    "url": url,
537                    "error": None,
538                    "from_dataset": self.processor.source_dataset.key,
539                    "post_ids": [video_id],
540                }
541                if not request.done():
542                    continue
543
544                # Release proxy
545                self.release_proxy(url)
546
547                # Collect response
548                try:
549                    response = request.result()
550                except requests.exceptions.RequestException as e:
551                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
552                    request_metadata["error"] = error_message
553                    download_results[video_id] = request_metadata
554                    self.processor.dataset.log(error_message)
555                    continue
556                finally:
557                    del video_requests[url]
558
559                if response.status_code != 200:
560                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
561                    request_metadata["error"] = error_message
562                    download_results[video_id] = request_metadata
563                    self.processor.dataset.log(error_message)
564                    continue
565
566                if request_type == "metadata":
567                    # Collect Video Download URL
568                    soup = BeautifulSoup(response.text, "html.parser")
569                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
570                    video_metadata = None
571                    try:
572                        if json_source.text:
573                            video_metadata = json.loads(json_source.text)
574                        elif json_source.contents[0]:
575                            video_metadata = json.loads(json_source.contents[0])
576                    except json.JSONDecodeError as e:
577                        self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")
578
579                    if not video_metadata:
580                        # Failed to collect metadata
581                        error_message = f"Failed to find metadata for video {video_id}"
582                        request_metadata["error"] = error_message
583                        download_results[video_id] = request_metadata
584                        self.processor.dataset.log(error_message)
585                        continue
586
587                    try:
588                        url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
589                    except (KeyError, IndexError):
590                        error_message = f"vid: {video_id} - failed to find video download URL"
591                        request_metadata["error"] = error_message
592                        download_results[video_id] = request_metadata
593                        self.processor.dataset.log(error_message)
594                        self.processor.dataset.log(video_metadata["source"]["data"].values())
595                        continue
596
597                    # Add new download URL to be collected
598                    video_download_urls.append((video_id, url))
599                    metadata_collected += 1
600                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
601                                                    (metadata_collected, max_videos))
602                    self.processor.dataset.update_progress(metadata_collected / max_videos)
603
604                elif request_type == "download":
605                    # Download video
606                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
607                        for chunk in response.iter_content(chunk_size=1024 * 1024):
608                            if chunk:
609                                f.write(chunk)
610                    request_metadata["success"] = True
611                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
612                    download_results[video_id] = request_metadata
613
614                    downloaded_videos += 1
615                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
616                                                    (downloaded_videos, max_videos))
617                    self.processor.dataset.update_progress(downloaded_videos / max_videos)
618
619        return download_results
TikTokScraper(processor, config)
149    def __init__(self, processor, config):
150        """
151        :param Processor processor:  The processor using this function and needing updates
152        """
153        self.proxy_map = {}
154        self.processor = processor
Parameters
  • Processor processor: The processor using this function and needing updates
proxy_map = None
proxy_sleep = 1
headers = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', 'DNT': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0'}
last_proxy_update = 0
last_time_proxy_available = None
no_available_proxy_timeout = 600
VIDEO_NOT_FOUND = 'oh no, sire, no video was found'
processor
def update_proxies(self):
156    def update_proxies(self):
157        """
158        Get proxies that are available
159
160        :return:
161        """
162        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
163        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
164        if not all_proxies:
165            # no proxies? just request directly
166            all_proxies = ["__localhost__"]
167
168        for proxy in all_proxies:
169            if proxy in self.proxy_map:
170                continue
171            else:
172                self.proxy_map[proxy] = {
173                    "busy": False,
174                    "url": None,
175                    "next_request": 0
176                }
177
178        for proxy in list(self.proxy_map.keys()):
179            if proxy not in all_proxies:
180                del self.proxy_map[proxy]

Get proxies that are available

Returns
def get_available_proxies(self):
182    def get_available_proxies(self):
183        """
184        Collect proxies from proxy_map that are ready for new requests
185        """
186        # update proxies every 5 seconds so we can potentially update them
187        # while the scrape is running
188        if self.last_proxy_update < time.time():
189            self.update_proxies()
190            self.last_proxy_update = time.time() + 5
191
192        # find out whether there is any connection we can use to send the
193        # next request
194        available_proxies = [proxy for proxy in self.proxy_map if
195                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]
196
197        if not available_proxies:
198            # No proxy available
199            if self.last_time_proxy_available is None:
200                # First run, possibly issue, but this will allow it to time out
201                self.processor.dataset.log("No available proxy found at start of request_metadata")
202                self.last_time_proxy_available = time.time()
203
204            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
205                # No available proxies in timeout period
206                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
207        else:
208            self.last_time_proxy_available = time.time()
209
210        return available_proxies

Collect proxies from proxy_map that are ready for new requests

def release_proxy(self, url):
212    def release_proxy(self, url):
213        """
214        Release a proxy to be used later
215        """
216        # Release proxy
217        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
218        if used_proxy:
219            used_proxy = used_proxy[0]
220            self.proxy_map[used_proxy].update({
221                "busy": False,
222                "next_request": time.time() + self.proxy_sleep
223            })
224        else:
225            # TODO: why are we releasing a proxy without a URL?
226            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")
227            pass

Release a proxy to be used later

async def request_metadata(self, urls):
    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        Requests are scheduled on whichever proxies get_available_proxies()
        reports as free; each proxy takes at most one URL per loop pass and is
        released (with a cool-down) once its response arrives. URLs that fail
        with a request error, or whose page carries no embedded metadata, are
        retried up to 3 times.

        :param list urls:  URLs to collect data for
        :return list:  Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        tiktok_requests = {}  # url -> in-flight request future
        finished = 0
        num_urls = len(urls)
        seen_urls = set()  # URLs already scheduled, for duplicate detection

        results = []
        failed = 0
        dupes = 0
        retries = {}  # url -> number of retries attempted so far (max 3)

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            # schedule at most one queued URL per free proxy
            for available_proxy in available_proxies:
                url = None
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # Check if url already collected or should be retried
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    # "__localhost__" is the sentinel for a direct, proxy-less
                    # connection
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                # NOTE(review): this condition filters for *done* requests and
                # checks list truthiness, so it does not actually wait until
                # all requests are done — presumably
                # `all(r.done() for r in ...)` was intended; the 20s timeout
                # bounds the wait either way
                max_timeout = time.time() + 20
                while not all([r for r in tiktok_requests.values() if r.done()]) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # handle the exceptions we know to expect - else just raise and
                # log
                exception = request.exception()
                if exception:
                    failed += 1
                    if isinstance(exception, requests.exceptions.RequestException):
                        self.processor.dataset.update_status(
                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                    else:
                        raise exception

                # retry on requestexceptions
                # NOTE(review): a RequestException was already counted in
                # `failed` above, yet the same URL may be re-queued here — a
                # URL that eventually succeeds after retrying still inflates
                # the failure count once per attempt
                try:
                    response = request.result()
                except requests.exceptions.RequestException:
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                    continue
                finally:
                    # whether retried or completed, this future is finished
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    # NOTE(review): dead assignment — `skip_to_next` is never
                    # read anywhere in this method
                    skip_to_next = True
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    # page loaded but carries no embedded JSON — possibly a
                    # block page; retry up to 3 times before giving up
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents and len(sigil.contents) > 0:
                        # fall back to the raw first child node of the script tag
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall

                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue
                    else:
                        results.append(video)

                    # NOTE(review): progress only advances for videos reaching
                    # this point; skipped videos (continue above) do not
                    # refresh the status line
                    self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                               ("{:,}".format(finished), "{:,}".format(num_urls)))
                    self.processor.dataset.update_progress(finished / num_urls)

        # summarise anything that went wrong in the final dataset status
        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                       "dataset log for details." % ", ".join(notes))

        return results

Request TikTok metadata for a list of URLs

Uses asyncio to request URLs concurrently if proxy servers are available. Returns a list of metadata, one object per video.

Parameters
  • list urls: URLs to collect data for
Returns

Metadata

def reformat_metadata(self, metadata):
409    def reformat_metadata(self, metadata):
410        """
411        Take embedded JSON and yield one item per post
412
413        :param dict metadata: Metadata extracted from the TikTok video page
414        :return:  Yields one dictionary per video
415        """
416        # may need some extra parsing to find the item data...
417        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
418            try:
419                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
420            except KeyError as e:
421                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
422                    yield self.VIDEO_NOT_FOUND
423                    return
424                else:
425                    raise e.__class__ from e
426
427            metadata = {"ItemModule": {
428                video["id"]: video
429            }}
430
431        if "ItemModule" in metadata:
432            for video_id, item in metadata["ItemModule"].items():
433                if "CommentItem" in metadata:
434                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
435                    if "UserModule" in metadata:
436                        for comment_id in list(comments.keys()):
437                            username = comments[comment_id]["user"]
438                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
439                                                                                                       username)
440                else:
441                    comments = {}
442
443                yield {**item, "comments": list(comments.values())}

Take embedded JSON and yield one item per post

Parameters
  • dict metadata: Metadata extracted from the TikTok video page
Returns

Yields one dictionary per video

async def download_videos(self, video_ids, staging_area, max_videos):
    async def download_videos(self, video_ids, staging_area, max_videos):
        """
        Download TikTok Videos

        This is based on the TikTok downloader from https://jdownloader.org/

        Works in two stages driven by the same proxy pool: the embed page for
        a video ID is fetched first to discover a direct download URL, then
        that URL is fetched and the response body written to the staging area
        as `<video_id>.mp4`.

        :param list video_ids:  TikTok video IDs to download
        :param staging_area:  Path to the directory to write .mp4 files into
        :param int max_videos:  Maximum number of videos to download
        :return dict:  Maps video_id to a result record (success flag, url,
                       error message, files written)
        """
        # browser-like headers for the actual video fetch; the embed/metadata
        # requests reuse self.headers instead
        video_download_headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
                "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
                "Accept-Language": "en-US,en;q=0.5",
                # "Range": "bytes=0-",
                "Connection": "keep-alive",
                "Referer": "https://www.tiktok.com/",
                "Sec-Fetch-Dest": "video",
                "Sec-Fetch-Mode": "no-cors",
                "Sec-Fetch-Site": "cross-site",
                "Accept-Encoding": "identity"
            }
        session = FuturesSession()

        download_results = {}  # video_id -> result record
        downloaded_videos = 0
        metadata_collected = 0
        video_requests = {}  # url -> {"request": future, "video_id", "type"}
        video_download_urls = []  # (video_id, direct download URL) tuples

        while video_ids or video_download_urls or video_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                # NOTE(review): `>` allows one more download than max_videos
                # before stopping — presumably `>=` was intended; confirm
                # before changing
                if downloaded_videos > max_videos:
                    # We're done here
                    video_ids = []
                    video_download_urls = []
                    break

                # Download videos (if available)
                # download requests take priority over metadata requests, so
                # discovered URLs are consumed before new ones are found
                if video_download_urls:
                    video_id, video_download_url = video_download_urls.pop(0)
                    # "__localhost__" is the sentinel for a direct connection
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(video_download_headers)
                    video_requests[video_download_url] = {
                        "request": session.get(video_download_url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "download",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": video_download_url
                    })
                # Collect video metadata (to find videos to download)
                elif video_ids:
                    video_id = video_ids.pop(0)
                    url = f"https://www.tiktok.com/embed/v2/{video_id}"

                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(self.headers)
                    video_requests[url] = {
                        "request": session.get(url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "metadata",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in video_requests.values():
                    request["request"].cancel()

                # NOTE(review): like in request_metadata, this filters for
                # *done* requests rather than waiting until all are done; the
                # 20s timeout bounds the wait regardless
                max_timeout = time.time() + 20
                while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

            # Extract video download URLs
            for url in list(video_requests.keys()):
                video_id = video_requests[url]["video_id"]
                request = video_requests[url]["request"]
                request_type = video_requests[url]["type"]
                # result record stored in download_results for this video
                request_metadata = {
                    "success": False,
                    "url": url,
                    "error": None,
                    "from_dataset": self.processor.source_dataset.key,
                    "post_ids": [video_id],
                }
                if not request.done():
                    continue

                # Release proxy
                self.release_proxy(url)

                # Collect response
                try:
                    response = request.result()
                except requests.exceptions.RequestException as e:
                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue
                finally:
                    # whether failed or completed, this future is finished
                    del video_requests[url]

                if response.status_code != 200:
                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                if request_type == "metadata":
                    # Collect Video Download URL
                    soup = BeautifulSoup(response.text, "html.parser")
                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                    # NOTE(review): if the script tag is absent, json_source
                    # is None and `.text` below raises an uncaught
                    # AttributeError
                    video_metadata = None
                    try:
                        if json_source.text:
                            video_metadata = json.loads(json_source.text)
                        elif json_source.contents[0]:
                            # fall back to the raw first child of the script tag
                            video_metadata = json.loads(json_source.contents[0])
                    except json.JSONDecodeError as e:
                        self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                    if not video_metadata:
                        # Failed to collect metadata
                        error_message = f"Failed to find metadata for video {video_id}"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        continue

                    try:
                        # first listed URL is the direct video stream
                        url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                    except (KeyError, IndexError):
                        error_message = f"vid: {video_id} - failed to find video download URL"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.processor.dataset.log(video_metadata["source"]["data"].values())
                        continue

                    # Add new download URL to be collected
                    video_download_urls.append((video_id, url))
                    metadata_collected += 1
                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                    (metadata_collected, max_videos))
                    self.processor.dataset.update_progress(metadata_collected / max_videos)

                elif request_type == "download":
                    # Download video
                    # stream the body to <staging_area>/<video_id>.mp4 in
                    # 1 MiB chunks
                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            if chunk:
                                f.write(chunk)
                    request_metadata["success"] = True
                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                    download_results[video_id] = request_metadata

                    downloaded_videos += 1
                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                    (downloaded_videos, max_videos))
                    self.processor.dataset.update_progress(downloaded_videos / max_videos)

        return download_results

Download TikTok Videos

This is based on the TikTok downloader from https://jdownloader.org/