Edit on GitHub

datasources.tiktok_urls.search_tiktok_urls

Import scraped TikTok data

It's prohibitively difficult to scrape data from TikTok within 4CAT itself due to TikTok's aggressive rate limiting. Instead, import data collected elsewhere.

  1"""
  2Import scraped TikTok data
  3
  4It's prohibitively difficult to scrape data from TikTok within 4CAT itself due
  5to its aggressive rate limiting. Instead, import data collected elsewhere.
  6"""
  7import requests
  8import asyncio
  9import time
 10import json
 11import re
 12
 13from requests_futures.sessions import FuturesSession
 14from bs4 import BeautifulSoup
 15
 16from backend.lib.search import Search
 17from common.lib.helpers import UserInput
 18from common.lib.exceptions import WorkerInterruptedException, QueryParametersException, ProcessorException
 19from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport
 20
 21class SearchTikTokByID(Search):
 22    """
 23    Import scraped TikTok data
 24    """
 25    type = "tiktok-urls-search"  # job ID
 26    category = "Search"  # category
 27    title = "Search TikTok by post URL"  # title displayed in UI
 28    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
 29    extension = "ndjson"  # extension of result file, used internally and in UI
 30    is_local = False  # Whether this datasource is locally scraped
 31    is_static = False  # Whether this datasource is still updated
 32
 33    # not available as a processor for existing datasets
 34    accepts = [None]
 35
 36    config = {
 37        "tiktok-urls-search.proxies": {
 38            "type": UserInput.OPTION_TEXT_JSON,
 39            "default": [],
 40            "help": "Proxies for TikTok data collection"
 41        },
 42        "tiktok-urls-search.proxies.wait": {
 43            "type": UserInput.OPTION_TEXT,
 44            "coerce_type": float,
 45            "default": 1.0,
 46            "help": "Request wait",
 47            "tooltip": "Time to wait before sending a new request from the same IP"
 48        }
 49    }
 50
 51    options = {
 52        "intro": {
 53            "type": UserInput.OPTION_INFO,
 54            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
 55                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
 56                    "each post's page in the browser interface "
 57                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
 58                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
 59                    "only directly available when downloading the results as an .ndjson file."
 60        },
 61        "urls": {
 62            "type": UserInput.OPTION_TEXT_LARGE,
 63            "help": "Post URLs",
 64            "tooltip": "Separate by commas or new lines."
 65        }
 66    }
 67
 68    def get_items(self, query):
 69        """
 70        Retrieve metadata for TikTok URLs
 71
 72        :param dict query:  Search query parameters
 73        """
 74        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
 75        loop = asyncio.new_event_loop()
 76        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
 77
 78    @staticmethod
 79    def validate_query(query, request, config):
 80        """
 81        Validate TikTok query
 82
 83        :param dict query:  Query parameters, from client-side.
 84        :param request:  Flask request
 85        :param ConfigManager|None config:  Configuration reader (context-aware)
 86        :return dict:  Safe query parameters
 87        """
 88        if not query.get("urls"):
 89            # no URLs provided
 90            raise QueryParametersException("Missing \"Post URLs\" (\"urls\" parameter). Please provide a list of TikTok video URLs.")
 91        
 92        # reformat queries to be a comma-separated list with no wrapping
 93        # whitespace
 94        whitespace = re.compile(r"\s+")
 95        items = whitespace.sub("", query.get("urls").replace("\n", ","))
 96
 97        sanitized_items = []
 98        # handle telegram URLs
 99        for item in str(items).split(","):
