datasources.tiktok_urls.search_tiktok_urls
Import scraped TikTok data
It's prohibitively difficult to scrape data from TikTok within 4CAT itself due to its aggressive rate limiting. Instead, import data collected elsewhere.
"""
Import scraped TikTok data

It's prohibitively difficult to scrape data from TikTok within 4CAT itself due
to its aggressive rate limiting. Instead, import data collected elsewhere.
"""
import requests
import asyncio
import time
import json
import re

from requests_futures.sessions import FuturesSession
from bs4 import BeautifulSoup

from backend.lib.search import Search
from common.lib.helpers import UserInput
from common.lib.exceptions import WorkerInterruptedException, QueryParametersException, ProcessorException
from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport
from common.config_manager import config


class SearchTikTokByID(Search):
    """
    Import scraped TikTok data
    """
    type = "tiktok-urls-search"  # job ID
    category = "Search"  # category
    title = "Search TikTok by post URL"  # title displayed in UI
    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    # not available as a processor for existing datasets
    accepts = [None]

    config = {
        "tiktok-urls-search.proxies": {
            "type": UserInput.OPTION_TEXT_JSON,
            "default": [],
            "help": "Proxies for TikTok data collection"
        },
        "tiktok-urls-search.proxies.wait": {
            "type": UserInput.OPTION_TEXT,
            "coerce_type": float,
            "default": 1.0,
            "help": "Request wait",
            "tooltip": "Time to wait before sending a new request from the same IP"
        }
    }

    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
                    "each post's page in the browser interface "
                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
                    "only directly available when downloading the results as an .ndjson file."
        },
        "urls": {
            "type": UserInput.OPTION_TEXT_LARGE,
            "help": "Post URLs",
            "tooltip": "Separate by commas or new lines."
        }
    }

    def get_items(self, query):
        """
        Retrieve metadata for TikTok URLs

        :param dict query:  Search query parameters
        :return list:  Metadata, one item per video
        """
        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
        # run the async scraper on a dedicated event loop and close the loop
        # afterwards so its resources do not leak across searches
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
        finally:
            loop.close()

    @staticmethod
    def validate_query(query, request, user):
        """
        Validate TikTok query

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param User user:  User object of user who has submitted the query
        :return dict:  Safe query parameters
        """
        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("urls").replace("\n", ","))

        sanitized_items = []
        # ensure each entry looks like a TikTok video URL
        for item in str(items).split(","):
            if not item.strip():
                continue

            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)

            sanitized_items.append(item)

        # no query 4 u
        if not sanitized_items:
            raise QueryParametersException("You must provide at least one valid TikTok video URL.")

        # simple!
        return {
            "urls": ",".join(sanitized_items)
        }

    @staticmethod
    def map_item(item):
        """
        Analogous to the other TikTok data source

        :param item:  Raw TikTok post item
        :return:  Mapped item
        """
        return SearchTikTokByImport.map_item(item)


