datasources.tiktok_urls.search_tiktok_urls
Import scraped TikTok data
It's prohibitively difficult to scrape data from TikTok within 4CAT itself due to its aggressive rate limiting. Instead, import data collected elsewhere.
1""" 2Import scraped TikTok data 3 4It's prohibitively difficult to scrape data from TikTok within 4CAT itself due 5to its aggressive rate limiting. Instead, import data collected elsewhere. 6""" 7import requests 8import asyncio 9import time 10import json 11import re 12 13from requests_futures.sessions import FuturesSession 14from bs4 import BeautifulSoup 15 16from backend.lib.search import Search 17from common.lib.helpers import UserInput 18from common.lib.exceptions import WorkerInterruptedException, QueryParametersException, ProcessorException 19from datasources.tiktok.search_tiktok import SearchTikTok as SearchTikTokByImport 20 21class SearchTikTokByID(Search): 22 """ 23 Import scraped TikTok data 24 """ 25 type = "tiktok-urls-search" # job ID 26 category = "Search" # category 27 title = "Search TikTok by post URL" # title displayed in UI 28 description = "Retrieve metadata for TikTok post URLs." # description displayed in UI 29 extension = "ndjson" # extension of result file, used internally and in UI 30 is_local = False # Whether this datasource is locally scraped 31 is_static = False # Whether this datasource is still updated 32 33 # not available as a processor for existing datasets 34 accepts = [None] 35 36 config = { 37 "tiktok-urls-search.proxies": { 38 "type": UserInput.OPTION_TEXT_JSON, 39 "default": [], 40 "help": "Proxies for TikTok data collection" 41 }, 42 "tiktok-urls-search.proxies.wait": { 43 "type": UserInput.OPTION_TEXT, 44 "coerce_type": float, 45 "default": 1.0, 46 "help": "Request wait", 47 "tooltip": "Time to wait before sending a new request from the same IP" 48 } 49 } 50 51 options = { 52 "intro": { 53 "type": UserInput.OPTION_INFO, 54 "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those " 55 "posts.\n\nEnter a list of TikTok post URLs. 
Metadata for each post will be extracted from " 56 "each post's page in the browser interface " 57 "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of " 58 "details about the post itself such as likes, tags and stickers. Note that some of the metadata is " 59 "only directly available when downloading the results as an .ndjson file." 60 }, 61 "urls": { 62 "type": UserInput.OPTION_TEXT_LARGE, 63 "help": "Post URLs", 64 "tooltip": "Separate by commas or new lines." 65 } 66 } 67 68 def get_items(self, query): 69 """ 70 Retrieve metadata for TikTok URLs 71 72 :param dict query: Search query parameters 73 """ 74 tiktok_scraper = TikTokScraper(processor=self, config=self.config) 75 loop = asyncio.new_event_loop() 76 return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(","))) 77 78 @staticmethod 79 def validate_query(query, request, config): 80 """ 81 Validate TikTok query 82 83 :param dict query: Query parameters, from client-side. 84 :param request: Flask request 85 :param ConfigManager|None config: Configuration reader (context-aware) 86 :return dict: Safe query parameters 87 """ 88 if not query.get("urls"): 89 # no URLs provided 90 raise QueryParametersException("Missing \"Post URLs\" (\"urls\" parameter). 
Please provide a list of TikTok video URLs.") 91 92 # reformat queries to be a comma-separated list with no wrapping 93 # whitespace 94 whitespace = re.compile(r"\s+") 95 items = whitespace.sub("", query.get("urls").replace("\n", ",")) 96 97 sanitized_items = [] 98 # handle telegram URLs 99 for item in str(items).split(","): 100 if not item.strip(): 101 continue 102 103 if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \ 104 not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \ 105 not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item): 106 raise QueryParametersException("'%s' is not a valid TikTok video URL" % item) 107 108 sanitized_items.append(item) 109 110 # no query 4 u 111 if not sanitized_items: 112 raise QueryParametersException("You must provide at least one valid TikTok video URL.") 113 114 # simple! 115 return { 116 "urls": ",".join(sanitized_items) 117 } 118 119 @staticmethod 120 def map_item(item): 121 """ 122 Analogous to the other TikTok data source 123 124 :param item: 125 :return: 126 """ 127 return SearchTikTokByImport.map_item(item) 128 129 130class TikTokScraper: 131 proxy_map = None 132 proxy_sleep = 1 133 headers = { 134 "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", 135 "Accept-Encoding": "gzip, deflate", 136 "Accept-Language": "en-US,en;q=0.5", 137 "Connection": "keep-alive", 138 "DNT": "1", 139 "Sec-Fetch-Dest": "document", 140 "Sec-Fetch-Mode": "navigate", 141 "Sec-Fetch-Site": "none", 142 "Sec-Fetch-User": "?1", 143 "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0" 144 } 145 last_proxy_update = 0 146 last_time_proxy_available = None 147 no_available_proxy_timeout = 600 148 consecutive_failures = 0 149 150 VIDEO_NOT_FOUND = "oh no, sire, no video was found" 151 152 def __init__(self, processor, config): 153 """ 154 :param Processor processor: The processor using this function 
class TikTokScraper:
    """
    Scrape metadata for TikTok posts via their public web pages

    Requests can be routed through a configured pool of proxies so that
    TikTok's aggressive rate limiting can be worked around; without proxies
    requests are sent directly (the "__localhost__" pseudo-proxy).
    """
    proxy_map = None  # proxy address -> {"busy": bool, "url": str|None, "next_request": float}
    proxy_sleep = 1  # seconds a proxy rests between requests
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "DNT": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0"
    }
    last_proxy_update = 0  # timestamp after which the proxy list is re-read
    last_time_proxy_available = None  # last time at least one proxy was free
    no_available_proxy_timeout = 600  # give up if no proxy frees up within this many seconds
    consecutive_failures = 0  # used by download_videos() to abort on repeated failures

    # sentinel yielded by reformat_metadata() when TikTok reports the video
    # as gone rather than returning item data
    VIDEO_NOT_FOUND = "oh no, sire, no video was found"

    def __init__(self, processor, config):
        """
        :param Processor processor: The processor using this scraper and
          needing progress updates
        :param config: Configuration reader; currently unused, configuration
          is read through the processor instead
        """
        self.proxy_map = {}
        self.processor = processor

    def update_proxies(self):
        """
        Get proxies that are available

        Synchronises proxy_map with the configured proxy list: newly
        configured proxies are added as idle, removed ones are dropped.
        """
        all_proxies = self.processor.config.get("tiktok-urls-search.proxies")
        self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)
        if not all_proxies:
            # no proxies? just request directly
            all_proxies = ["__localhost__"]

        for proxy in all_proxies:
            if proxy in self.proxy_map:
                continue
            else:
                self.proxy_map[proxy] = {
                    "busy": False,
                    "url": None,
                    "next_request": 0
                }

        for proxy in list(self.proxy_map.keys()):
            if proxy not in all_proxies:
                del self.proxy_map[proxy]

    def get_available_proxies(self):
        """
        Collect proxies from proxy_map that are ready for new requests

        :return list: Proxy addresses that are idle and past their cooldown
        """
        # update proxies every 5 seconds so we can potentially update them
        # while the scrape is running
        if self.last_proxy_update < time.time():
            self.update_proxies()
            self.last_proxy_update = time.time() + 5

        # find out whether there is any connection we can use to send the
        # next request
        available_proxies = [proxy for proxy in self.proxy_map if
                             not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()]

        if not available_proxies:
            # No proxy available
            if self.last_time_proxy_available is None:
                # First run, possibly issue, but this will allow it to time out
                self.processor.dataset.log("No available proxy found at start of request_metadata")
                self.last_time_proxy_available = time.time()

            if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
                # No available proxies in timeout period
                raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")
        else:
            self.last_time_proxy_available = time.time()

        return available_proxies

    def release_proxy(self, url):
        """
        Release a proxy to be used later

        Marks the proxy that handled `url` as idle again, with a cooldown of
        `proxy_sleep` seconds before its next request.
        """
        used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url]
        if used_proxy:
            used_proxy = used_proxy[0]
            self.proxy_map[used_proxy].update({
                "busy": False,
                "next_request": time.time() + self.proxy_sleep
            })
        else:
            # TODO: why are we releasing a proxy without a URL?
            self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")

    async def request_metadata(self, urls):
        """
        Request TikTok metadata for a list of URLs

        Uses asyncio to request URLs concurrently if proxy servers are
        available. Returns a list of metadata, one object per video.

        :param list urls: URLs to collect data for
        :return list: Metadata
        """
        session = FuturesSession()
        session.headers.update(self.headers)
        tiktok_requests = {}
        finished = 0
        num_urls = len(urls)
        seen_urls = set()

        results = []
        failed = 0
        dupes = 0
        retries = {}

        while urls or tiktok_requests:
            # give tasks time to run
            await asyncio.sleep(0.1)

            available_proxies = self.get_available_proxies()

            for available_proxy in available_proxies:
                url = None
                while urls and url is None:
                    url = urls.pop(0)
                    url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                    # Check if url already collected or should be retried
                    if url in seen_urls and url not in retries:
                        finished += 1
                        dupes += 1
                        self.processor.dataset.log("Skipping duplicate of %s" % url)
                        url = None
                        continue

                    # Add url to be collected
                    self.processor.dataset.log(f"Requesting: {url}")
                    proxy = {"http": available_proxy,
                             "https": available_proxy} if available_proxy != "__localhost__" else None
                    tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                    seen_urls.add(url)
                    self.proxy_map[available_proxy].update({
                        "busy": True,
                        "url": url
                    })

            # wait for async requests to end (after cancelling) before quitting
            # the worker
            if self.processor.interrupted:
                for request in tiktok_requests.values():
                    request.cancel()

                max_timeout = time.time() + 20
                # NOTE: this must check done() on *every* pending future;
                # filtering to finished futures first made all([]) evaluate
                # True and skipped the wait entirely
                while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                    await asyncio.sleep(0.5)

                raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

            # handle received data
            for url in list(tiktok_requests.keys()):
                request = tiktok_requests[url]
                if not request.done():
                    continue

                finished += 1
                self.release_proxy(url)

                # collect the response; connection errors are retried up to
                # three times, after which the URL is counted as failed (the
                # URL is only counted once, not per attempt). Any other
                # exception is unexpected and propagates.
                try:
                    response = request.result()
                except requests.exceptions.RequestException as exception:
                    if retries.get(url, 0) < 3:
                        retries[url] = retries.get(url, 0) + 1
                        urls.append(url)
                        self.processor.dataset.log(
                            "Video at %s could not be retrieved (%s: %s), retrying" % (
                                url, type(exception).__name__, exception))
                    else:
                        failed += 1
                        self.processor.dataset.update_status(
                            "Video at %s could not be retrieved (%s: %s)" % (
                                url, type(exception).__name__, exception))
                    continue
                finally:
                    del tiktok_requests[url]

                # video may not exist
                if response.status_code == 404:
                    failed += 1
                    self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                    continue

                # haven't seen these in the wild - 403 or 429 might happen?
                elif response.status_code != 200:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                    continue

                # now! try to extract the JSON from the page
                soup = BeautifulSoup(response.text, "html.parser")
                sigil = soup.select_one("script#SIGI_STATE")

                if not sigil:
                    # alternatively, the JSON is here
                    sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

                if not sigil:
                    if url not in retries or retries[url] < 3:
                        if url not in retries:
                            retries[url] = 0
                        retries[url] += 1
                        urls.append(url)
                        self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                    else:
                        failed += 1
                        self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                    continue

                try:
                    if sigil.text:
                        metadata = json.loads(sigil.text)
                    elif sigil.contents and len(sigil.contents) > 0:
                        metadata = json.loads(sigil.contents[0])
                    else:
                        failed += 1
                        self.processor.dataset.log(
                            "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                        continue
                except json.JSONDecodeError:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue

                for video in self.reformat_metadata(metadata):
                    if video == self.VIDEO_NOT_FOUND:
                        failed += 1
                        self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                        continue

                    if not video.get("stats") or video.get("createTime") == "0":
                        # sometimes there are empty videos? which seems to
                        # indicate a login wall
                        self.processor.dataset.log(
                            f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                        continue
                    else:
                        results.append(video)

                self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                                     ("{:,}".format(finished), "{:,}".format(num_urls)))
                # retried URLs increment `finished` per attempt, so cap at 1.0
                self.processor.dataset.update_progress(min(finished / num_urls, 1))

        notes = []
        if failed:
            notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
        if dupes:
            notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

        if notes:
            self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                                 "dataset log for details." % ", ".join(notes))

        return results

    def reformat_metadata(self, metadata):
        """
        Take embedded JSON and yield one item per post

        :param dict metadata: Metadata extracted from the TikTok video page
        :return: Yields one dictionary per video
        """
        # may need some extra parsing to find the item data...
        if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
            try:
                video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"]
            except KeyError:
                if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]:
                    # a status code without item data means the video is gone
                    yield self.VIDEO_NOT_FOUND
                    return
                else:
                    # unknown structure; re-raise with the original message
                    # and traceback (the previous `raise e.__class__ from e`
                    # discarded the exception message)
                    raise

            metadata = {"ItemModule": {
                video["id"]: video
            }}

        if "ItemModule" in metadata:
            for video_id, item in metadata["ItemModule"].items():
                if "CommentItem" in metadata:
                    comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
                    if "UserModule" in metadata:
                        for comment_id in list(comments.keys()):
                            username = comments[comment_id]["user"]
                            comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
                                                                                                      username)
                else:
                    comments = {}

                yield {**item, "comments": list(comments.values())}