100            if not item.strip():
101                continue
102
103            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
104                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
105                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
106                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)
107
108            sanitized_items.append(item)
109
110        # no query 4 u
111        if not sanitized_items:
112            raise QueryParametersException("You must provide at least one valid TikTok video URL.")
113
114        # simple!
115        return {
116            "urls": ",".join(sanitized_items)
117        }
118
119    @staticmethod
120    def map_item(item):
121        """
122        Analogous to the other TikTok data source
123
124        :param item:
125        :return:
126        """
127        return SearchTikTokByImport.map_item(item)
128
129
class TikTokScraper:
    """
    Scraper for TikTok post metadata and video files

    Requests TikTok pages (optionally through a pool of proxies) and extracts
    the JSON metadata embedded in them. Proxies are rate-limited individually
    so the same IP is not reused before a configurable cooldown has passed.
    """
    # maps proxy URL (or "__localhost__") to its busy/cooldown state
    proxy_map = None
    # seconds to wait before reusing the same proxy
    proxy_sleep = 1
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    # timestamp after which update_proxies() should be called again
    last_proxy_update = 0
    # last time at least one proxy was available; None until the first check
    last_time_proxy_available = None
    # give up if no proxy has been available for this many seconds
    no_available_proxy_timeout = 600
    consecutive_failures = 0

    # sentinel yielded by reformat_metadata() when a video has been removed
    VIDEO_NOT_FOUND = "oh no, sire, no video was found"

    def __init__(self, processor, config):
        """
        :param Processor processor:  The processor using this function and needing updates
        :param config:  Configuration reader; stored for reference (proxy
            settings are read through the processor's own config reader)
        """
        self.proxy_map = {}
        self.processor = processor
        # previously the config argument was accepted but silently discarded
        self.config = config

    def update_proxies(self):
        """
        Synchronise the proxy map with the configured proxy list

        Adds newly configured proxies to the map and removes proxies that are
        no longer configured. Also refreshes the per-proxy cooldown setting.
        If no proxies are configured, requests are sent directly (represented
        by the "__localhost__" pseudo-proxy).
        """
        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
        if not all_proxies:
            # no proxies? just request directly
            all_proxies = ["__localhost__"]

        # register proxies we have not seen before
        for proxy in all_proxies:
            if proxy not in self.proxy_map:
                self.proxy_map[proxy] = {
                    "busy": False,
                    "url": None,
                    "next_request": 0
                }

        # drop proxies that have been removed from the configuration
        for proxy in list(self.proxy_map.keys()):
            if proxy not in all_proxies:
                del self.proxy_map[proxy]

    def get_available_proxies(self):
        """
        Collect proxies from proxy_map that are ready for new requests

        :return list:  Proxies that are neither busy nor cooling down
        :raises ProcessorException:  If no proxy has been available for longer
            than no_available_proxy_timeout seconds
        """
        # refresh the proxy list at most every 5 seconds so configuration
        # changes are picked up while the scrape is running
        # (last_proxy_update holds the time of the *next* allowed refresh)
        if self.last_proxy_update < time.time():
            self.update_proxies()
            self.last_proxy_update = time.time() + 5

        # find out whether there is any connection we can use to send the
        # next request
        available_proxies = [proxy for proxy in self.proxy_map if
                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]

        if not available_proxies:
            # No proxy available
            if self.last_time_proxy_available is None:
                # First run, possibly issue, but this will allow it to time out
                self.processor.dataset.log("No available proxy found at start of request_metadata")
                self.last_time_proxy_available = time.time()

            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
                # No available proxies in timeout period
                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
        else:
            self.last_time_proxy_available = time.time()

        return available_proxies

    def release_proxy(self, url):
        """
        Release the proxy that handled the given URL so it can be used again

        The proxy becomes available again after the configured cooldown.

        :param str url:  URL whose proxy should be released
        """
        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
        if used_proxy:
            self.proxy_map[used_proxy[0]].update({
                "busy": False,
                "next_request": time.time() + self.proxy_sleep
            })
        else:
            # TODO: why are we releasing a proxy without a URL?
            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")

    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        :param list urls:  URLs to collect data for
        :return list:  Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        tiktok_requests = {}  # URL -> in-flight request future
        finished = 0
        num_urls = len(urls)
        seen_urls = set()

        results = []
        failed = 0
        dupes = 0
        retries = {}  # URL -> number of retry attempts so far (max 3)

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            # hand one pending URL to each free proxy
            for available_proxy in available_proxies:
                url = None
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # Check if url already collected or should be retried
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                max_timeout = time.time() + 20
                # fixed: previously this tested the truthiness of the Future
                # objects themselves (always true) instead of whether they
                # had actually completed, so the wait loop never waited
                while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # collect the response; connection errors are retried up to
                # three times and only counted as failures once retries are
                # exhausted (previously every attempt incremented the failure
                # count). Any other exception is simply raised.
                try:
                    response = request.result()
                except requests.exceptions.RequestException as exception:
                    self.processor.dataset.update_status(
                        "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                    if retries.get(url, 0) < 3:
                        retries[url] = retries.get(url, 0) + 1
                        urls.append(url)
                    else:
                        failed += 1
                    continue
                finally:
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    # no embedded JSON at all - likely a bot wall; retry a
                    # few times before giving up on this URL
                    if retries.get(url, 0) < 3:
                        retries[url] = retries.get(url, 0) + 1
                        urls.append(url)
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents:
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall
                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue
                    else:
                        results.append(video)

                    self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                               ("{:,}".format(finished), "{:,}".format(num_urls)))
                    self.processor.dataset.update_progress(finished / num_urls)

        # summarise anything that went wrong in the dataset status
        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                       "dataset log for details." % ", ".join(notes))

        return results

    def reformat_metadata(self, metadata):
        """
        Take embedded JSON and yield one item per post

        Handles both the newer "__DEFAULT_SCOPE__" page layout and the older
        "ItemModule" layout; the former is normalised into the latter before
        processing. Comments (and their resolved user objects, if available)
        are attached to each yielded video.

        :param dict metadata: Metadata extracted from the TikTok video page
        :return:  Yields one dictionary per video, or VIDEO_NOT_FOUND if the
            page reports the video as unavailable
        """
        # may need some extra parsing to find the item data...
        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
            try:
                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
            except KeyError:
                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
                    # a status code without item data means the video is gone
                    yield self.VIDEO_NOT_FOUND
                    return
                else:
                    # fixed: previously raised a new, message-less exception
                    # of the same class; re-raise the original instead
                    raise

            # normalise to the ItemModule layout handled below
            metadata = {"ItemModule": {
                video["id"]: video
            }}

        if "ItemModule" in metadata:
            for video_id, item in metadata["ItemModule"].items():
                if "CommentItem" in metadata:
                    # only keep comments belonging to this video
                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
                    if "UserModule" in metadata:
                        # replace comment usernames with full user objects
                        # where available
                        for comment_id in list(comments.keys()):
                            username = comments[comment_id]["user"]
                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
                                                                                                       username)
                else:
                    comments = {}

                yield {**item, "comments": list(comments.values())}

    async def download_videos(self, video_ids, staging_area, max_videos):
        """
        Download TikTok Videos

        First fetches each video's embed page to find a direct download URL,
        then downloads the video file into the staging area. This is based on
        the TikTok downloader from https://jdownloader.org/

        :param list video_ids:  TikTok video IDs to download
        :param Path staging_area:  Directory to write .mp4 files into
        :param int max_videos:  Maximum number of videos to download
        :return dict:  Per-video download metadata, keyed by video ID
        """
        video_download_headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
                "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
                "Accept-Language": "en-US,en;q=0.5",
                # "Range": "bytes=0-",
                "Connection": "keep-alive",
                "Referer": "https://www.tiktok.com/",
                "Sec-Fetch-Dest": "video",
                "Sec-Fetch-Mode": "no-cors",
                "Sec-Fetch-Site": "cross-site",
                "Accept-Encoding": "identity"
            }
        session = FuturesSession()

        download_results = {}
        downloaded_videos = 0
        metadata_collected = 0
        video_requests = {}  # URL -> {"request", "video_id", "type"}
        video_download_urls = []  # (video_id, direct download URL) tuples

        while video_ids or video_download_urls or video_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                # fixed: previously ">" allowed one download beyond max_videos
                if downloaded_videos >= max_videos:
                    # We're done here
                    video_ids = []
                    video_download_urls = []
                    break

                # Download videos (if available)
                if video_download_urls:
                    video_id, video_download_url = video_download_urls.pop(0)
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(video_download_headers)
                    video_requests[video_download_url] = {
                        "request": session.get(video_download_url, proxies=proxy, timeout=30), # not using stream=True here, as it seems slower; possibly due to short video lengths
                        "video_id": video_id,
                        "type": "download",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": video_download_url
                    })
                # Collect video metadata (to find videos to download)
                elif video_ids:
                    video_id = video_ids.pop(0)
                    url = f"https://www.tiktok.com/embed/v2/{video_id}"

                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(self.headers)
                    video_requests[url] = {
                        "request": session.get(url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "metadata",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in video_requests.values():
                    request["request"].cancel()

                max_timeout = time.time() + 20
                # fixed: previously checked the truthiness of the Futures
                # instead of whether they had completed
                while not all(r["request"].done() for r in video_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

            # Extract video download URLs
            for url in list(video_requests.keys()):
                video_id = video_requests[url]["video_id"]
                request = video_requests[url]["request"]
                request_type = video_requests[url]["type"]
                request_metadata = {
                    "success": False,
                    "url": url,
                    "error": None,
                    "from_dataset": self.processor.source_dataset.key,
                    "post_ids": [video_id],
                }
                if not request.done():
                    continue

                # Release proxy
                self.release_proxy(url)

                # Collect response
                try:
                    response = request.result()
                except requests.exceptions.RequestException as e:
                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue
                finally:
                    del video_requests[url]

                if response.status_code != 200:
                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                if request_type == "metadata":
                    # Collect Video Download URL
                    soup = BeautifulSoup(response.text, "html.parser")
                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                    video_metadata = None
                    # fixed: previously a missing script tag crashed with an
                    # AttributeError instead of counting as a failure
                    if json_source is not None:
                        try:
                            if json_source.text:
                                video_metadata = json.loads(json_source.text)
                            elif json_source.contents[0]:
                                video_metadata = json.loads(json_source.contents[0])
                        except json.JSONDecodeError as e:
                            self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                    if not video_metadata:
                        # Failed to collect metadata
                        error_message = f"Failed to find metadata for video {video_id}"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.consecutive_failures += 1
                        if self.consecutive_failures > 5:
                            raise ProcessorException("Too many consecutive failures, stopping")
                        continue

                    try:
                        url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                    except (KeyError, IndexError):
                        error_message = f"vid: {video_id} - failed to find video download URL"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.processor.dataset.log(video_metadata["source"]["data"].values())
                        continue

                    # Add new download URL to be collected
                    self.consecutive_failures = 0
                    video_download_urls.append((video_id, url))
                    metadata_collected += 1
                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                    (metadata_collected, max_videos))
                    self.processor.dataset.update_progress(metadata_collected / max_videos)

                elif request_type == "download":
                    # Download video
                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            if chunk:
                                f.write(chunk)
                    request_metadata["success"] = True
                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                    download_results[video_id] = request_metadata

                    downloaded_videos += 1
                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                    (downloaded_videos, max_videos))
                    self.processor.dataset.update_progress(downloaded_videos / max_videos)

        return download_results
class SearchTikTokByID(backend.lib.search.Search):
 22class SearchTikTokByID(Search):
 23    """
 24    Import scraped TikTok data
 25    """
 26    type = "tiktok-urls-search"  # job ID
 27    category = "Search"  # category
 28    title = "Search TikTok by post URL"  # title displayed in UI
 29    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
 30    extension = "ndjson"  # extension of result file, used internally and in UI
 31    is_local = False  # Whether this datasource is locally scraped
 32    is_static = False  # Whether this datasource is still updated
 33
 34    # not available as a processor for existing datasets
 35    accepts = [None]
 36
 37    config = {
 38        "tiktok-urls-search.proxies": {
 39            "type": UserInput.OPTION_TEXT_JSON,
 40            "default": [],
 41            "help": "Proxies for TikTok data collection"
 42        },
 43        "tiktok-urls-search.proxies.wait": {
 44            "type": UserInput.OPTION_TEXT,
 45            "coerce_type": float,
 46            "default": 1.0,
 47            "help": "Request wait",
 48            "tooltip": "Time to wait before sending a new request from the same IP"
 49        }
 50    }
 51
 52    options = {
 53        "intro": {
 54            "type": UserInput.OPTION_INFO,
 55            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
 56                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
 57                    "each post's page in the browser interface "
 58                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
 59                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
 60                    "only directly available when downloading the results as an .ndjson file."
 61        },
 62        "urls": {
 63            "type": UserInput.OPTION_TEXT_LARGE,
 64            "help": "Post URLs",
 65            "tooltip": "Separate by commas or new lines."
 66        }
 67    }
 68
 69    def get_items(self, query):
 70        """
 71        Retrieve metadata for TikTok URLs
 72
 73        :param dict query:  Search query parameters
 74        """
 75        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
 76        loop = asyncio.new_event_loop()
 77        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
 78
 79    @staticmethod
 80    def validate_query(query, request, config):
 81        """
 82        Validate TikTok query
 83
 84        :param dict query:  Query parameters, from client-side.
 85        :param request:  Flask request
 86        :param ConfigManager|None config:  Configuration reader (context-aware)
 87        :return dict:  Safe query parameters
 88        """
 89        if not query.get("urls"):
 90            # no URLs provided
 91            raise QueryParametersException("Missing \"Post URLs\" (\"urls\" parameter). Please provide a list of TikTok video URLs.")
 92        
 93        # reformat queries to be a comma-separated list with no wrapping
 94        # whitespace
 95        whitespace = re.compile(r"\s+")
 96        items = whitespace.sub("", query.get("urls").replace("\n", ","))
 97
 98        sanitized_items = []
 99        # handle telegram URLs
100        for item in str(items).split(","):
101            if not item.strip():
102                continue
103
104            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
105                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
106                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
107                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)
108
109            sanitized_items.append(item)
110
111        # no query 4 u
112        if not sanitized_items:
113            raise QueryParametersException("You must provide at least one valid TikTok video URL.")
114
115        # simple!
116        return {
117            "urls": ",".join(sanitized_items)
118        }
119
120    @staticmethod
121    def map_item(item):
122        """
123        Analogous to the other TikTok data source
124
125        :param item:
126        :return:
127        """
128        return SearchTikTokByImport.map_item(item)