class TikTokScraper:
    """
    Scrape TikTok video metadata and videos via embedded page JSON

    Requests are sent through a configurable pool of proxies (or directly,
    when no proxies are configured) with a per-proxy cool-down.
    """
    proxy_map = None
    proxy_sleep = 1
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    last_proxy_update = 0
    last_time_proxy_available = None
    no_available_proxy_timeout = 600

    # sentinel yielded by reformat_metadata for deleted/unavailable videos
    VIDEO_NOT_FOUND = "oh no, sire, no video was found"

    def __init__(self, processor, config):
        """
        :param Processor processor:  The processor using this function and needing updates
        :param config:  Configuration reader (unused; the processor's own
            config is read instead — kept for interface compatibility)
        """
        self.proxy_map = {}
        self.processor = processor

    def update_proxies(self):
        """
        Get proxies that are available

        Synchronises the proxy map with the current configuration: new
        proxies are added in an idle state, removed proxies are dropped.
        """
        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
        if not all_proxies:
            # no proxies? just request directly
            all_proxies = ["__localhost__"]

        for proxy in all_proxies:
            if proxy in self.proxy_map:
                continue
            else:
                self.proxy_map[proxy] = {
                    "busy": False,
                    "url": None,
                    "next_request": 0
                }

        for proxy in list(self.proxy_map.keys()):
            if proxy not in all_proxies:
                del self.proxy_map[proxy]

    def get_available_proxies(self):
        """
        Collect proxies from proxy_map that are ready for new requests

        :return list:  Proxies that are idle and past their cool-down
        """
        # update proxies every 5 seconds so we can potentially update them
        # while the scrape is running
        if self.last_proxy_update < time.time():
            self.update_proxies()
            self.last_proxy_update = time.time() + 5

        # find out whether there is any connection we can use to send the
        # next request
        available_proxies = [proxy for proxy in self.proxy_map if
                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]

        if not available_proxies:
            # No proxy available
            if self.last_time_proxy_available is None:
                # First run, possibly issue, but this will allow it to time out
                self.processor.dataset.log("No available proxy found at start of request_metadata")
                self.last_time_proxy_available = time.time()

            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
                # No available proxies in timeout period
                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
        else:
            self.last_time_proxy_available = time.time()

        return available_proxies

    def release_proxy(self, url):
        """
        Release a proxy to be used later

        :param str url:  URL the proxy was fetching; used to look it up
        """
        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
        if used_proxy:
            used_proxy = used_proxy[0]
            self.proxy_map[used_proxy].update({
                "busy": False,
                "next_request": time.time() + self.proxy_sleep
            })
        else:
            # TODO: why are we releasing a proxy without a URL?
            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")

    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        :param list urls:  URLs to collect data for
        :return list:  Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        tiktok_requests = {}
        finished = 0
        num_urls = len(urls)
        seen_urls = set()

        results = []
        failed = 0
        dupes = 0
        retries = {}

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                url = None
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # Check if url already collected or should be retried
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                max_timeout = time.time() + 20
                # wait until *all* futures have completed (or give up after
                # the timeout); the previous check only inspected futures
                # that were already done and so never actually waited
                while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # retry transient request errors up to three times; only
                # count the URL as failed once retries are exhausted (the
                # old code counted every attempt as a failure)
                try:
                    response = request.result()
                except requests.exceptions.RequestException as exception:
                    if url not in retries or retries[url] < 3:
                        retries[url] = retries.get(url, 0) + 1
                        urls.append(url)
                    else:
                        failed += 1
                        self.processor.dataset.update_status(
                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                    continue
                finally:
                    # always forget the future, also when an unexpected
                    # exception propagates out of result()
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents and len(sigil.contents) > 0:
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall
                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue
                    else:
                        results.append(video)

                self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                                     ("{:,}".format(finished), "{:,}".format(num_urls)))
                self.processor.dataset.update_progress(finished / num_urls)

        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                                 "dataset log for details." % ", ".join(notes))

        return results

    def reformat_metadata(self, metadata):
        """
        Take embedded JSON and yield one item per post

        :param dict metadata:  Metadata extracted from the TikTok video page
        :return:  Yields one dictionary per video
        """
        # may need some extra parsing to find the item data...
        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
            try:
                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
            except KeyError:
                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
                    # a status code without item data indicates the video is
                    # gone or otherwise unavailable
                    yield self.VIDEO_NOT_FOUND
                    return
                else:
                    raise

            metadata = {"ItemModule": {
                video["id"]: video
            }}

        if "ItemModule" in metadata:
            for video_id, item in metadata["ItemModule"].items():
                if "CommentItem" in metadata:
                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
                    if "UserModule" in metadata:
                        # resolve comment author usernames to full user objects
                        for comment_id in list(comments.keys()):
                            username = comments[comment_id]["user"]
                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
                                                                                                      username)
                else:
                    comments = {}

                yield {**item, "comments": list(comments.values())}

    async def download_videos(self, video_ids, staging_area, max_videos):
        """
        Download TikTok Videos

        This is based on the TikTok downloader from https://jdownloader.org/

        :param list video_ids:  IDs of videos to download
        :param staging_area:  Path-like staging directory to write .mp4 files to
        :param int max_videos:  Maximum number of videos to download
        :return dict:  Per-video download result metadata
        """
        video_download_headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
            "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
            "Accept-Language": "en-US,en;q=0.5",
            # "Range": "bytes=0-",
            "Connection": "keep-alive",
            "Referer": "https://www.tiktok.com/",
            "Sec-Fetch-Dest": "video",
            "Sec-Fetch-Mode": "no-cors",
            "Sec-Fetch-Site": "cross-site",
            "Accept-Encoding": "identity"
        }
        session = FuturesSession()

        download_results = {}
        downloaded_videos = 0
        metadata_collected = 0
        video_requests = {}
        video_download_urls = []

        while video_ids or video_download_urls or video_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                # ">=" so we stop at exactly max_videos (the old ">" check
                # allowed one extra download)
                if downloaded_videos >= max_videos:
                    # We're done here
                    video_ids = []
                    video_download_urls = []
                    break

                # Download videos (if available)
                if video_download_urls:
                    video_id, video_download_url = video_download_urls.pop(0)
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(video_download_headers)
                    video_requests[video_download_url] = {
                        "request": session.get(video_download_url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "download",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": video_download_url
                    })
                # Collect video metadata (to find videos to download)
                elif video_ids:
                    video_id = video_ids.pop(0)
                    url = f"https://www.tiktok.com/embed/v2/{video_id}"

                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    session.headers.update(self.headers)
                    video_requests[url] = {
                        "request": session.get(url, proxies=proxy, timeout=30),
                        "video_id": video_id,
                        "type": "metadata",
                    }
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in video_requests.values():
                    request["request"].cancel()

                max_timeout = time.time() + 20
                # actually wait for all futures to finish; the old check did
                # not test done() on pending futures
                while not all(r["request"].done() for r in video_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

            # Extract video download URLs
            for url in list(video_requests.keys()):
                video_id = video_requests[url]["video_id"]
                request = video_requests[url]["request"]
                request_type = video_requests[url]["type"]
                request_metadata = {
                    "success": False,
                    "url": url,
                    "error": None,
                    "from_dataset": self.processor.source_dataset.key,
                    "post_ids": [video_id],
                }
                if not request.done():
                    continue

                # Release proxy
                self.release_proxy(url)

                # Collect response
                try:
                    response = request.result()
                except requests.exceptions.RequestException as e:
                    error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue
                finally:
                    del video_requests[url]

                if response.status_code != 200:
                    error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                if request_type == "metadata":
                    # Collect Video Download URL
                    soup = BeautifulSoup(response.text, "html.parser")
                    json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                    video_metadata = None
                    try:
                        if json_source.text:
                            video_metadata = json.loads(json_source.text)
                        elif json_source.contents[0]:
                            video_metadata = json.loads(json_source.contents[0])
                    except json.JSONDecodeError as e:
                        self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                    if not video_metadata:
                        # Failed to collect metadata
                        error_message = f"Failed to find metadata for video {video_id}"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        continue

                    # use a distinct name for the extracted URL so we do not
                    # shadow the loop variable `url`
                    try:
                        download_url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                    except (KeyError, IndexError):
                        error_message = f"vid: {video_id} - failed to find video download URL"
                        request_metadata["error"] = error_message
                        download_results[video_id] = request_metadata
                        self.processor.dataset.log(error_message)
                        self.processor.dataset.log(video_metadata["source"]["data"].values())
                        continue

                    # Add new download URL to be collected
                    video_download_urls.append((video_id, download_url))
                    metadata_collected += 1
                    self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                         (metadata_collected, max_videos))
                    self.processor.dataset.update_progress(metadata_collected / max_videos)

                elif request_type == "download":
                    # Download video
                    with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024 * 1024):
                            if chunk:
                                f.write(chunk)
                    request_metadata["success"] = True
                    request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                    download_results[video_id] = request_metadata

                    downloaded_videos += 1
                    self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                         (downloaded_videos, max_videos))
                    self.processor.dataset.update_progress(downloaded_videos / max_videos)

        return download_results