419 if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]: 420 try: 421 video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"] 422 except KeyError as e: 423 if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]: 424 yield self.VIDEO_NOT_FOUND 425 return 426 else: 427 raise e.__class__ from e 428 429 metadata = {"ItemModule": { 430 video["id"]: video 431 }} 432 433 if "ItemModule" in metadata: 434 for video_id, item in metadata["ItemModule"].items(): 435 if "CommentItem" in metadata: 436 comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id} 437 if "UserModule" in metadata: 438 for comment_id in list(comments.keys()): 439 username = comments[comment_id]["user"] 440 comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username, 441 username) 442 else: 443 comments = {} 444 445 yield {**item, "comments": list(comments.values())} 446 447 async def download_videos(self, video_ids, staging_area, max_videos): 448 """ 449 Download TikTok Videos 450 451 This is based on the TikTok downloader from https://jdownloader.org/ 452 """ 453 video_download_headers = { 454 "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0", 455 "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5", 456 "Accept-Language": "en-US,en;q=0.5", 457 # "Range": "bytes=0-", 458 "Connection": "keep-alive", 459 "Referer": "https://www.tiktok.com/", 460 "Sec-Fetch-Dest": "video", 461 "Sec-Fetch-Mode": "no-cors", 462 "Sec-Fetch-Site": "cross-site", 463 "Accept-Encoding": "identity" 464 } 465 session = FuturesSession() 466 467 download_results = {} 468 downloaded_videos = 0 469 metadata_collected = 0 470 video_requests = {} 471 video_download_urls = [] 472 473 while video_ids or video_download_urls or video_requests: 474 # give tasks time to run 475 await asyncio.sleep(0.1) 476 477 
available_proxies = self.get_available_proxies() 478 479 for available_proxy in available_proxies: 480 if downloaded_videos > max_videos: 481 # We're done here 482 video_ids = [] 483 video_download_urls = [] 484 break 485 486 # Download videos (if available) 487 if video_download_urls: 488 video_id, video_download_url = video_download_urls.pop(0) 489 proxy = {"http": available_proxy, 490 "https": available_proxy} if available_proxy != "__localhost__" else None 491 session.headers.update(video_download_headers) 492 video_requests[video_download_url] = { 493 "request": session.get(video_download_url, proxies=proxy, timeout=30), # not using stream=True here, as it seems slower; possibly due to short video lengths 494 "video_id": video_id, 495 "type": "download", 496 } 497 self.proxy_map[available_proxy].update({ 498 "busy": True, 499 "url": video_download_url 500 }) 501 # Collect video metadata (to find videos to download) 502 elif video_ids: 503 video_id = video_ids.pop(0) 504 url = f"https://www.tiktok.com/embed/v2/{video_id}" 505 506 proxy = {"http": available_proxy, 507 "https": available_proxy} if available_proxy != "__localhost__" else None 508 session.headers.update(self.headers) 509 video_requests[url] = { 510 "request": session.get(url, proxies=proxy, timeout=30), 511 "video_id": video_id, 512 "type": "metadata", 513 } 514 self.proxy_map[available_proxy].update({ 515 "busy": True, 516 "url": url 517 }) 518 519 # wait for async requests to end (after cancelling) before quitting 520 # the worker 521 if self.processor.interrupted: 522 for request in video_requests.values(): 523 request["request"].cancel() 524 525 max_timeout = time.time() + 20 526 while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout: 527 await asyncio.sleep(0.5) 528 529 raise WorkerInterruptedException("Interrupted while downloading TikTok videos") 530 531 # Extract video download URLs 532 for url in list(video_requests.keys()): 533 
video_id = video_requests[url]["video_id"] 534 request = video_requests[url]["request"] 535 request_type = video_requests[url]["type"] 536 request_metadata = { 537 "success": False, 538 "url": url, 539 "error": None, 540 "from_dataset": self.processor.source_dataset.key, 541 "post_ids": [video_id], 542 } 543 if not request.done(): 544 continue 545 546 # Release proxy 547 self.release_proxy(url) 548 549 # Collect response 550 try: 551 response = request.result() 552 except requests.exceptions.RequestException as e: 553 error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})" 554 request_metadata["error"] = error_message 555 download_results[video_id] = request_metadata 556 self.processor.dataset.log(error_message) 557 continue 558 finally: 559 del video_requests[url] 560 561 if response.status_code != 200: 562 error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping." 563 request_metadata["error"] = error_message 564 download_results[video_id] = request_metadata 565 self.processor.dataset.log(error_message) 566 continue 567 568 if request_type == "metadata": 569 # Collect Video Download URL 570 soup = BeautifulSoup(response.text, "html.parser") 571 json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__") 572 video_metadata = None 573 try: 574 if json_source.text: 575 video_metadata = json.loads(json_source.text) 576 elif json_source.contents[0]: 577 video_metadata = json.loads(json_source.contents[0]) 578 except json.JSONDecodeError as e: 579 self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}") 580 581 if not video_metadata: 582 # Failed to collect metadata 583 error_message = f"Failed to find metadata for video {video_id}" 584 request_metadata["error"] = error_message 585 download_results[video_id] = request_metadata 586 self.processor.dataset.log(error_message) 587 self.consecutive_failures += 1 588 if self.consecutive_failures > 5: 
589 raise ProcessorException("Too many consecutive failures, stopping") 590 continue 591 592 try: 593 url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0] 594 except (KeyError, IndexError): 595 error_message = f"vid: {video_id} - failed to find video download URL" 596 request_metadata["error"] = error_message 597 download_results[video_id] = request_metadata 598 self.processor.dataset.log(error_message) 599 self.processor.dataset.log(video_metadata["source"]["data"].values()) 600 continue 601 602 # Add new download URL to be collected 603 self.consecutive_failures = 0 604 video_download_urls.append((video_id, url)) 605 metadata_collected += 1 606 self.processor.dataset.update_status("Collected metadata for %i/%i videos" % 607 (metadata_collected, max_videos)) 608 self.processor.dataset.update_progress(metadata_collected / max_videos) 609 610 elif request_type == "download": 611 # Download video 612 with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f: 613 for chunk in response.iter_content(chunk_size=1024 * 1024): 614 if chunk: 615 f.write(chunk) 616 request_metadata["success"] = True 617 request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}] 618 download_results[video_id] = request_metadata 619 620 downloaded_videos += 1 621 self.processor.dataset.update_status("Downloaded %i/%i videos" % 622 (downloaded_videos, max_videos)) 623 self.processor.dataset.update_progress(downloaded_videos / max_videos) 624 625 return download_results
class SearchTikTokByID(Search):
    """
    Import scraped TikTok data

    Takes a list of TikTok post URLs and scrapes the metadata embedded in
    each post's public web page.
    """
    type = "tiktok-urls-search"  # job ID
    category = "Search"  # category
    title = "Search TikTok by post URL"  # title displayed in UI
    description = "Retrieve metadata for TikTok post URLs."  # description displayed in UI
    extension = "ndjson"  # extension of result file, used internally and in UI
    is_local = False  # Whether this datasource is locally scraped
    is_static = False  # Whether this datasource is still updated

    # not available as a processor for existing datasets
    accepts = [None]

    config = {
        "tiktok-urls-search.proxies": {
            "type": UserInput.OPTION_TEXT_JSON,
            "default": [],
            "help": "Proxies for TikTok data collection"
        },
        "tiktok-urls-search.proxies.wait": {
            "type": UserInput.OPTION_TEXT,
            "coerce_type": float,
            "default": 1.0,
            "help": "Request wait",
            "tooltip": "Time to wait before sending a new request from the same IP"
        }
    }

    options = {
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "This data source can retrieve metadata for TikTok posts based on a list of URLs for those "
                    "posts.\n\nEnter a list of TikTok post URLs. Metadata for each post will be extracted from "
                    "each post's page in the browser interface "
                    "([example](https://www.tiktok.com/@willsmith/video/7079929224945093934)). This includes a lot of "
                    "details about the post itself such as likes, tags and stickers. Note that some of the metadata is "
                    "only directly available when downloading the results as an .ndjson file."
        },
        "urls": {
            "type": UserInput.OPTION_TEXT_LARGE,
            "help": "Post URLs",
            "tooltip": "Separate by commas or new lines."
        }
    }

    def get_items(self, query):
        """
        Retrieve metadata for TikTok URLs

        :param dict query: Search query parameters
        :return list: Video metadata, one item per video
        """
        tiktok_scraper = TikTokScraper(processor=self, config=self.config)
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
        finally:
            # close explicitly so repeated queries do not leak event loops
            loop.close()

    @staticmethod
    def validate_query(query, request, config):
        """
        Validate TikTok query

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param ConfigManager|None config: Configuration reader (context-aware)
        :return dict: Safe query parameters
        """
        if not query.get("urls"):
            # no URLs provided
            raise QueryParametersException("Missing \"Post URLs\" (\"urls\" parameter). Please provide a list of TikTok video URLs.")

        # reformat queries to be a comma-separated list with no wrapping
        # whitespace
        whitespace = re.compile(r"\s+")
        items = whitespace.sub("", query.get("urls").replace("\n", ","))

        sanitized_items = []
        # ensure all provided URLs are actual TikTok video URLs
        for item in str(items).split(","):
            if not item.strip():
                continue

            if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \
                    not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \
                    not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item):
                raise QueryParametersException("'%s' is not a valid TikTok video URL" % item)

            sanitized_items.append(item)

        # no query 4 u
        if not sanitized_items:
            raise QueryParametersException("You must provide at least one valid TikTok video URL.")

        # simple!
        return {
            "urls": ",".join(sanitized_items)
        }

    @staticmethod
    def map_item(item):
        """
        Analogous to the other TikTok data source

        :param dict item: Raw item, as stored in the ndjson result file
        :return: Mapped item, via the shared TikTok mapper
        """
        return SearchTikTokByImport.map_item(item)