Import scraped TikTok data

type = 'tiktok-urls-search'
category = 'Search'
title = 'Search TikTok by post URL'
description = 'Retrieve metadata for TikTok post URLs.'
extension = 'ndjson'
is_local = False
is_static = False
accepts = [None]
config = {'tiktok-urls-search.proxies': {'type': 'json', 'default': [], 'help': 'Proxies for TikTok data collection'}, 'tiktok-urls-search.proxies.wait': {'type': 'string', 'coerce_type': float, 'default': 1.0, 'help': 'Request wait', 'tooltip': 'Time to wait before sending a new request from the same IP'}}
options = {'intro': {'type': 'info', 'help': "This data source can retrieve metadata for TikTok posts based on a list of URLs for those posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from each post's page in the browser interface ([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of details about the post itself such as likes, tags and stickers. Note that some of the metadata is only directly available when downloading the results as an .ndjson file."}, 'urls': {'type': 'textarea', 'help': 'Post URLs', 'tooltip': 'Separate by commas or new lines.'}}
def get_items(self, query):
69    def get_items(self, query):
70        """
71        Retrieve metadata for TikTok URLs
72
73        :param dict query:  Search query parameters
74        """
75        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
76        loop = asyncio.new_event_loop()
77        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))

Retrieve metadata for TikTok URLs