class SearchTikTokByID(Search):
    """
    Import scraped TikTok data
    """
    type = "tiktok-urls-search"  # job ID
    category = "Search"  # category
    title = "Search TikTok by post URL"  # title displayed in UI
    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    # not available as a processor for existing datasets
    accepts = [None]

    config = {
        "tiktok-urls-search.proxies": {
            "type": UserInput.OPTION_TEXT_JSON,
            "default": [],
            "help": "Proxies for TikTok data collection"
        },
        "tiktok-urls-search.proxies.wait": {
            "type": UserInput.OPTION_TEXT,
            "coerce_type": float,
            "default": 1.0,
            "help": "Request wait",
            "tooltip": "Time to wait before sending a new request from the same IP"
        }
    }

    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
                    "each post's page in the browser interface "
                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
                    "only directly available when downloading the results as an .ndjson file."
        },
        "urls": {
            "type": UserInput.OPTION_TEXT_LARGE,
            "help": "Post URLs",
            "tooltip": "Separate by commas or new lines."
        }
    }

    def get_items(self, query):
        """
        Retrieve metadata for TikTok URLs

        :param dict query:  Search query parameters
        :return list:  Metadata, one item per video
        """
        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
        # run the async scraper on a dedicated event loop and close the loop
        # afterwards so its resources do not leak across searches
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
        finally:
            loop.close()

    @staticmethod
    def validate_query(query, request, user):
        """
        Validate TikTok query

        :param dict query:  Query parameters, from client-side.
        :param request:  Flask request
        :param User user:  User object of user who has submitted the query
        :return dict:  Safe query parameters
        """
        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("urls").replace("\n", ","))

        sanitized_items = []
        # ensure each entry looks like a TikTok video URL
        for item in str(items).split(","):
            if not item.strip():
                continue

            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)

            sanitized_items.append(item)

        # no query 4 u
        if not sanitized_items:
            raise QueryParametersException("You must provide at least one valid TikTok video URL.")

        # simple!
        return {
            "urls": ",".join(sanitized_items)
        }

    @staticmethod
    def map_item(item):
        """
        Analogous to the other TikTok data source

        :param item:  Raw TikTok post item
        :return:  Mapped item
        """
        return SearchTikTokByImport.map_item(item)
Import scraped TikTok data
def get_items(self, query):
    """
    Retrieve metadata for TikTok URLs

    :param dict query:  Search query parameters
    :return list:  Metadata, one item per video
    """
    tiktok_scraper = TikTokScraper(processor=self, config=self.config)
    # run the async scraper on a dedicated event loop and close the loop
    # afterwards so its resources do not leak across searches
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
    finally:
        loop.close()
Retrieve metadata for TikTok URLs
Parameters
- dict query: Search query parameters
80 @staticmethod 81 def validate_query(query, request, user): 82 """ 83 Validate TikTok query 84 85 :param dict query: Query parameters, from client-side. 86 :param request: Flask request 87 :param User user: User object of user who has submitted the query 88 :return dict: Safe query parameters 89 """ 90 # reformat queries to be a comma-separated list with no wrapping 91 # whitespace 92 whitespace = re.compile(r"\s+") 93 items = whitespace.sub("", query.get("urls").replace("\n", ",")) 94 95 sanitized_items = [] 96 # handle telegram URLs 97 for item in str(items).split(","): 98 if not item.strip(): 99 continue 100 101 if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \ 102 not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \ 103 not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item): 104 raise QueryParametersException("'%s' is not a valid TikTok video URL" % item) 105 106 sanitized_items.append(item) 107 108 # no query 4 u 109 if not sanitized_items: 110 raise QueryParametersException("You must provide at least one valid TikTok video URL.") 111 112 # simple! 113 return { 114 "urls": ",".join(sanitized_items) 115 }
Validate TikTok query
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
@staticmethod
def map_item(item):
    """
    Analogous to the other TikTok data source

    Items collected here use the same embedded JSON format as the regular
    TikTok data source, so mapping is delegated to that data source's
    map_item implementation.

    :param item:  Raw TikTok post item, as stored in the ndjson result file
    :return:  Mapped (flattened) item
    """
    return SearchTikTokByImport.map_item(item)