Import scraped TikTok data
69 def get_items(self, query): 70 """ 71 Retrieve metadata for TikTok URLs 72 73 :param dict query: Search query parameters 74 """ 75 tiktok_scraper = TikTokScraper(processor=self, config=self.config) 76 loop = asyncio.new_event_loop() 77 return loop.run_until_complete(tiktok_scraper.request_metadata(query["urls"].split(",")))
Retrieve metadata for TikTok URLs
Parameters
- dict query: Search query parameters
79 @staticmethod 80 def validate_query(query, request, config): 81 """ 82 Validate TikTok query 83 84 :param dict query: Query parameters, from client-side. 85 :param request: Flask request 86 :param ConfigManager|None config: Configuration reader (context-aware) 87 :return dict: Safe query parameters 88 """ 89 if not query.get("urls"): 90 # no URLs provided 91 raise QueryParametersException("Missing \"Post URLs\" (\"urls\" parameter). Please provide a list of TikTok video URLs.") 92 93 # reformat queries to be a comma-separated list with no wrapping 94 # whitespace 95 whitespace = re.compile(r"\s+") 96 items = whitespace.sub("", query.get("urls").replace("\n", ",")) 97 98 sanitized_items = [] 99 # handle telegram URLs 100 for item in str(items).split(","): 101 if not item.strip(): 102 continue 103 104 if not re.match(r"https?://www\.tiktokv\.com/share/video/[0-9]+/", item) and \ 105 not re.match(r"https?://www\.tiktok\.com/@[^/]+/video/[0-9]+.*", item) and \ 106 not re.match(r"https?://tiktok\.com/@[^/]+/video/[0-9]+.*", item): 107 raise QueryParametersException("'%s' is not a valid TikTok video URL" % item) 108 109 sanitized_items.append(item) 110 111 # no query 4 u 112 if not sanitized_items: 113 raise QueryParametersException("You must provide at least one valid TikTok video URL.") 114 115 # simple! 116 return { 117 "urls": ",".join(sanitized_items) 118 }
Validate TikTok query
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- ConfigManager|None config: Configuration reader (context-aware)
Returns
Safe query parameters
    @staticmethod
    def map_item(item):
        """
        Analogous to the other TikTok data source

        Delegates to the shared TikTok mapper so both datasources produce
        identically shaped rows.

        :param item: Raw item, as stored in the ndjson result file
        :return: Mapped item, via the shared TikTok mapper
        """
        return SearchTikTokByImport.map_item(item)