Parameters
  • dict query: Search query parameters
@staticmethod
def validate_query(query, request, config):
 79    @staticmethod
 80    def validate_query(query, request, config):
 81        """
 82        Validate TikTok query
 83
 84        :param dict query:  Query parameters, from client-side.
 85        :param request:  Flask request
 86        :param ConfigManager|None config:  Configuration reader (context-aware)
 87        :return dict:  Safe query parameters
 88        """
 89        if not query.get("urls"):
 90            # no URLs provided
 91            raise QueryParametersException("Missing \"Post URLs\" (\"urls\" parameter). Please provide a list of TikTok video URLs.")
 92        
 93        # reformat queries to be a comma-separated list with no wrapping
 94        # whitespace
 95        whitespace = re.compile(r"\s+")
 96        items = whitespace.sub("", query.get("urls").replace("\n", ","))
 97
 98        sanitized_items = []
 99        # handle telegram URLs
100        for item in str(items).split(","):
101            if not item.strip():
102                continue
103
104            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
105                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
106                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
107                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)
108
109            sanitized_items.append(item)
110
111        # no query 4 u
112        if not sanitized_items:
113            raise QueryParametersException("You must provide at least one valid TikTok video URL.")
114
115        # simple!
116        return {
117            "urls": ",".join(sanitized_items)
118        }

Validate TikTok query

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • ConfigManager|None config: Configuration reader (context-aware)
Returns

Safe query parameters

@staticmethod
def map_item(item):
120    @staticmethod
121    def map_item(item):
122        """
123        Analogous to the other TikTok data source
124
125        :param item:
126        :return:
127        """
128        return SearchTikTokByImport.map_item(item)

Analogous to the other TikTok data source

Parameters
  • item:
Returns
class TikTokScraper:
class TikTokScraper:
    # proxy -> {"busy", "url", "next_request"}; initialised per instance in __init__
    proxy_map = None
    # seconds to wait before reusing the same proxy; overridden from config
    proxy_sleep = 1
    # browser-like request headers, since TikTok blocks obviously
    # non-browser traffic
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    # timestamp after which update_proxies() may run again (throttled)
    last_proxy_update = 0
    # last time at least one proxy was free; None until first check
    last_time_proxy_available = None
    # abort if no proxy has been available for this many seconds
    no_available_proxy_timeout = 600
    # consecutive metadata failures; download_videos() aborts past a threshold
    consecutive_failures = 0

    # sentinel yielded by reformat_metadata() when a video has been removed
    VIDEO_NOT_FOUND = "oh no, sire, no video was found"
152
    def __init__(self, processor, config):
        """
        :param Processor processor:  The processor using this function and needing updates
        :param config:  Configuration reader; currently unused — configuration
            is read through the processor's own config object instead
        """
        self.proxy_map = {}
        self.processor = processor