Analogous to the other TikTok data source
Parameters
- item: A raw TikTok post item (metadata dictionary), as stored in the ndjson result file.
Returns
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor
class TikTokScraper:
    """
    Scrape TikTok video metadata via embedded page JSON

    Requests are sent through a configurable pool of proxies (or directly,
    when no proxies are configured) with a per-proxy cool-down.
    """
    proxy_map = None
    proxy_sleep = 1
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    last_proxy_update = 0
    last_time_proxy_available = None
    no_available_proxy_timeout = 600

    # sentinel yielded by reformat_metadata for deleted/unavailable videos
    VIDEO_NOT_FOUND = "oh no, sire, no video was found"

    def __init__(self, processor, config):
        """
        :param Processor processor:  The processor using this function and needing updates
        :param config:  Configuration reader (unused; the processor's own
            config is read instead — kept for interface compatibility)
        """
        self.proxy_map = {}
        self.processor = processor

    def update_proxies(self):
        """
        Get proxies that are available

        Synchronises the proxy map with the current configuration: new
        proxies are added in an idle state, removed proxies are dropped.
        """
        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
        if not all_proxies:
            # no proxies? just request directly
            all_proxies = ["__localhost__"]

        for proxy in all_proxies:
            if proxy in self.proxy_map:
                continue
            else:
                self.proxy_map[proxy] = {
                    "busy": False,
                    "url": None,
                    "next_request": 0
                }

        for proxy in list(self.proxy_map.keys()):
            if proxy not in all_proxies:
                del self.proxy_map[proxy]

    def get_available_proxies(self):
        """
        Collect proxies from proxy_map that are ready for new requests

        :return list:  Proxies that are idle and past their cool-down
        """
        # update proxies every 5 seconds so we can potentially update them
        # while the scrape is running
        if self.last_proxy_update < time.time():
            self.update_proxies()
            self.last_proxy_update = time.time() + 5

        # find out whether there is any connection we can use to send the
        # next request
        available_proxies = [proxy for proxy in self.proxy_map if
                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]

        if not available_proxies:
            # No proxy available
            if self.last_time_proxy_available is None:
                # First run, possibly issue, but this will allow it to time out
                self.processor.dataset.log("No available proxy found at start of request_metadata")
                self.last_time_proxy_available = time.time()

            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
                # No available proxies in timeout period
                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
        else:
            self.last_time_proxy_available = time.time()

        return available_proxies

    def release_proxy(self, url):
        """
        Release a proxy to be used later

        :param str url:  URL the proxy was fetching; used to look it up
        """
        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
        if used_proxy:
            used_proxy = used_proxy[0]
            self.proxy_map[used_proxy].update({
                "busy": False,
                "next_request": time.time() + self.proxy_sleep
            })
        else:
            # TODO: why are we releasing a proxy without a URL?
            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")

    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        :param list urls:  URLs to collect data for
        :return list:  Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        tiktok_requests = {}
        finished = 0
        num_urls = len(urls)
        seen_urls = set()

        results = []
        failed = 0
        dupes = 0
        retries = {}

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                url = None
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # Check if url already collected or should be retried
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                max_timeout = time.time() + 20
                # wait until *all* futures have completed (or give up after
                # the timeout); the previous check only inspected futures
                # that were already done and so never actually waited
                while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # retry transient request errors up to three times; only
                # count the URL as failed once retries are exhausted (the
                # old code counted every attempt as a failure)
                try:
                    response = request.result()
                except requests.exceptions.RequestException as exception:
                    if url not in retries or retries[url] < 3:
                        retries[url] = retries.get(url, 0) + 1
                        urls.append(url)
                    else:
                        failed += 1
                        self.processor.dataset.update_status(
                            "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                    continue
                finally:
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents and len(sigil.contents) > 0:
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall
                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue
                    else:
                        results.append(video)

                self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                                     ("{:,}".format(finished), "{:,}".format(num_urls)))
                self.processor.dataset.update_progress(finished / num_urls)

        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                                 "dataset log for details." % ", ".join(notes))

        return results