Analogous to the other TikTok data source
Parameters
- item:
Returns
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- max_workers
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- iterate_proxied_requests
- push_proxied_request
- flush_proxied_requests
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- save_annotations
- map_item_method_available
- get_mapped_item
- is_filter
- get_options
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor
131class TikTokScraper: 132 proxy_map = None 133 proxy_sleep = 1 134 headers = { 135 "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8", 136 "Accept-Encoding": "gzip, deflate", 137 "Accept-Language": "en-US,en;q=0.5", 138 "Connection": "keep-alive", 139 "DNT": "1", 140 "Sec-Fetch-Dest": "document", 141 "Sec-Fetch-Mode": "navigate", 142 "Sec-Fetch-Site": "none", 143 "Sec-Fetch-User": "?1", 144 "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0" 145 } 146 last_proxy_update = 0 147 last_time_proxy_available = None 148 no_available_proxy_timeout = 600 149 consecutive_failures = 0 150 151 VIDEO_NOT_FOUND = "oh no, sire, no video was found" 152 153 def __init__(self, processor, config): 154 """ 155 :param Processor processor: The processor using this function and needing updates 156 """ 157 self.proxy_map = {} 158 self.processor = processor 159 160 def update_proxies(self): 161 """ 162 Get proxies that are available 163 164 :return: 165 """ 166 all_proxies = self.processor.config.get("tiktok-urls-search.proxies") 167 self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep) 168 if not all_proxies: 169 # no proxies? 
just request directly 170 all_proxies = ["__localhost__"] 171 172 for proxy in all_proxies: 173 if proxy in self.proxy_map: 174 continue 175 else: 176 self.proxy_map[proxy] = { 177 "busy": False, 178 "url": None, 179 "next_request": 0 180 } 181 182 for proxy in list(self.proxy_map.keys()): 183 if proxy not in all_proxies: 184 del self.proxy_map[proxy] 185 186 def get_available_proxies(self): 187 """ 188 Collect proxies from proxy_map that are ready for new requests 189 """ 190 # update proxies every 5 seconds so we can potentially update them 191 # while the scrape is running 192 if self.last_proxy_update < time.time(): 193 self.update_proxies() 194 self.last_proxy_update = time.time() + 5 195 196 # find out whether there is any connection we can use to send the 197 # next request 198 available_proxies = [proxy for proxy in self.proxy_map if 199 not self.proxy_map[proxy]["busy"] and self.proxy_map[proxy]["next_request"] <= time.time()] 200 201 if not available_proxies: 202 # No proxy available 203 if self.last_time_proxy_available is None: 204 # First run, possibly issue, but this will allow it to time out 205 self.processor.dataset.log("No available proxy found at start of request_metadata") 206 self.last_time_proxy_available = time.time() 207 208 if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time(): 209 # No available proxies in timeout period 210 raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}") 211 else: 212 self.last_time_proxy_available = time.time() 213 214 return available_proxies 215 216 def release_proxy(self, url): 217 """ 218 Release a proxy to be used later 219 """ 220 # Release proxy 221 used_proxy = [proxy for proxy in self.proxy_map if self.proxy_map[proxy]["url"] == url] 222 if used_proxy: 223 used_proxy = used_proxy[0] 224 self.proxy_map[used_proxy].update({ 225 "busy": False, 226 "next_request": time.time() + self.proxy_sleep 227 }) 228 else: 229 # TODO: why are we 
releasing a proxy without a URL? 230 self.processor.dataset.log(f"Unable to find and release proxy associated with {url}") 231 pass 232 233 async def request_metadata(self, urls): 234 """ 235 Request TikTok metadata for a list of URLs 236 237 Uses asyncio to request URLs concurrently if proxy servers are 238 available. Returns a list of metadata, one object per video. 239 240 :param list urls: URLs to collect data for 241 :return list: Metadata 242 """ 243 session = FuturesSession() 244 session.headers.update(self.headers) 245 tiktok_requests = {} 246 finished = 0 247 num_urls = len(urls) 248 seen_urls = set() 249 250 results = [] 251 failed = 0 252 dupes = 0 253 retries = {} 254 255 while urls or tiktok_requests: 256 # give tasks time to run 257 await asyncio.sleep(0.1) 258 259 available_proxies = self.get_available_proxies() 260 261 for available_proxy in available_proxies: 262 url = None 263 while urls and url is None: 264 url = urls.pop(0) 265 url = url.replace("https://", "http://") # https is finicky, lots of blocks 266 267 # Check if url already collected or should be retried 268 if url in seen_urls and url not in retries: 269 finished += 1 270 dupes += 1 271 self.processor.dataset.log("Skipping duplicate of %s" % url) 272 url = None 273 continue 274 275 # Add url to be collected 276 self.processor.dataset.log(f"Requesting: {url}") 277 proxy = {"http": available_proxy, 278 "https": available_proxy} if available_proxy != "__localhost__" else None 279 tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30) 280 seen_urls.add(url) 281 self.proxy_map[available_proxy].update({ 282 "busy": True, 283 "url": url 284 }) 285 286 # wait for async requests to end (after cancelling) before quitting 287 # the worker 288 if self.processor.interrupted: 289 for request in tiktok_requests.values(): 290 request.cancel() 291 292 max_timeout = time.time() + 20 293 while not all([r for r in tiktok_requests.values() if r.done()]) and time.time() < max_timeout: 294 await 
asyncio.sleep(0.5) 295 296 raise WorkerInterruptedException("Interrupted while fetching TikTok metadata") 297 298 # handle received data 299 for url in list(tiktok_requests.keys()): 300 request = tiktok_requests[url] 301 if not request.done(): 302 continue 303 304 finished += 1 305 self.release_proxy(url) 306 307 # handle the exceptions we know to expect - else just raise and 308 # log 309 exception = request.exception() 310 if exception: 311 failed += 1 312 if isinstance(exception, requests.exceptions.RequestException): 313 self.processor.dataset.update_status( 314 "Video at %s could not be retrieved (%s: %s)" % (url, type(exception).__name__, exception)) 315 else: 316 raise exception 317 318 # retry on requestexceptions 319 try: 320 response = request.result() 321 except requests.exceptions.RequestException: 322 if url not in retries or retries[url] < 3: 323 if url not in retries: 324 retries[url] = 0 325 retries[url] += 1 326 urls.append(url) 327 continue 328 finally: 329 del tiktok_requests[url] 330 331 # video may not exist 332 if response.status_code == 404: 333 failed += 1 334 self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url) 335 continue 336 337 # haven't seen these in the wild - 403 or 429 might happen? 338 elif response.status_code != 200: 339 failed += 1 340 self.processor.dataset.update_status( 341 "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url)) 342 continue 343 344 # now! 
try to extract the JSON from the page 345 soup = BeautifulSoup(response.text, "html.parser") 346 sigil = soup.select_one("script#SIGI_STATE") 347 348 if not sigil: 349 # alternatively, the JSON is here 350 sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__") 351 352 if not sigil: 353 if url not in retries or retries[url] < 3: 354 if url not in retries: 355 retries[url] = 0 356 retries[url] += 1 357 urls.append(url) 358 self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url) 359 else: 360 failed += 1 361 self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url) 362 continue 363 364 try: 365 if sigil.text: 366 metadata = json.loads(sigil.text) 367 elif sigil.contents and len(sigil.contents) > 0: 368 metadata = json.loads(sigil.contents[0]) 369 else: 370 failed += 1 371 self.processor.dataset.log( 372 "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url) 373 continue 374 except json.JSONDecodeError: 375 failed += 1 376 self.processor.dataset.log( 377 "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url) 378 continue 379 380 for video in self.reformat_metadata(metadata): 381 if video == self.VIDEO_NOT_FOUND: 382 failed += 1 383 self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping") 384 continue 385 386 if not video.get("stats") or video.get("createTime") == "0": 387 # sometimes there are empty videos? which seems to 388 # indicate a login wall 389 390 self.processor.dataset.log( 391 f"Empty metadata returned for video {url} ({video['id']}), skipping. 
This likely means that the post requires logging in to view.") 392 continue 393 else: 394 results.append(video) 395 396 self.processor.dataset.update_status("Processed %s of %s TikTok URLs" % 397 ("{:,}".format(finished), "{:,}".format(num_urls))) 398 self.processor.dataset.update_progress(finished / num_urls) 399 400 notes = [] 401 if failed: 402 notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed)) 403 if dupes: 404 notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes)) 405 406 if notes: 407 self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See " 408 "dataset log for details." % ", ".join(notes)) 409 410 return results 411 412 def reformat_metadata(self, metadata): 413 """ 414 Take embedded JSON and yield one item per post 415 416 :param dict metadata: Metadata extracted from the TikTok video page 417 :return: Yields one dictionary per video 418 """ 419 # may need some extra parsing to find the item data... 
420 if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]: 421 try: 422 video = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]["itemInfo"]["itemStruct"] 423 except KeyError as e: 424 if "statusCode" in metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]: 425 yield self.VIDEO_NOT_FOUND 426 return 427 else: 428 raise e.__class__ from e 429 430 metadata = {"ItemModule": { 431 video["id"]: video 432 }} 433 434 if "ItemModule" in metadata: 435 for video_id, item in metadata["ItemModule"].items(): 436 if "CommentItem" in metadata: 437 comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id} 438 if "UserModule" in metadata: 439 for comment_id in list(comments.keys()): 440 username = comments[comment_id]["user"] 441 comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username, 442 username) 443 else: 444 comments = {} 445 446 yield {**item, "comments": list(comments.values())} 447 448 async def download_videos(self, video_ids, staging_area, max_videos): 449 """ 450 Download TikTok Videos 451 452 This is based on the TikTok downloader from https://jdownloader.org/ 453 """ 454 video_download_headers = { 455 "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0", 456 "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5", 457 "Accept-Language": "en-US,en;q=0.5", 458 # "Range": "bytes=0-", 459 "Connection": "keep-alive", 460 "Referer": "https://www.tiktok.com/", 461 "Sec-Fetch-Dest": "video", 462 "Sec-Fetch-Mode": "no-cors", 463 "Sec-Fetch-Site": "cross-site", 464 "Accept-Encoding": "identity" 465 } 466 session = FuturesSession() 467 468 download_results = {} 469 downloaded_videos = 0 470 metadata_collected = 0 471 video_requests = {} 472 video_download_urls = [] 473 474 while video_ids or video_download_urls or video_requests: 475 # give tasks time to run 476 await asyncio.sleep(0.1) 477 478 
available_proxies = self.get_available_proxies() 479 480 for available_proxy in available_proxies: 481 if downloaded_videos > max_videos: 482 # We're done here 483 video_ids = [] 484 video_download_urls = [] 485 break 486 487 # Download videos (if available) 488 if video_download_urls: 489 video_id, video_download_url = video_download_urls.pop(0) 490 proxy = {"http": available_proxy, 491 "https": available_proxy} if available_proxy != "__localhost__" else None 492 session.headers.update(video_download_headers) 493 video_requests[video_download_url] = { 494 "request": session.get(video_download_url, proxies=proxy, timeout=30), # not using stream=True here, as it seems slower; possibly due to short video lengths 495 "video_id": video_id, 496 "type": "download", 497 } 498 self.proxy_map[available_proxy].update({ 499 "busy": True, 500 "url": video_download_url 501 }) 502 # Collect video metadata (to find videos to download) 503 elif video_ids: 504 video_id = video_ids.pop(0) 505 url = f"https://www.tiktok.com/embed/v2/{video_id}" 506 507 proxy = {"http": available_proxy, 508 "https": available_proxy} if available_proxy != "__localhost__" else None 509 session.headers.update(self.headers) 510 video_requests[url] = { 511 "request": session.get(url, proxies=proxy, timeout=30), 512 "video_id": video_id, 513 "type": "metadata", 514 } 515 self.proxy_map[available_proxy].update({ 516 "busy": True, 517 "url": url 518 }) 519 520 # wait for async requests to end (after cancelling) before quitting 521 # the worker 522 if self.processor.interrupted: 523 for request in video_requests.values(): 524 request["request"].cancel() 525 526 max_timeout = time.time() + 20 527 while not all([r["request"] for r in video_requests.values() if r["request"].done()]) and time.time() < max_timeout: 528 await asyncio.sleep(0.5) 529 530 raise WorkerInterruptedException("Interrupted while downloading TikTok videos") 531 532 # Extract video download URLs 533 for url in list(video_requests.keys()): 534 
video_id = video_requests[url]["video_id"] 535 request = video_requests[url]["request"] 536 request_type = video_requests[url]["type"] 537 request_metadata = { 538 "success": False, 539 "url": url, 540 "error": None, 541 "from_dataset": self.processor.source_dataset.key, 542 "post_ids": [video_id], 543 } 544 if not request.done(): 545 continue 546 547 # Release proxy 548 self.release_proxy(url) 549 550 # Collect response 551 try: 552 response = request.result() 553 except requests.exceptions.RequestException as e: 554 error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})" 555 request_metadata["error"] = error_message 556 download_results[video_id] = request_metadata 557 self.processor.dataset.log(error_message) 558 continue 559 finally: 560 del video_requests[url] 561 562 if response.status_code != 200: 563 error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping." 564 request_metadata["error"] = error_message 565 download_results[video_id] = request_metadata 566 self.processor.dataset.log(error_message) 567 continue 568 569 if request_type == "metadata": 570 # Collect Video Download URL 571 soup = BeautifulSoup(response.text, "html.parser") 572 json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__") 573 video_metadata = None 574 try: 575 if json_source.text: 576 video_metadata = json.loads(json_source.text) 577 elif json_source.contents[0]: 578 video_metadata = json.loads(json_source.contents[0]) 579 except json.JSONDecodeError as e: 580 self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}") 581 582 if not video_metadata: 583 # Failed to collect metadata 584 error_message = f"Failed to find metadata for video {video_id}" 585 request_metadata["error"] = error_message 586 download_results[video_id] = request_metadata 587 self.processor.dataset.log(error_message) 588 self.consecutive_failures += 1 589 if self.consecutive_failures > 5: 
590 raise ProcessorException("Too many consecutive failures, stopping") 591 continue 592 593 try: 594 url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0] 595 except (KeyError, IndexError): 596 error_message = f"vid: {video_id} - failed to find video download URL" 597 request_metadata["error"] = error_message 598 download_results[video_id] = request_metadata 599 self.processor.dataset.log(error_message) 600 self.processor.dataset.log(video_metadata["source"]["data"].values()) 601 continue 602 603 # Add new download URL to be collected 604 self.consecutive_failures = 0 605 video_download_urls.append((video_id, url)) 606 metadata_collected += 1 607 self.processor.dataset.update_status("Collected metadata for %i/%i videos" % 608 (metadata_collected, max_videos)) 609 self.processor.dataset.update_progress(metadata_collected / max_videos) 610 611 elif request_type == "download": 612 # Download video 613 with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f: 614 for chunk in response.iter_content(chunk_size=1024 * 1024): 615 if chunk: 616 f.write(chunk) 617 request_metadata["success"] = True 618 request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}] 619 download_results[video_id] = request_metadata 620 621 downloaded_videos += 1 622 self.processor.dataset.update_status("Downloaded %i/%i videos" % 623 (downloaded_videos, max_videos)) 624 self.processor.dataset.update_progress(downloaded_videos / max_videos) 625 626 return download_results
def __init__(self, processor, config):
    """
    :param Processor processor: The processor using this function and needing updates
    """
    # maps proxy identifier -> {"busy": bool, "url": str|None, "next_request": float}
    # NOTE(review): `config` is not used in the visible part of this
    # initialiser; other methods read attributes (proxy_sleep, headers,
    # last_proxy_update, ...) not set here — confirm the full __init__
    self.proxy_map = {}
    # processor is used throughout for dataset logging/status updates
    self.processor = processor