159
160    def update_proxies(self):
161        """
162        Get proxies that are available
163
164        :return:
165        """
166        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
167        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
168        if not all_proxies:
169            # no proxies? just request directly
170            all_proxies = ["__localhost__"]
171
172        for proxy in all_proxies:
173            if proxy in self.proxy_map:
174                continue
175            else:
176                self.proxy_map[proxy] = {
177                    "busy": False,
178                    "url": None,
179                    "next_request": 0
180                }
181
182        for proxy in list(self.proxy_map.keys()):
183            if proxy not in all_proxies:
184                del self.proxy_map[proxy]
185
186    def get_available_proxies(self):
187        """
188        Collect proxies from proxy_map that are ready for new requests
189        """
190        # update proxies every 5 seconds so we can potentially update them
191        # while the scrape is running
192        if self.last_proxy_update < time.time():
193            self.update_proxies()
194            self.last_proxy_update = time.time() + 5
195
196        # find out whether there is any connection we can use to send the
197        # next request
198        available_proxies = [proxy for proxy in self.proxy_map if
199                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]
200
201        if not available_proxies:
202            # No proxy available
203            if self.last_time_proxy_available is None:
204                # First run, possibly issue, but this will allow it to time out
205                self.processor.dataset.log("No available proxy found at start of request_metadata")
206                self.last_time_proxy_available = time.time()
207
208            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
209                # No available proxies in timeout period
210                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
211        else:
212            self.last_time_proxy_available = time.time()
213
214        return available_proxies
215
216    def release_proxy(self, url):
217        """
218        Release a proxy to be used later
219        """
220        # Release proxy
221        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
222        if used_proxy:
223            used_proxy = used_proxy[0]
224            self.proxy_map[used_proxy].update({
225                "busy": False,
226                "next_request": time.time() + self.proxy_sleep
227            })
228        else:
229            # TODO: why are we releasing a proxy without a URL?
230            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")
231            pass
232
233    async def request_metadata(self, urls):
234        """
235        Request TikTok metadata for a list of URLs
236
237        Uses asyncio to request URLs concurrently if proxy servers are
238        available. Returns a list of metadata, one object per video.
239
240        :param list urls:  URLs to collect data for
241        :return list:  Metadata
242        """
243        session = FuturesSession()
244        session.headers.update(self.headers)
245        tiktok_requests = {}
246        finished = 0
247        num_urls = len(urls)
248        seen_urls = set()
249
250        results = []
251        failed = 0
252        dupes = 0
253        retries = {}
254
255        while urls or tiktok_requests:
256            # give tasks time to run
257            await asyncio.sleep(0.1)
258
259            available_proxies = self.get_available_proxies()
260
261            for available_proxy in available_proxies:
262                url = None
263                while urls and url is None:
264                    url = urls.pop(0)
265                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks
266
267                    # Check if url already collected or should be retried
268                    if url in seen_urls and url not in retries:
269                        finished += 1
270                        dupes += 1
271                        self.processor.dataset.log("Skipping duplicate of %s" % url)
272                        url = None
273                        continue
274
275                    # Add url to be collected
276                    self.processor.dataset.log(f"Requesting: {url}")
277                    proxy = {"http": available_proxy,
278                             "https": available_proxy} if available_proxy != "__localhost__" else None
279                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
280                    seen_urls.add(url)
281                    self.proxy_map[available_proxy].update({
282                        "busy": True,
283                        "url": url
284                    })
285
286            # wait for async requests to end (after cancelling) before quitting
287            # the worker
288            if self.processor.interrupted:
289                for request in tiktok_requests.values():
290                    request.cancel()
291
292                max_timeout = time.time() + 20
293                while not all([r for r in tiktok_requests.values() if r.done()]) and time.time() < max_timeout:
294                    await asyncio.sleep(0.5)
295
296                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")
297
298            # handle received data
299            for url in list(tiktok_requests.keys()):
300                request = tiktok_requests[url]
301                if not request.done():
302                    continue
303
304                finished += 1
305                self.release_proxy(url)
306
307                # handle the exceptions we know to expect - else just raise and
308                # log
309                exception = request.exception()
310                if exception:
311                    failed += 1
312                    if isinstance(exception, requests.exceptions.RequestException):
313                        self.processor.dataset.update_status(
314                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
315                    else:
316                        raise exception
317
318                # retry on requestexceptions
319                try:
320                    response = request.result()
321                except requests.exceptions.RequestException:
322                    if url not in retries or retries[url] < 3:
323                        if url not in retries:
324                            retries[url] = 0
325                        retries[url] += 1
326                        urls.append(url)
327                    continue
328                finally:
329                    del tiktok_requests[url]
330
331                # video may not exist
332                if response.status_code == 404:
333                    failed += 1
334                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
335                    continue
336
337                # haven't seen these in the wild - 403 or 429 might happen?
338                elif response.status_code != 200:
339                    failed += 1
340                    self.processor.dataset.update_status(
341                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
342                    continue
343
344                # now! try to extract the JSON from the page
345                soup = BeautifulSoup(response.text, "html.parser")
346                sigil = soup.select_one("script#SIGI_STATE")
347
348                if not sigil:
349                    # alternatively, the JSON is here
350                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")
351
352                if not sigil:
353                    if url not in retries or retries[url] < 3:
354                        if url not in retries:
355                            retries[url] = 0
356                        retries[url] += 1
357                        urls.append(url)
358                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
359                    else:
360                        failed += 1
361                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
362                    continue
363
364                try:
365                    if sigil.text:
366                        metadata = json.loads(sigil.text)
367                    elif sigil.contents and len(sigil.contents) > 0:
368                        metadata = json.loads(sigil.contents[0])
369                    else:
370                        failed += 1
371                        self.processor.dataset.log(
372                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
373                        continue
374                except json.JSONDecodeError:
375                    failed += 1
376                    self.processor.dataset.log(
377                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
378                    continue
379
380                for video in self.reformat_metadata(metadata):
381                    if video == self.VIDEO_NOT_FOUND:
382                        failed += 1
383                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
384                        continue
385
386                    if not video.get("stats") or video.get("createTime") == "0":
387                        # sometimes there are empty videos? which seems to
388                        # indicate a login wall
389
390                        self.processor.dataset.log(
391                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
392                        continue
393                    else:
394                        results.append(video)
395
396                    self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
397                                               ("{:,}".format(finished), "{:,}".format(num_urls)))
398                    self.processor.dataset.update_progress(finished / num_urls)
399
400        notes = []
401        if failed:
402            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
403        if dupes:
404            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))
405
406        if notes:
407            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
408                                       "dataset log for details." % ", ".join(notes))
409
410        return results
411
412    def reformat_metadata(self, metadata):
413        """
414        Take embedded JSON and yield one item per post
415
416        :param dict metadata: Metadata extracted from the TikTok video page
417        :return:  Yields one dictionary per video
418        """
419        # may need some extra parsing to find the item data...
420        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
421            try:
422                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
423            except KeyError as e:
424                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
425                    yield self.VIDEO_NOT_FOUND
426                    return
427                else:
428                    raise e.__class__ from e
429
430            metadata = {"ItemModule": {
431                video["id"]: video
432            }}
433
434        if "ItemModule" in metadata:
435            for video_id, item in metadata["ItemModule"].items():
436                if "CommentItem" in metadata:
437                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
438                    if "UserModule" in metadata:
439                        for comment_id in list(comments.keys()):
440                            username = comments[comment_id]["user"]
441                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
442                                                                                                       username)
443                else:
444                    comments = {}
445
446                yield {**item, "comments": list(comments.values())}
447
    async def download_videos(self, video_ids, staging_area, max_videos):
        """
        Download TikTok Videos

        This is based on the TikTok downloader from https://jdownloader.org/

        Works in two phases sharing one proxy pool: for each video ID the
        embed page is fetched to discover the actual video file URL, and that
        file is then downloaded to the staging area as <video_id>.mp4.

        :param list video_ids:  TikTok video IDs to download
        :param staging_area:  Path to a folder to write video files to
        :param int max_videos:  Maximum number of videos to download
        :return dict:  Maps video ID to a result-metadata dictionary with
            "success", "url", "error" and related keys
        """
        # headers mimicking a browser's <video> element request
        video_download_headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
                "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
                "Accept-Language": "en-US,en;q=0.5",
                # "Range": "bytes=0-",
                "Connection": "keep-alive",
                "Referer": "https://www.tiktok.com/",
                "Sec-Fetch-Dest": "video",
                "Sec-Fetch-Mode": "no-cors",
                "Sec-Fetch-Site": "cross-site",
                "Accept-Encoding": "identity"
            }
        session = FuturesSession()

        download_results = {}  # video_id -> result metadata
        downloaded_videos = 0
        metadata_collected = 0
        video_requests = {}  # request URL -> {"request", "video_id", "type"}
        video_download_urls = []  # (video_id, download URL) pairs found so far

        while video_ids or video_download_urls or video_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                # NOTE(review): `>` allows one download beyond max_videos
                # before stopping; `>=` may be intended — confirm
                if downloaded_videos > max_videos:
                    # We're done here
                    video_ids = []
                    video_download_urls = []
                    break

                # Download videos (if available); download URLs take priority
                # over metadata lookups
                if video_download_urls:
                    video_id, video_download_url = video_download_urls.pop(0)
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(video_download_headers)
                    video_requests[video_download_url] = {
                        "request": session.get(video_download_url, proxies=proxy, timeout=30), # not using stream=True here, as it seems slower; possibly due to short video lengths
                        "video_id": video_id,
                        "type": "download",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": video_download_url
                    })
                # Collect video metadata (to find videos to download)
                elif video_ids:
                    video_id = video_ids.pop(0)
                    url = f"https://www.tiktok.com/embed/v2/{video_id}"

                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(self.headers)
                    video_requests[url] = {
                        "request": session.get(url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "metadata",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in video_requests.values():
                    request["request"].cancel()

                max_timeout = time.time() + 20
                # NOTE(review): this condition is truthy as soon as a single
                # request is done (all() over a list filtered to done
                # requests), so the wait ends before all requests finish;
                # likely meant all(r["request"].done() for ...) — confirm
                while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

            # Extract video download URLs
            for url in list(video_requests.keys()):
                video_id = video_requests[url]["video_id"]
                request = video_requests[url]["request"]
                request_type = video_requests[url]["type"]
                # skeleton result recorded for this video whether it succeeds
                # or fails
                request_metadata = {
                    "success": False,
                    "url": url,
                    "error": None,
                    "from_dataset": self.processor.source_dataset.key,
                    "post_ids": [video_id],
                }
                if not request.done():
                    continue

                # Release proxy
                self.release_proxy(url)

                # Collect response
                try:
                    response = request.result()
                except requests.exceptions.RequestException as e:
                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue
                finally:
                    # request is handled either way; drop it from the queue
                    del video_requests[url]

                if response.status_code != 200:
                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                if request_type == "metadata":
                    # Collect Video Download URL from the embed page's JSON
                    soup = BeautifulSoup(response.text, "html.parser")
                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                    video_metadata = None
                    # NOTE(review): if select_one() finds no matching script
                    # tag, json_source is None and .text below raises
                    # AttributeError (uncaught) — confirm this cannot happen
                    try:
                        if json_source.text:
                            video_metadata = json.loads(json_source.text)
                        elif json_source.contents[0]:
                            video_metadata = json.loads(json_source.contents[0])
                    except json.JSONDecodeError as e:
                        self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                    if not video_metadata:
                        # Failed to collect metadata
                        error_message = f"Failed to find metadata for video {video_id}"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        # repeated metadata failures suggest a block; abort
                        # rather than hammer TikTok further
                        self.consecutive_failures += 1
                        if self.consecutive_failures > 5:
                            raise ProcessorException("Too many consecutive failures, stopping")
                        continue

                    try:
                        url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                    except (KeyError, IndexError):
                        error_message = f"vid: {video_id} - failed to find video download URL"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.processor.dataset.log(video_metadata["source"]["data"].values())
                        continue

                    # Add new download URL to be collected
                    self.consecutive_failures = 0
                    video_download_urls.append((video_id, url))
                    metadata_collected += 1
                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                    (metadata_collected, max_videos))
                    self.processor.dataset.update_progress(metadata_collected / max_videos)

                elif request_type == "download":
                    # Download video to the staging area, in 1 MiB chunks
                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            if chunk:
                                f.write(chunk)
                    request_metadata["success"] = True
                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                    download_results[video_id] = request_metadata

                    downloaded_videos += 1
                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                    (downloaded_videos, max_videos))
                    self.processor.dataset.update_progress(downloaded_videos / max_videos)

        return download_results