This likely means that the post requires logging in to view.") 389 continue 390 else: 391 results.append(video) 392 393 self.processor.dataset.update_status("Processed %s of %s TikTok URLs" % 394 ("{:,}".format(finished), "{:,}".format(num_urls))) 395 self.processor.dataset.update_progress(finished / num_urls) 396 397 notes = [] 398 if failed: 399 notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed)) 400 if dupes: 401 notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes)) 402 403 if notes: 404 self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See " 405 "dataset log for details." % ", ".join(notes)) 406 407 return results 408 409 def reformat_metadata(self, metadata): 410 """ 411 Take embedded JSON and yield one item per post 412 413 :param dict metadata: Metadata extracted from the TikTok video page 414 :return: Yields one dictionary per video 415 """ 416 # may need some extra parsing to find the item data... 
417 if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]: 418 try: 419 video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"] 420 except KeyError as e: 421 if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]: 422 yield self.VIDEO_NOT_FOUND 423 return 424 else: 425 raise e.__class__ from e 426 427 metadata = {"ItemModule": { 428 video["id"]: video 429 }} 430 431 if "ItemModule" in metadata: 432 for video_id, item in metadata["ItemModule"].items(): 433 if "CommentItem" in metadata: 434 comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id} 435 if "UserModule" in metadata: 436 for comment_id in list(comments.keys()): 437 username = comments[comment_id]["user"] 438 comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username, 439 username) 440 else: 441 comments = {} 442 443 yield {**item, "comments": list(comments.values())} 444 445 async def download_videos(self, video_ids, staging_area, max_videos): 446 """ 447 Download TikTok Videos 448 449 This is based on the TikTok downloader from https://jdownloader.org/ 450 """ 451 video_download_headers = { 452 "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0", 453 "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5", 454 "Accept-Language": "en-US,en;q=0.5", 455 # "Range": "bytes=0-", 456 "Connection": "keep-alive", 457 "Referer": "https://www.tiktok.com/", 458 "Sec-Fetch-Dest": "video", 459 "Sec-Fetch-Mode": "no-cors", 460 "Sec-Fetch-Site": "cross-site", 461 "Accept-Encoding": "identity" 462 } 463 session = FuturesSession() 464 465 download_results = {} 466 downloaded_videos = 0 467 metadata_collected = 0 468 video_requests = {} 469 video_download_urls = [] 470 471 while video_ids or video_download_urls or video_requests: 472 # give tasks time to run 473 await asyncio.sleep(0.1) 474 475 
available_proxies = self.get_available_proxies() 476 477 for available_proxy in available_proxies: 478 if downloaded_videos > max_videos: 479 # We're done here 480 video_ids = [] 481 video_download_urls = [] 482 break 483 484 # Download videos (if available) 485 if video_download_urls: 486 video_id, video_download_url = video_download_urls.pop(0) 487 proxy = {"http": available_proxy, 488 "https": available_proxy} if available_proxy != "__localhost__" else None 489 session.headers.update(video_download_headers) 490 video_requests[video_download_url] = { 491 "request": session.get(video_download_url, proxies=proxy, timeout=30), 492 "video_id": video_id, 493 "type": "download", 494 } 495 self.proxy_map[available_proxy].update({ 496 "busy": True, 497 "url": video_download_url 498 }) 499 # Collect video metadata (to find videos to download) 500 elif video_ids: 501 video_id = video_ids.pop(0) 502 url = f"https://www.tiktok.com/embed/v2/{video_id}" 503 504 proxy = {"http": available_proxy, 505 "https": available_proxy} if available_proxy != "__localhost__" else None 506 session.headers.update(self.headers) 507 video_requests[url] = { 508 "request": session.get(url, proxies=proxy, timeout=30), 509 "video_id": video_id, 510 "type": "metadata", 511 } 512 self.proxy_map[available_proxy].update({ 513 "busy": True, 514 "url": url 515 }) 516 517 # wait for async requests to end (after cancelling) before quitting 518 # the worker 519 if self.processor.interrupted: 520 for request in video_requests.values(): 521 request["request"].cancel() 522 523 max_timeout = time.time() + 20 524 while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout: 525 await asyncio.sleep(0.5) 526 527 raise WorkerInterruptedException("Interrupted while downloading TikTok videos") 528 529 # Extract video download URLs 530 for url in list(video_requests.keys()): 531 video_id = video_requests[url]["video_id"] 532 request = 
video_requests[url]["request"] 533 request_type = video_requests[url]["type"] 534 request_metadata = { 535 "success": False, 536 "url": url, 537 "error": None, 538 "from_dataset": self.processor.source_dataset.key, 539 "post_ids": [video_id], 540 } 541 if not request.done(): 542 continue 543 544 # Release proxy 545 self.release_proxy(url) 546 547 # Collect response 548 try: 549 response = request.result() 550 except requests.exceptions.RequestException as e: 551 error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})" 552 request_metadata["error"] = error_message 553 download_results[video_id] = request_metadata 554 self.processor.dataset.log(error_message) 555 continue 556 finally: 557 del video_requests[url] 558 559 if response.status_code != 200: 560 error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping." 561 request_metadata["error"] = error_message 562 download_results[video_id] = request_metadata 563 self.processor.dataset.log(error_message) 564 continue 565 566 if request_type == "metadata": 567 # Collect Video Download URL 568 soup = BeautifulSoup(response.text, "html.parser") 569 json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__") 570 video_metadata = None 571 try: 572 if json_source.text: 573 video_metadata = json.loads(json_source.text) 574 elif json_source.contents[0]: 575 video_metadata = json.loads(json_source.contents[0]) 576 except json.JSONDecodeError as e: 577 self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}") 578 579 if not video_metadata: 580 # Failed to collect metadata 581 error_message = f"Failed to find metadata for video {video_id}" 582 request_metadata["error"] = error_message 583 download_results[video_id] = request_metadata 584 self.processor.dataset.log(error_message) 585 continue 586 587 try: 588 url = 
list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0] 589 except (KeyError, IndexError): 590 error_message = f"vid: {video_id} - failed to find video download URL" 591 request_metadata["error"] = error_message 592 download_results[video_id] = request_metadata 593 self.processor.dataset.log(error_message) 594 self.processor.dataset.log(video_metadata["source"]["data"].values()) 595 continue 596 597 # Add new download URL to be collected 598 video_download_urls.append((video_id, url)) 599 metadata_collected += 1 600 self.processor.dataset.update_status("Collected metadata for %i/%i videos" % 601 (metadata_collected, max_videos)) 602 self.processor.dataset.update_progress(metadata_collected / max_videos) 603 604 elif request_type == "download": 605 # Download video 606 with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f: 607 for chunk in response.iter_content(chunk_size=1024 * 1024): 608 if chunk: 609 f.write(chunk) 610 request_metadata["success"] = True 611 request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}] 612 download_results[video_id] = request_metadata 613 614 downloaded_videos += 1 615 self.processor.dataset.update_status("Downloaded %i/%i videos" % 616 (downloaded_videos, max_videos)) 617 self.processor.dataset.update_progress(downloaded_videos / max_videos) 618 619 return download_results
def __init__(self, processor, config):
    """
    Set up the scraper with an empty proxy map.

    :param Processor processor: The processor using this function and needing updates
    """
    self.processor = processor
    self.proxy_map = dict()