Parameters
- Processor processor: The processor using this function and needing updates
def update_proxies(self):
    """
    Refresh the proxy map from the current configuration.

    Reads the configured proxy list (falling back to a direct,
    un-proxied "__localhost__" connection when none are configured),
    registers proxies that are new since the last refresh, and drops
    proxies that have been removed from the configuration.
    """
    configured = self.processor.config.get("tiktok-urls-search.proxies")
    self.proxy_sleep = self.processor.config.get("tiktok-urls-search.proxies.wait", self.proxy_sleep)

    if not configured:
        # nothing configured: send requests directly, without a proxy
        configured = ["__localhost__"]

    # register any proxies we have not seen before
    for proxy in configured:
        if proxy not in self.proxy_map:
            self.proxy_map[proxy] = {
                "busy": False,
                "url": None,
                "next_request": 0
            }

    # forget proxies that are no longer configured
    for known_proxy in list(self.proxy_map):
        if known_proxy not in configured:
            del self.proxy_map[known_proxy]
Refresh the map of configured proxies
Returns
Nothing; `self.proxy_map` is updated in place from the current configuration (falling back to a direct, un-proxied connection when no proxies are configured).
def get_available_proxies(self):
    """
    Return proxies from proxy_map that are ready for a new request.

    Re-reads the proxy configuration at most once every five seconds so
    configuration changes can take effect while the scrape is running.
    Raises a ProcessorException when no proxy has been available for
    longer than the configured timeout.
    """
    now = time.time()

    # refresh the proxy configuration periodically so we can potentially
    # update it while the scrape is running
    if self.last_proxy_update < now:
        self.update_proxies()
        self.last_proxy_update = now + 5

    # which connections can take the next request right now?
    ready = [
        proxy for proxy, status in self.proxy_map.items()
        if not status["busy"] and status["next_request"] <= time.time()
    ]

    if ready:
        self.last_time_proxy_available = time.time()
    else:
        if self.last_time_proxy_available is None:
            # first run without any proxy; possibly an issue, but let the
            # timeout below handle it
            self.processor.dataset.log("No available proxy found at start of request_metadata")
            self.last_time_proxy_available = time.time()

        if self.last_time_proxy_available + self.no_available_proxy_timeout < time.time():
            # nothing became available within the grace period
            raise ProcessorException(f"Error: No proxy found available after {self.no_available_proxy_timeout}")

    return ready