TikTokScraper(processor, config)
153    def __init__(self, processor, config):
154        """
155        :param Processor processor:  The processor using this function and needing updates
156        """
157        self.proxy_map = {}
158        self.processor = processor
Parameters
  • Processor processor: The processor using this function and needing updates
proxy_map = None
proxy_sleep = 1
headers = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', 'DNT': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0'}
last_proxy_update = 0
last_time_proxy_available = None
no_available_proxy_timeout = 600
consecutive_failures = 0
VIDEO_NOT_FOUND = 'oh no, sire, no video was found'
processor
def update_proxies(self):
160    def update_proxies(self):
161        """
162        Get proxies that are available
163
164        :return:
165        """
166        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
167        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
168        if not all_proxies:
169            # no proxies? just request directly
170            all_proxies = ["__localhost__"]
171
172        for proxy in all_proxies:
173            if proxy in self.proxy_map:
174                continue
175            else:
176                self.proxy_map[proxy] = {
177                    "busy": False,
178                    "url": None,
179                    "next_request": 0
180                }
181
182        for proxy in list(self.proxy_map.keys()):
183            if proxy not in all_proxies:
184                del self.proxy_map[proxy]

Get proxies that are available

Returns
def get_available_proxies(self):
186    def get_available_proxies(self):
187        """
188        Collect proxies from proxy_map that are ready for new requests
189        """
190        # update proxies every 5 seconds so we can potentially update them
191        # while the scrape is running
192        if self.last_proxy_update < time.time():
193            self.update_proxies()
194            self.last_proxy_update = time.time() + 5
195
196        # find out whether there is any connection we can use to send the
197        # next request
198        available_proxies = [proxy for proxy in self.proxy_map if
199                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]
200
201        if not available_proxies:
202            # No proxy available
203            if self.last_time_proxy_available is None:
204                # First run, possibly issue, but this will allow it to time out
205                self.processor.dataset.log("No available proxy found at start of request_metadata")
206                self.last_time_proxy_available = time.time()
207
208            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
209                # No available proxies in timeout period
210                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
211        else:
212            self.last_time_proxy_available = time.time()
213
214        return available_proxies