Parameters
- Processor processor: The processor using this function and needing updates
def update_proxies(self):
    """
    Refresh the proxy map from configuration.

    Newly configured proxies are registered as idle entries; proxies that
    disappeared from the configuration are dropped. Also refreshes the
    per-proxy wait time.

    :return:
    """
    configured = self.processor.config.get("tiktok-urls-search.proxies")
    self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)

    if not configured:
        # no proxies? just request directly
        configured = ["__localhost__"]

    # register any proxy we have not seen before, as idle
    for proxy in configured:
        self.proxy_map.setdefault(proxy, {
            "busy": False,
            "url": None,
            "next_request": 0
        })

    # forget proxies that are no longer configured
    for stale in [proxy for proxy in self.proxy_map if proxy not in configured]:
        del self.proxy_map[stale]
Get proxies that are available
Returns
Nothing; the proxy map is updated in place.
def get_available_proxies(self):
    """
    Collect proxies from proxy_map that are ready for new requests.

    :return list:  Proxies that are neither busy nor still cooling down
    :raises ProcessorException:  if no proxy became available within the
        configured timeout period
    """
    # refresh the proxy list at most once every five seconds, so that
    # configuration changes are picked up while the scrape is running
    if self.last_proxy_update < time.time():
        self.update_proxies()
        self.last_proxy_update = time.time() + 5

    # a proxy can be used when it is idle and its cooldown has expired
    ready = [
        proxy for proxy, state in self.proxy_map.items()
        if not state["busy"] and state["next_request"] <= time.time()
    ]

    if ready:
        self.last_time_proxy_available = time.time()
    else:
        if self.last_time_proxy_available is None:
            # First run, possibly an issue, but this will allow it to time out
            self.processor.dataset.log("No available proxy found at start of request_metadata")
            self.last_time_proxy_available = time.time()

        if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
            # no proxy became available within the timeout period
            raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")

    return ready
Collect proxies from proxy_map that are ready for new requests
def release_proxy(self, url):
    """
    Release the proxy that was used to request the given URL.

    Marks the proxy as idle, clears its URL assignment and schedules the
    earliest moment it may be used again (rate limiting).

    :param str url:  URL the proxy was used to request
    """
    used_proxy = next((proxy for proxy, state in self.proxy_map.items() if state["url"] == url), None)
    if used_proxy:
        self.proxy_map[used_proxy].update({
            "busy": False,
            # clear the URL assignment: the original left it in place, so a
            # later release_proxy() call for the same (retried) URL could
            # match this idle proxy instead of the one actually handling it
            "url": None,
            "next_request": time.time() + self.proxy_sleep
        })
    else:
        # TODO: why are we releasing a proxy without a URL?
        self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")