Collect proxies from proxy_map that are ready for new requests
def release_proxy(self, url):
    """
    Mark the proxy that handled the given URL as free again.

    The proxy does not become usable immediately: its next request is
    scheduled after the configured per-proxy wait time.
    """
    matching = [proxy for proxy, status in self.proxy_map.items() if status["url"] == url]

    if not matching:
        # NOTE(review): unclear why a proxy can end up with no associated
        # URL here; log the mismatch so it stays visible
        self.processor.dataset.log(f"Unable to find and release proxy associated with {url}")
        return

    self.proxy_map[matching[0]].update({
        "busy": False,
        "next_request": time.time() + self.proxy_sleep
    })
Release the proxy associated with a URL so it can be reused, after the configured per-proxy wait time has elapsed
async def request_metadata(self, urls):
    """
    Request TikTok metadata for a list of URLs

    Uses asyncio to request URLs concurrently if proxy servers are
    available. Returns a list of metadata, one object per video.

    :param list urls:  URLs to collect data for
    :return list:  Metadata, one dictionary per video
    """
    session = FuturesSession()
    session.headers.update(self.headers)
    tiktok_requests = {}  # url -> pending request future
    finished = 0
    num_urls = len(urls)
    seen_urls = set()

    results = []
    failed = 0
    dupes = 0
    retries = {}  # url -> number of retries so far (max 3)

    while urls or tiktok_requests:
        # give tasks time to run
        await asyncio.sleep(0.1)

        available_proxies = self.get_available_proxies()

        for available_proxy in available_proxies:
            url = None
            while urls and url is None:
                url = urls.pop(0)
                url = url.replace("https://", "http://")  # https is finicky, lots of blocks

                # skip duplicates, unless the URL is queued for a retry
                if url in seen_urls and url not in retries:
                    finished += 1
                    dupes += 1
                    self.processor.dataset.log("Skipping duplicate of %s" % url)
                    url = None
                    continue

                # dispatch the request through this proxy
                self.processor.dataset.log(f"Requesting: {url}")
                proxy = {"http": available_proxy,
                         "https": available_proxy} if available_proxy != "__localhost__" else None
                tiktok_requests[url] = session.get(url, proxies=proxy, timeout=30)
                seen_urls.add(url)
                self.proxy_map[available_proxy].update({
                    "busy": True,
                    "url": url
                })

        # wait for async requests to end (after cancelling) before quitting
        # the worker
        if self.processor.interrupted:
            for request in tiktok_requests.values():
                request.cancel()

            max_timeout = time.time() + 20
            # FIX: the original condition filtered on r.done() and tested the
            # futures' truthiness (always true), so it never actually waited;
            # test done() on every pending future instead
            while not all(r.done() for r in tiktok_requests.values()) and time.time() < max_timeout:
                await asyncio.sleep(0.5)

            raise WorkerInterruptedException("Interrupted while fetching TikTok metadata")

        # handle received data
        for url in list(tiktok_requests.keys()):
            request = tiktok_requests[url]
            if not request.done():
                continue

            finished += 1
            self.release_proxy(url)

            # anything that is not a requests-level error is unexpected;
            # re-raise it so it is reported upstream
            exception = request.exception()
            if exception and not isinstance(exception, requests.exceptions.RequestException):
                raise exception

            # retry on request exceptions, up to three times per URL
            # FIX: the original counted the URL as failed on *every* attempt
            # (double counting) and, once retries were exhausted, fell
            # through without `continue`, referencing an unbound/stale
            # `response`; now a URL is only counted as failed once, when its
            # retries are exhausted
            try:
                response = request.result()
            except requests.exceptions.RequestException as e:
                if retries.get(url, 0) < 3:
                    retries[url] = retries.get(url, 0) + 1
                    urls.append(url)
                    finished -= 1  # will be counted again on the retry
                else:
                    failed += 1
                    self.processor.dataset.update_status(
                        "Video at %s could not be retrieved (%s: %s)" % (url, type(e).__name__, e))
                continue
            finally:
                del tiktok_requests[url]

            # video may not exist
            if response.status_code == 404:
                failed += 1
                self.processor.dataset.log("Video at %s no longer exists (404), skipping" % response.url)
                continue

            # haven't seen these in the wild - 403 or 429 might happen?
            elif response.status_code != 200:
                failed += 1
                self.processor.dataset.update_status(
                    "Received unexpected HTTP response %i for %s, skipping." % (response.status_code, response.url))
                continue

            # now! try to extract the JSON from the page
            soup = BeautifulSoup(response.text, "html.parser")
            sigil = soup.select_one("script#SIGI_STATE")

            if not sigil:
                # alternatively, the JSON is here
                sigil = soup.select_one("script#__UNIVERSAL_DATA_FOR_REHYDRATION__")

            if not sigil:
                if retries.get(url, 0) < 3:
                    retries[url] = retries.get(url, 0) + 1
                    urls.append(url)
                    finished -= 1  # will be counted again on the retry
                    self.processor.dataset.log("No embedded metadata found for video %s, retrying" % url)
                else:
                    failed += 1
                    self.processor.dataset.log("No embedded metadata found for video %s, skipping" % url)
                continue

            try:
                if sigil.text:
                    metadata = json.loads(sigil.text)
                elif sigil.contents and len(sigil.contents) > 0:
                    metadata = json.loads(sigil.contents[0])
                else:
                    failed += 1
                    self.processor.dataset.log(
                        "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                    continue
            except json.JSONDecodeError:
                failed += 1
                self.processor.dataset.log(
                    "Embedded metadata was found for video %s, but it could not be parsed, skipping" % url)
                continue

            for video in self.reformat_metadata(metadata):
                if video == self.VIDEO_NOT_FOUND:
                    failed += 1
                    self.processor.dataset.log(f"Video for {url} not found, may have been removed, skipping")
                    continue

                if not video.get("stats") or video.get("createTime") == "0":
                    # sometimes there are empty videos? which seems to
                    # indicate a login wall
                    self.processor.dataset.log(
                        f"Empty metadata returned for video {url} ({video['id']}), skipping. This likely means that the post requires logging in to view.")
                    continue
                else:
                    results.append(video)

            self.processor.dataset.update_status("Processed %s of %s TikTok URLs" %
                                                 ("{:,}".format(finished), "{:,}".format(num_urls)))
            self.processor.dataset.update_progress(finished / num_urls)

    notes = []
    if failed:
        notes.append("%s URL(s) failed or did not exist anymore" % "{:,}".format(failed))
    if dupes:
        notes.append("skipped %s duplicate(s)" % "{:,}".format(dupes))

    if notes:
        self.processor.dataset.update_status("Dataset completed, but not all URLs were collected (%s). See "
                                             "dataset log for details." % ", ".join(notes))

    return results
Request TikTok metadata for a list of URLs
Uses asyncio to request URLs concurrently if proxy servers are available. Returns a list of metadata, one object per video.
Parameters
- list urls: URLs to collect data for
Returns
A list of metadata dictionaries, one per video
def reformat_metadata(self, metadata):
    """
    Take embedded JSON and yield one item per post

    Handles both the newer "__DEFAULT_SCOPE__" page embed (normalised to
    the older structure) and the older "ItemModule" embed; comments are
    attached per video, with user references resolved to full user
    objects when the "UserModule" data allows it.

    :param dict metadata:  Metadata extracted from the TikTok video page
    :return:  Yields one dictionary per video
    """
    # may need some extra parsing to find the item data...
    if "__DEFAULT_SCOPE__" in metadata and "webapp.video-detail" in metadata["__DEFAULT_SCOPE__"]:
        detail = metadata["__DEFAULT_SCOPE__"]["webapp.video-detail"]
        try:
            video = detail["itemInfo"]["itemStruct"]
        except KeyError:
            if "statusCode" in detail:
                # a status code instead of item data means the video could
                # not be found
                yield self.VIDEO_NOT_FOUND
                return
            else:
                # FIX: the original did `raise e.__class__ from e`, raising
                # a *blank* exception of the same type and losing the
                # missing-key detail; re-raising preserves it (and the type
                # callers catch)
                raise

        # normalise to the older ItemModule-style structure
        metadata = {"ItemModule": {
            video["id"]: video
        }}

    if "ItemModule" in metadata:
        for video_id, item in metadata["ItemModule"].items():
            if "CommentItem" in metadata:
                # comments belonging to this video, with user references
                # resolved to full user objects where possible
                comments = {i: c for i, c in metadata["CommentItem"].items() if c["aweme_id"] == video_id}
                if "UserModule" in metadata:
                    for comment_id in list(comments.keys()):
                        username = comments[comment_id]["user"]
                        comments[comment_id]["user"] = metadata["UserModule"].get("users", {}).get(username,
                                                                                                  username)
            else:
                comments = {}

            yield {**item, "comments": list(comments.values())}
Take embedded JSON and yield one item per post
Parameters
- dict metadata: Metadata extracted from the TikTok video page
Returns
Yields one dictionary per video
async def download_videos(self, video_ids, staging_area, max_videos):
    """
    Download TikTok Videos

    This is based on the TikTok downloader from https://jdownloader.org/

    Works in two phases per video: the embed page for the video is
    requested first to find the URL of the actual video file, then that
    file is downloaded to the staging area as [video id].mp4.

    :param list video_ids:  TikTok video IDs to collect
    :param staging_area:  Path to the folder to save videos to
    :param int max_videos:  Maximum number of videos to download
    :return dict:  Mapping of video ID to download result metadata
    """
    video_download_headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/110.0",
        "Accept": "video/webm,video/ogg,video/*;q=0.9,application/ogg;q=0.7,audio/*;q=0.6,*/*;q=0.5",
        "Accept-Language": "en-US,en;q=0.5",
        # "Range": "bytes=0-",
        "Connection": "keep-alive",
        "Referer": "https://www.tiktok.com/",
        "Sec-Fetch-Dest": "video",
        "Sec-Fetch-Mode": "no-cors",
        "Sec-Fetch-Site": "cross-site",
        "Accept-Encoding": "identity"
    }
    session = FuturesSession()

    download_results = {}
    downloaded_videos = 0
    metadata_collected = 0
    video_requests = {}
    video_download_urls = []

    while video_ids or video_download_urls or video_requests:
        # give tasks time to run
        await asyncio.sleep(0.1)

        available_proxies = self.get_available_proxies()

        for available_proxy in available_proxies:
            # FIX: was "downloaded_videos > max_videos", an off-by-one that
            # let one video too many be downloaded before stopping
            if downloaded_videos >= max_videos:
                # We're done here
                video_ids = []
                video_download_urls = []
                break

            # Download videos (if available)
            if video_download_urls:
                video_id, video_download_url = video_download_urls.pop(0)
                proxy = {"http": available_proxy,
                         "https": available_proxy} if available_proxy != "__localhost__" else None
                session.headers.update(video_download_headers)
                video_requests[video_download_url] = {
                    # not using stream=True here, as it seems slower; possibly
                    # due to short video lengths
                    "request": session.get(video_download_url, proxies=proxy, timeout=30),
                    "video_id": video_id,
                    "type": "download",
                }
                self.proxy_map[available_proxy].update({
                    "busy": True,
                    "url": video_download_url
                })
            # Collect video metadata (to find videos to download)
            elif video_ids:
                video_id = video_ids.pop(0)
                url = f"https://www.tiktok.com/embed/v2/{video_id}"

                proxy = {"http": available_proxy,
                         "https": available_proxy} if available_proxy != "__localhost__" else None
                session.headers.update(self.headers)
                video_requests[url] = {
                    "request": session.get(url, proxies=proxy, timeout=30),
                    "video_id": video_id,
                    "type": "metadata",
                }
                self.proxy_map[available_proxy].update({
                    "busy": True,
                    "url": url
                })

        # wait for async requests to end (after cancelling) before quitting
        # the worker
        if self.processor.interrupted:
            for request in video_requests.values():
                request["request"].cancel()

            max_timeout = time.time() + 20
            # FIX: the original condition built a list of futures filtered on
            # done() and tested their truthiness (always true), so it never
            # actually waited; test done() on every pending future instead
            while not all(r["request"].done() for r in video_requests.values()) and time.time() < max_timeout:
                await asyncio.sleep(0.5)

            raise WorkerInterruptedException("Interrupted while downloading TikTok videos")

        # Extract video download URLs
        for url in list(video_requests.keys()):
            video_id = video_requests[url]["video_id"]
            request = video_requests[url]["request"]
            request_type = video_requests[url]["type"]
            request_metadata = {
                "success": False,
                "url": url,
                "error": None,
                "from_dataset": self.processor.source_dataset.key,
                "post_ids": [video_id],
            }
            if not request.done():
                continue

            # Release proxy
            self.release_proxy(url)

            # Collect response
            try:
                response = request.result()
            except requests.exceptions.RequestException as e:
                error_message = f"URL {url} could not be retrieved ({type(e).__name__}: {e})"
                request_metadata["error"] = error_message
                download_results[video_id] = request_metadata
                self.processor.dataset.log(error_message)
                continue
            finally:
                del video_requests[url]

            if response.status_code != 200:
                error_message = f"Received unexpected HTTP response ({response.status_code}) {response.reason} for {url}, skipping."
                request_metadata["error"] = error_message
                download_results[video_id] = request_metadata
                self.processor.dataset.log(error_message)
                continue

            if request_type == "metadata":
                # Collect Video Download URL
                soup = BeautifulSoup(response.text, "html.parser")
                json_source = soup.select_one("script#__FRONTITY_CONNECT_STATE__")
                video_metadata = None
                # FIX: json_source can be None when the embed page lacks the
                # expected script tag; treat that as a metadata failure below
                # instead of crashing with an AttributeError
                if json_source is not None:
                    try:
                        if json_source.text:
                            video_metadata = json.loads(json_source.text)
                        elif json_source.contents and json_source.contents[0]:
                            video_metadata = json.loads(json_source.contents[0])
                    except json.JSONDecodeError as e:
                        self.processor.dataset.log(f"JSONDecodeError for video {video_id} metadata: {e}\n{json_source}")

                if not video_metadata:
                    # Failed to collect metadata
                    error_message = f"Failed to find metadata for video {video_id}"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    self.consecutive_failures += 1
                    if self.consecutive_failures > 5:
                        raise ProcessorException("Too many consecutive failures, stopping")
                    continue

                try:
                    url = list(video_metadata["source"]["data"].values())[0]["videoData"]["itemInfos"]["video"]["urls"][0]
                except (KeyError, IndexError):
                    error_message = f"vid: {video_id} - failed to find video download URL"
                    request_metadata["error"] = error_message
                    download_results[video_id] = request_metadata
                    self.processor.dataset.log(error_message)
                    # FIX: the original logged video_metadata["source"]["data"]
                    # here, which re-raises KeyError when "source"/"data" was
                    # the missing key; use safe lookups for the debug log
                    self.processor.dataset.log(video_metadata.get("source", {}).get("data", {}).values())
                    continue

                # Add new download URL to be collected
                self.consecutive_failures = 0
                video_download_urls.append((video_id, url))
                metadata_collected += 1
                self.processor.dataset.update_status("Collected metadata for %i/%i videos" %
                                                     (metadata_collected, max_videos))
                self.processor.dataset.update_progress(metadata_collected / max_videos)

            elif request_type == "download":
                # Download video
                with open(staging_area.joinpath(video_id).with_suffix('.mp4'), "wb") as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
                request_metadata["success"] = True
                request_metadata["files"] = [{"filename": video_id + ".mp4", "success": True}]
                download_results[video_id] = request_metadata

                downloaded_videos += 1
                self.processor.dataset.update_status("Downloaded %i/%i videos" %
                                                     (downloaded_videos, max_videos))
                self.processor.dataset.update_progress(downloaded_videos / max_videos)

    return download_results
Download TikTok Videos
This is based on the TikTok downloader from https://jdownloader.org/