Collect proxies from proxy_map that are ready for new requests

def release_proxy(self, url):
216    def release_proxy(self, url):
217        """
218        Release a proxy to be used later
219        """
220        # Release proxy
221        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
222        if used_proxy:
223            used_proxy = used_proxy[0]
224            self.proxy_map[used_proxy].update({
225                "busy": False,
226                "next_request": time.time() + self.proxy_sleep
227            })
228        else:
229            # TODO: why are we releasing a proxy without a URL?
230            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")
231            pass

Release a proxy to be used later

async def request_metadata(self, urls):
    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        :param list urls:  URLs to collect data for
        :return list:  Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        # maps URL -> in-flight future for that URL
        tiktok_requests = {}
        finished = 0
        num_urls = len(urls)
        # URLs that have been scheduled at least once (for dupe detection)
        seen_urls = set()

        results = []
        failed = 0
        dupes = 0
        # maps URL -> number of retry attempts made so far (max 3)
        retries = {}

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                url = None
                # pop URLs until one is found that still needs collecting;
                # once a request is scheduled url is non-None, ending the loop
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # Check if url already collected or should be retried
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    # "__localhost__" is the sentinel for "no proxy"
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                max_timeout = time.time() + 20
                # NOTE(review): all() over a list of futures filtered by
                # .done() is always truthy (futures are truthy objects and
                # all([]) is True), so this never actually waits out the grace
                # period - likely intended
                # all(r.done() for r in tiktok_requests.values()); confirm
                while not all([r for r in tiktok_requests.values() if r.done()]) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # handle the exceptions we know to expect - else just raise and
                # log
                exception = request.exception()
                if exception:
                    failed += 1
                    if isinstance(exception, requests.exceptions.RequestException):
                        self.processor.dataset.update_status(
                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                    else:
                        raise exception

                # retry on requestexceptions
                # NOTE(review): a RequestException already counted as failed
                # above is re-raised here by request.result() and the URL is
                # then queued for retry, so one URL can increment `failed`
                # once per attempt - confirm whether that is intended
                try:
                    response = request.result()
                except requests.exceptions.RequestException:
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                    continue
                finally:
                    # always drop the finished future, even when retrying
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    # no embedded JSON at all - retry up to three times before
                    # giving up on the URL
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents and len(sigil.contents) > 0:
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                # one page can yield multiple videos (e.g. with comments)
                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall

                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue
                    else:
                        results.append(video)

                    # NOTE(review): status/progress is updated once per video
                    # yielded from the page rather than once per URL - confirm
                    self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                               ("{:,}".format(finished), "{:,}".format(num_urls)))
                    self.processor.dataset.update_progress(finished / num_urls)

        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                       "dataset log for details." % ", ".join(notes))

        return results

Request TikTok metadata for a list of URLs

Uses asyncio to request URLs concurrently if proxy servers are available. Returns a list of metadata, one object per video.

Parameters
  • list urls: URLs to collect data for
Returns

Metadata

def reformat_metadata(self, metadata):
412    def reformat_metadata(self, metadata):
413        """
414        Take embedded JSON and yield one item per post
415
416        :param dict metadata: Metadata extracted from the TikTok video page
417        :return:  Yields one dictionary per video
418        """
419        # may need some extra parsing to find the item data...
420        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
421            try:
422                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
423            except KeyError as e:
424                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
425                    yield self.VIDEO_NOT_FOUND
426                    return
427                else:
428                    raise e.__class__ from e
429
430            metadata = {"ItemModule": {
431                video["id"]: video
432            }}
433
434        if "ItemModule" in metadata:
435            for video_id, item in metadata["ItemModule"].items():
436                if "CommentItem" in metadata:
437                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
438                    if "UserModule" in metadata:
439                        for comment_id in list(comments.keys()):
440                            username = comments[comment_id]["user"]
441                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
442                                                                                                       username)
443                else:
444                    comments = {}
445
446                yield {**item, "comments": list(comments.values())}