Release a proxy to be used later
async def request_metadata(self, urls):
    """
    Request TikTok metadata for a list of URLs.

    Uses asyncio to request URLs concurrently if proxy servers are
    available. Returns a list of metadata, one object per video.

    :param list urls:  URLs to collect data for
    :return list:  Metadata, one dictionary per video
    """
    session = FuturesSession()
    session.headers.update(self.headers)
    tiktok_requests = {}  # url -> in-flight future
    finished = 0
    num_urls = len(urls)
    seen_urls = set()

    results = []
    failed = 0
    dupes = 0
    retries = {}  # url -> number of retries so far (max 3)

    while urls or tiktok_requests:
        # give tasks time to run
        await asyncio.sleep(0.1)

        available_proxies = self.get_available_proxies()

        for available_proxy in available_proxies:
            url = None
            while urls and url is None:
                url = urls.pop(0)
                url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                # skip URLs we already collected, unless queued for retry
                if url in seen_urls and url not in retries:
                    finished += 1
                    dupes += 1
                    self.processor.dataset.log("Skipping duplicate of %s" % url)
                    url = None
                    continue

                # Add url to be collected
                self.processor.dataset.log(f"Requesting: {url}")
                proxy = {"http": available_proxy,
                         "https": available_proxy} if available_proxy != "__localhost__" else None
                tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                seen_urls.add(url)
                self.proxy_map[available_proxy].update({
                    "busy": True,
                    "url": url
                })

        # wait for async requests to end (after cancelling) before quitting
        # the worker
        if self.processor.interrupted:
            for request in tiktok_requests.values():
                request.cancel()

            max_timeout = time.time() + 20
            # wait until every in-flight request has actually finished, or
            # 20 seconds have passed; the original condition
            # all([r for r in ... if r.done()]) was always true and never waited
            while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                await asyncio.sleep(0.5)

            raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

        # handle received data
        for url in list(tiktok_requests.keys()):
            request = tiktok_requests[url]
            if not request.done():
                continue

            finished += 1
            self.release_proxy(url)

            # collect the response; network errors are retried up to three
            # times and only then counted as failed (the original counted a
            # retried URL as failed on every attempt). Other exception types
            # simply propagate.
            try:
                response = request.result()
            except requests.exceptions.RequestException as exception:
                if retries.get(url, 0) < 3:
                    retries[url] = retries.get(url, 0) + 1
                    urls.append(url)
                    self.processor.dataset.log("Could not retrieve %s (%s), retrying" % (url, type(exception).__name__))
                else:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception))
                continue
            finally:
                del tiktok_requests[url]

            # video may not exist
            if response.status_code == 404:
                failed += 1
                self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                continue

            # haven't seen these in the wild - 403 or 429 might happen?
            elif response.status_code != 200:
                failed += 1
                self.processor.dataset.update_status(
                    "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                continue

            # now! try to extract the JSON from the page
            soup = BeautifulSoup(response.text, "html.parser")
            sigil = soup.select_one("script#SIGI_STATE")

            if not sigil:
                # alternatively, the JSON is here
                sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

            if not sigil:
                if url not in retries or retries[url] < 3:
                    if url not in retries:
                        retries[url] = 0
                    retries[url] += 1
                    urls.append(url)
                    self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                else:
                    failed += 1
                    self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                continue

            try:
                if sigil.text:
                    metadata = json.loads(sigil.text)
                elif sigil.contents and len(sigil.contents) > 0:
                    metadata = json.loads(sigil.contents[0])
                else:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue
            except json.JSONDecodeError:
                failed += 1
                self.processor.dataset.log(
                    "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                continue

            for video in self.reformat_metadata(metadata):
                if video == self.VIDEO_NOT_FOUND:
                    failed += 1
                    self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                    continue

                if not video.get("stats") or video.get("createTime") == "0":
                    # sometimes there are empty videos? which seems to
                    # indicate a login wall
                    self.processor.dataset.log(
                        f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                    continue
                else:
                    results.append(video)

            self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                                 ("{:,}".format(finished), "{:,}".format(num_urls)))
            self.processor.dataset.update_progress(finished / num_urls)

    notes = []
    if failed:
        notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
    if dupes:
        notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

    if notes:
        self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                             "dataset log for details." % ", ".join(notes))

    return results
Request TikTok metadata for a list of URLs
Uses asyncio to request URLs concurrently if proxy servers are available. Returns a list of metadata, one object per video.
Parameters
- list urls: URLs to collect data for
Returns
A list of metadata dictionaries, one per video.
def reformat_metadata(self, metadata):
    """
    Take embedded JSON and yield one item per post

    :param dict metadata: Metadata extracted from the TikTok video page
    :return: Yields one dictionary per video
    """
    # newer pages embed the item under __DEFAULT_SCOPE__ -> webapp.video-detail;
    # normalise that into the older ItemModule layout handled below
    if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
        try:
            video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
        except KeyError:
            if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
                # an error page, e.g. for a removed video
                yield self.VIDEO_NOT_FOUND
                return
            else:
                # re-raise the original exception with the offending key intact
                # (the original raised a bare e.__class__, losing that info)
                raise

        metadata = {"ItemModule": {
            video["id"]: video
        }}

    if "ItemModule" in metadata:
        for video_id, item in metadata["ItemModule"].items():
            if "CommentItem" in metadata:
                # comments belonging to this video; resolve comment authors to
                # full user objects when the page includes a UserModule
                comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
                if "UserModule" in metadata:
                    for comment_id in list(comments.keys()):
                        username = comments[comment_id]["user"]
                        comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username, username)
            else:
                comments = {}

            yield {**item, "comments": list(comments.values())}
Take embedded JSON and yield one item per post
Parameters
- dict metadata: Metadata extracted from the TikTok video page
Returns
Yields one dictionary per video
async def download_videos(self, video_ids, staging_area, max_videos):
    """
    Download TikTok videos.

    This is based on the TikTok downloader from https://jdownloader.org/

    :param list video_ids:  TikTok video IDs to download videos for
    :param staging_area:  Directory (Path) to write the downloaded .mp4 files to
    :param int max_videos:  Maximum number of videos to download
    :return dict:  Mapping of video ID to a result dictionary describing
        success or failure for that video
    """
    video_download_headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
        "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
        "Accept-Language": "en-US,en;q=0.5",
        # "Range": "bytes=0-",
        "Connection": "keep-alive",
        "Referer": "https://www.tiktok.com/",
        "Sec-Fetch-Dest": "video",
        "Sec-Fetch-Mode": "no-cors",
        "Sec-Fetch-Site": "cross-site",
        "Accept-Encoding": "identity"
    }
    session = FuturesSession()

    download_results = {}
    downloaded_videos = 0
    metadata_collected = 0
    video_requests = {}  # url -> {request, video_id, type}
    video_download_urls = []  # (video_id, download url) tuples

    while video_ids or video_download_urls or video_requests:
        # give tasks time to run
        await asyncio.sleep(0.1)

        available_proxies = self.get_available_proxies()

        for available_proxy in available_proxies:
            if downloaded_videos >= max_videos:
                # We're done here. Note: >= rather than the original >, which
                # queued (and downloaded) one video beyond the maximum
                video_ids = []
                video_download_urls = []
                break

            # Download videos (if available)
            if video_download_urls:
                video_id, video_download_url = video_download_urls.pop(0)
                proxy = {"http": available_proxy,
                         "https": available_proxy} if available_proxy != "__localhost__" else None
                session.headers.update(video_download_headers)
                video_requests[video_download_url] = {
                    "request": session.get(video_download_url, proxies=proxy, timeout=30),
                    "video_id": video_id,
                    "type": "download",
                }
                self.proxy_map[available_proxy].update({
                    "busy": True,
                    "url": video_download_url
                })
            # Collect video metadata (to find videos to download)
            elif video_ids:
                video_id = video_ids.pop(0)
                url = f"https://www.tiktok.com/embed/v2/{video_id}"

                proxy = {"http": available_proxy,
                         "https": available_proxy} if available_proxy != "__localhost__" else None
                session.headers.update(self.headers)
                video_requests[url] = {
                    "request": session.get(url, proxies=proxy, timeout=30),
                    "video_id": video_id,
                    "type": "metadata",
                }
                self.proxy_map[available_proxy].update({
                    "busy": True,
                    "url": url
                })

        # wait for async requests to end (after cancelling) before quitting
        # the worker
        if self.processor.interrupted:
            for request in video_requests.values():
                request["request"].cancel()

            max_timeout = time.time() + 20
            # wait until every in-flight request has actually finished, or 20
            # seconds have passed; the original condition never waited at all
            while not all(r["request"].done() for r in video_requests.values()) and time.time() < max_timeout:
                await asyncio.sleep(0.5)

            raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

        # Extract video download URLs
        for url in list(video_requests.keys()):
            video_id = video_requests[url]["video_id"]
            request = video_requests[url]["request"]
            request_type = video_requests[url]["type"]
            request_metadata = {
                "success": False,
                "url": url,
                "error": None,
                "from_dataset": self.processor.source_dataset.key,
                "post_ids": [video_id],
            }
            if not request.done():
                continue

            # Release proxy
            self.release_proxy(url)

            # Collect response
            try:
                response = request.result()
            except requests.exceptions.RequestException as e:
                error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                request_metadata["error"] = error_message
                download_results[video_id] = request_metadata
                self.processor.dataset.log(error_message)
                continue
            finally:
                del video_requests[url]

            if response.status_code != 200:
                error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                request_metadata["error"] = error_message
                download_results[video_id] = request_metadata
                self.processor.dataset.log(error_message)
                continue

            if request_type == "metadata":
                # Collect Video Download URL
                soup = BeautifulSoup(response.text, "html.parser")
                json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                video_metadata = None
                try:
                    # json_source may be None when TikTok serves an unexpected
                    # page; the original crashed with an AttributeError here
                    if json_source is not None:
                        if json_source.text:
                            video_metadata = json.loads(json_source.text)
                        elif json_source.contents and json_source.contents[0]:
                            video_metadata = json.loads(json_source.contents[0])
                except json.JSONDecodeError as e:
                    self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                if not video_metadata:
                    # Failed to collect metadata
                    error_message = f"Failed to find metadata for video {video_id}"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    continue

                try:
                    url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                except (KeyError, IndexError):
                    error_message = f"vid: {video_id} - failed to find video download URL"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    self.processor.dataset.log(video_metadata["source"]["data"].values())
                    continue

                # Add new download URL to be collected
                video_download_urls.append((video_id, url))
                metadata_collected += 1
                self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                     (metadata_collected, max_videos))
                self.processor.dataset.update_progress(metadata_collected / max_videos)

            elif request_type == "download":
                # Download video
                with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
                request_metadata["success"] = True
                request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                download_results[video_id] = request_metadata

                downloaded_videos += 1
                self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                     (downloaded_videos, max_videos))
                self.processor.dataset.update_progress(downloaded_videos / max_videos)

    return download_results
Download TikTok Videos
This is based on the TikTok downloader from https://jdownloader.org/