Take embedded JSON and yield one item per post

Parameters
  • dict metadata: Metadata extracted from the TikTok video page
Returns

Yields one dictionary per video

async def download_videos(self, video_ids, staging_area, max_videos):
    async def download_videos(self, video_ids, staging_area, max_videos):
        """
        Download TikTok Videos

        This is based on the TikTok downloader from https://jdownloader.org/

        :param list video_ids:  TikTok video IDs to download
        :param staging_area:  Path-like directory the .mp4 files are written to
        :param int max_videos:  Maximum number of videos to download
        :return dict:  Maps video ID to a result dict (success flag, request
            url, error message, downloaded files)
        """
        # browser-like headers used when fetching the video file itself
        video_download_headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
                "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
                "Accept-Language": "en-US,en;q=0.5",
                # "Range": "bytes=0-",
                "Connection": "keep-alive",
                "Referer": "https://www.tiktok.com/",
                "Sec-Fetch-Dest": "video",
                "Sec-Fetch-Mode": "no-cors",
                "Sec-Fetch-Site": "cross-site",
                "Accept-Encoding": "identity"
            }
        session = FuturesSession()

        # maps video ID -> result dict for that video
        download_results = {}
        downloaded_videos = 0
        metadata_collected = 0
        # maps request URL -> {"request": future, "video_id": ..., "type": ...}
        video_requests = {}
        # (video_id, direct download URL) tuples ready to be fetched
        video_download_urls = []

        while video_ids or video_download_urls or video_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                # NOTE(review): this only stops once the limit is *exceeded*,
                # so up to max_videos + 1 videos may be downloaded - confirm
                # whether ">=" was intended
                if downloaded_videos > max_videos:
                    # We're done here
                    video_ids = []
                    video_download_urls = []
                    break

                # Download videos (if available)
                if video_download_urls:
                    video_id, video_download_url = video_download_urls.pop(0)
                    # "__localhost__" is the sentinel for "no proxy"
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    # NOTE(review): headers are set on the shared session just
                    # before queuing, so scheduling a metadata request next may
                    # overwrite them before this request is sent - confirm
                    session.headers.update(video_download_headers)
                    video_requests[video_download_url] = {
                        "request": session.get(video_download_url, proxies=proxy, timeout=30), # not using stream=True here, as it seems slower; possibly due to short video lengths
                        "video_id": video_id,
                        "type": "download",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": video_download_url
                    })
                # Collect video metadata (to find videos to download)
                elif video_ids:
                    video_id = video_ids.pop(0)
                    # the embed page contains the direct video download URL
                    url = f"https://www.tiktok.com/embed/v2/{video_id}"

                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(self.headers)
                    video_requests[url] = {
                        "request": session.get(url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "metadata",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in video_requests.values():
                    request["request"].cancel()

                max_timeout = time.time() + 20
                # NOTE(review): this condition is always False (the filtered
                # list holds only truthy futures, and all([]) is True), so the
                # grace period never actually waits - likely intended
                # all(r["request"].done() for r in ...); confirm
                while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

            # Extract video download URLs
            for url in list(video_requests.keys()):
                video_id = video_requests[url]["video_id"]
                request = video_requests[url]["request"]
                request_type = video_requests[url]["type"]
                # result skeleton for this request; NOTE this local shadows
                # the request_metadata() method within this loop
                request_metadata = {
                    "success": False,
                    "url": url,
                    "error": None,
                    "from_dataset": self.processor.source_dataset.key,
                    "post_ids": [video_id],
                }
                if not request.done():
                    continue

                # Release proxy
                self.release_proxy(url)

                # Collect response
                try:
                    response = request.result()
                except requests.exceptions.RequestException as e:
                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue
                finally:
                    # always drop the finished request, whatever the outcome
                    del video_requests[url]

                if response.status_code != 200:
                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                if request_type == "metadata":
                    # Collect Video Download URL
                    soup = BeautifulSoup(response.text, "html.parser")
                    # embed pages store their state as JSON in this script tag
                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                    video_metadata = None
                    # NOTE(review): json_source is None when the tag is
                    # missing, which raises AttributeError here instead of
                    # being handled as a failed metadata fetch - confirm
                    try:
                        if json_source.text:
                            video_metadata = json.loads(json_source.text)
                        elif json_source.contents[0]:
                            video_metadata = json.loads(json_source.contents[0])
                    except json.JSONDecodeError as e:
                        self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                    if not video_metadata:
                        # Failed to collect metadata
                        error_message = f"Failed to find metadata for video {video_id}"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        # abort outright after too many failures in a row
                        self.consecutive_failures += 1
                        if self.consecutive_failures > 5:
                            raise ProcessorException("Too many consecutive failures, stopping")
                        continue

                    try:
                        url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                    except (KeyError, IndexError):
                        error_message = f"vid: {video_id} - failed to find video download URL"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.processor.dataset.log(video_metadata["source"]["data"].values())
                        continue

                    # Add new download URL to be collected
                    self.consecutive_failures = 0
                    video_download_urls.append((video_id, url))
                    metadata_collected += 1
                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                    (metadata_collected, max_videos))
                    self.processor.dataset.update_progress(metadata_collected / max_videos)

                elif request_type == "download":
                    # Download video
                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            if chunk:
                                f.write(chunk)
                    request_metadata["success"] = True
                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                    download_results[video_id] = request_metadata

                    downloaded_videos += 1
                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                    (downloaded_videos, max_videos))
                    self.processor.dataset.update_progress(downloaded_videos / max_videos)

        return download_results

Download TikTok Videos

This is based on the TikTok downloader from https://jdownloader.org/