datasources.tumblr.search_tumblr
Search Tumblr via its API
Can fetch posts from specific blogs or with specific hashtags
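The datasource is a thin wrapper around the pytumblr client. As a quick orientation before the full source below, here is a minimal sketch of the underlying API calls it makes; the four credential strings are placeholders, to be replaced with keys from an application registered at https://www.tumblr.com/oauth/apps.

    # Minimal sketch of the pytumblr calls this datasource wraps.
    # The four credential values below are placeholders, not real keys.
    import pytumblr

    client = pytumblr.TumblrRestClient(
        "CONSUMER_KEY", "CONSUMER_SECRET", "TOKEN_KEY", "TOKEN_SECRET")

    # Up to 20 posts with an exact tag; `before` (a unix timestamp) is optional
    tagged_posts = client.tagged("van gogh", limit=20, filter="raw")

    # Up to 20 posts from a specific blog, including reblog and note metadata
    blog_posts = client.posts("staff.tumblr.com", limit=20,
                              reblog_info=True, notes_info=True)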
1""" 2Search Tumblr via its API 3 4Can fetch posts from specific blogs or with specific hashtags 5""" 6 7import time 8import pytumblr 9from requests.exceptions import ConnectionError 10from datetime import datetime 11 12from common.config_manager import config 13from backend.lib.search import Search 14from common.lib.helpers import UserInput 15from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ConfigException 16 17__author__ = "Sal Hagen" 18__credits__ = ["Sal Hagen", "Tumblr API (api.tumblr.com)"] 19__maintainer__ = "Sal Hagen" 20__email__ = "4cat@oilab.eu" 21 22class SearchTumblr(Search): 23 """ 24 Tumblr data filter module. 25 """ 26 type = "tumblr-search" # job ID 27 category = "Search" # category 28 title = "Search Tumblr" # title displayed in UI 29 description = "Retrieve Tumblr posts by hashtag or blog." # description displayed in UI 30 extension = "csv" # extension of result file, used internally and in UI 31 is_local = False # Whether this datasource is locally scraped 32 is_static = False # Whether this datasource is still updated 33 34 # not available as a processor for existing datasets 35 accepts = [None] 36 37 max_workers = 1 38 max_retries = 3 # For API and connection retries. 39 max_date_retries = 96 + 150 # For checking dates. 96 time retries of -6 hours (24 days), plus 150 extra for 150 weeks (~3 years). 40 max_posts = 1000000 41 42 max_posts_reached = False 43 api_limit_reached = False 44 45 seen_ids = set() 46 client = None 47 failed_notes = [] 48 failed_reblogs = [] 49 50 config = { 51 # Tumblr API keys to use for data capturing 52 'api.tumblr.consumer_key': { 53 'type': UserInput.OPTION_TEXT, 54 'default': "", 55 'help': 'Tumblr Consumer Key', 56 'tooltip': "", 57 }, 58 'api.tumblr.consumer_secret': { 59 'type': UserInput.OPTION_TEXT, 60 'default': "", 61 'help': 'Tumblr Consumer Secret Key', 62 'tooltip': "", 63 }, 64 'api.tumblr.key': { 65 'type': UserInput.OPTION_TEXT, 66 'default': "", 67 'help': 'Tumblr API Key', 68 'tooltip': "", 69 }, 70 'api.tumblr.secret_key': { 71 'type': UserInput.OPTION_TEXT, 72 'default': "", 73 'help': 'Tumblr API Secret Key', 74 'tooltip': "", 75 }, 76 } 77 references = ["[Tumblr API documentation](https://www.tumblr.com/docs/en/api/v2)"] 78 79 @classmethod 80 def get_options(cls, parent_dataset=None, user=None): 81 """ 82 Check is Tumbler keys configured and if not, requests from User 83 """ 84 options = { 85 "intro": { 86 "type": UserInput.OPTION_INFO, 87 "help": "Retrieve any kind of Tumblr posts with specific tags or from specific blogs. Gets 100.000 posts " 88 "at max. Insert tags or names of blogs, one on each line. You may insert up to ten tags or " 89 "blogs.\n\nTumblr tags may include whitespace and commas. A `#` before the tag is optional.\n\n" 90 "Tag search only get posts explicitly associated with the exact tag you insert here. Querying " 91 "`gogh` will thus not get posts only tagged with `van gogh`. Keyword search is unfortunately not " 92 "allowed by the [Tumblr API](https://api.tumblr.com).\n\nIf 4CAT reached its Tumblr API rate " 93 "limit, try again 24 hours later." 94 }, 95 "search_scope": { 96 "type": UserInput.OPTION_CHOICE, 97 "help": "Search by", 98 "options": { 99 "tag": "Tag", 100 "blog": "Blog" 101 }, 102 "default": "tag" 103 }, 104 "query": { 105 "type": UserInput.OPTION_TEXT_LARGE, 106 "help": "Tags/blogs", 107 "tooltip": "Separate with commas or new lines." 108 }, 109 "fetch_reblogs": { 110 "type": UserInput.OPTION_TOGGLE, 111 "help": "Also fetch reblogs with text? 
(warning: slow)", 112 "default": False 113 } 114 } 115 116 try: 117 config_keys = SearchTumblr.get_tumbler_keys(user) 118 except ConfigException: 119 # No 4CAT set keys for user; let user input their own 120 options["key-info"] = { 121 "type": UserInput.OPTION_INFO, 122 "help": "In order to access the Tumblr API, you need to register an application. You can do so " 123 "[here](https://www.tumblr.com/oauth/apps) and use the keys below. You will first get the OAuth " 124 "Consumer Key and Secret, and then the User Token Key and Secret [after entering them here](ht" 125 "tps://api.tumblr.com/console/calls/user/info) and granting access." 126 } 127 options["consumer_key"] = { 128 "type": UserInput.OPTION_TEXT, 129 "sensitive": True, 130 "cache": True, 131 "help": "OAuth Consumer Key" 132 } 133 options["consumer_secret"] = { 134 "type": UserInput.OPTION_TEXT, 135 "sensitive": True, 136 "cache": True, 137 "help": "OAuth Consumer Secret" 138 } 139 options["key"] = { 140 "type": UserInput.OPTION_TEXT, 141 "sensitive": True, 142 "cache": True, 143 "help": "User Token Key" 144 } 145 options["secret_key"] = { 146 "type": UserInput.OPTION_TEXT, 147 "sensitive": True, 148 "cache": True, 149 "help": "User Token Secret" 150 } 151 152 options["divider"] = { 153 "type": UserInput.OPTION_DIVIDER 154 } 155 options["date-intro"] = { 156 "type": UserInput.OPTION_INFO, 157 "help": "**Note:** The [Tumblr API](https://api.tumblr.com) is volatile: when fetching sporadically used " 158 "tags, it may return zero posts, even though older posts exist. To mitigate this, 4CAT decreases " 159 "the date parameter (<code>before</code>) with six hours and sends the query again. This often " 160 "successfully returns older, un-fetched posts. If it didn't find new data after 96 retries (24 " 161 "days), it checks for data up to six years before the last date, decreasing 12 times by 6 months. " 162 "If that also results in nothing, it assumes the dataset is complete. Check the oldest post in " 163 "your dataset to see if it this is indeed the case and whether any odd time gaps exists." 164 } 165 options["daterange"] = { 166 "type": UserInput.OPTION_DATERANGE, 167 "help": "Date range" 168 } 169 170 return options 171 172 def get_items(self, query): 173 """ 174 Fetches data from Tumblr via its API. 
175 176 """ 177 178 # ready our parameters 179 parameters = self.dataset.get_parameters() 180 scope = parameters.get("search_scope", "") 181 queries = parameters.get("query").split(", ") 182 fetch_reblogs = parameters.get("fetch_reblogs", False) 183 184 # Store all info here 185 results = [] 186 187 # Store all notes from posts by blogs here 188 all_notes = [] 189 190 # Get date parameters 191 min_date = parameters.get("min_date", None) 192 max_date = parameters.get("max_date", None) 193 194 if min_date: 195 min_date = int(min_date) 196 if max_date: 197 max_date = int(max_date) 198 else: 199 max_date = int(time.time()) 200 201 # Connect to Tumblr API 202 try: 203 self.client = self.connect_to_tumblr() 204 except ConfigException as e: 205 self.log.warning(f"Could not connect to Tumblr API: API keys invalid or not set") 206 self.dataset.finish_with_error(f"Could not connect to Tumblr API: API keys invalid or not set") 207 return 208 except ConnectionRefusedError as e: 209 client_info = self.client.info() 210 self.log.warning(f"Could not connect to Tumblr API: {e}; client_info: {client_info}") 211 self.dataset.finish_with_error(f"Could not connect to Tumblr API: {client_info.get('meta', {}).get('status', '')} - {client_info.get('meta', {}).get('msg', '')}") 212 return 213 214 # for each tag or blog, get post 215 for query in queries: 216 217 # Get posts per tag 218 if scope == "tag": 219 new_results = self.get_posts_by_tag(query, max_date=max_date, min_date=min_date) 220 221 # Get posts per blog 222 elif scope == "blog": 223 new_results, notes = self.get_posts_by_blog(query, max_date=max_date, min_date=min_date) 224 all_notes.append(notes) 225 else: 226 self.dataset.update_status("Invalid scope") 227 break 228 229 results += new_results 230 231 if self.max_posts_reached: 232 self.dataset.update_status("Max posts exceeded") 233 break 234 if self.api_limit_reached: 235 self.dataset.update_status("API limit reached") 236 break 237 238 # If we also want the posts that reblogged the fetched posts: 239 if fetch_reblogs and not self.max_posts_reached and not self.api_limit_reached: 240 self.dataset.update_status("Getting notes from all posts") 241 242 # Reblog information is already returned for blog-level searches 243 if scope == "blog": 244 text_reblogs = [] 245 246 # Loop through and add the text reblogs that came with the results. 247 for post_notes in all_notes: 248 for post_note in post_notes: 249 for note in post_note: 250 if note["type"] == "reblog": 251 text_reblogs.append({note["blog_name"]: note["post_id"]}) 252 253 # Retrieving notes for tag-based posts should be done one-by-one. 254 # Fetching them all at once is not supported by the Tumblr API. 255 elif scope == "tag": 256 # Prepare dicts to pass to `get_post_notes` 257 posts_to_fetch = {result["author"]: result["id"] for result in results} 258 259 # First extract the notes of each post, and only keep text reblogs 260 text_reblogs = self.get_post_notes(posts_to_fetch) 261 262 # Get the full data for text reblogs. 
263 if text_reblogs: 264 connection_retries = 0 265 for i, text_reblog in enumerate(text_reblogs): 266 self.dataset.update_status("Got %i/%i text reblogs" % (i, len(text_reblogs))) 267 if connection_retries >= 5: 268 self.dataset.update_status("Multiple connection refused errors; unable to continue collection of reblogs.") 269 break 270 for key, value in text_reblog.items(): 271 if connection_retries >= 5: 272 break 273 try: 274 reblog_post = self.get_post_by_id(key, value) 275 except ConnectionRefusedError: 276 connection_retries += 1 277 self.failed_reblogs.append(key) 278 self.dataset.update_status(f"ConnectionRefused: Unable to collect reblogs for post {key}") 279 continue 280 if reblog_post: 281 reblog_post = self.parse_tumblr_posts([reblog_post], reblog=True) 282 results.append(reblog_post[0]) 283 284 self.job.finish() 285 return results 286 287 def get_posts_by_tag(self, tag, max_date=None, min_date=None): 288 """ 289 Get Tumblr posts posts with a certain tag 290 :param tag, str: the tag you want to look for 291 :param min_date: a unix timestamp, indicates posts should be min_date this date. 292 :param max_date: a unix timestamp, indicates posts should be max_date this date. 293 294 :returns: a dict created from the JSON response 295 """ 296 # Store all posts in here 297 all_posts = [] 298 299 # Some retries to make sure the Tumblr API actually returns everything. 300 retries = 0 301 date_retries = 0 302 303 # We're gonna change max_date, so store a copy for reference. 304 max_date_original = max_date 305 306 # We use the average time difference between posts to spot possible gaps in the data. 307 all_time_difs = [] 308 avg_time_dif = 0 309 time_difs_len = 0 310 311 # Get Tumblr posts until there's no more left. 312 while True: 313 if self.interrupted: 314 raise ProcessorInterruptedException("Interrupted while fetching tag posts from Tumblr") 315 316 # Stop after max for date reductions 317 if date_retries >= self.max_date_retries: 318 self.dataset.update_status("No more posts in this date range") 319 break 320 321 # Stop after max retries for API/connection stuff 322 if retries >= self.max_retries: 323 self.dataset.update_status("No more posts") 324 break 325 326 try: 327 # Use the pytumblr library to make the API call 328 posts = self.client.tagged(tag, before=max_date, limit=20, filter="raw") 329 except ConnectionError: 330 self.update_status("Encountered a connection error, waiting 10 seconds.") 331 time.sleep(10) 332 retries += 1 333 continue 334 335 # Get rid of posts that we already enountered, 336 # preventing Tumblr API shenanigans or double posts because of 337 # time reductions. Make sure it's no odd error string, though. 338 unseen_posts = [] 339 for check_post in posts: 340 # Sometimes the API repsonds just with "meta", "response", or "errors". 341 if isinstance(check_post, str): 342 self.dataset.update_status("Couldn't add post:", check_post) 343 retries += 1 344 break 345 else: 346 retries = 0 347 if check_post["id"] not in self.seen_ids: 348 unseen_posts.append(check_post) 349 posts = unseen_posts 350 351 # For no clear reason, the Tumblr API sometimes provides posts with a higher timestamp than requested. 352 # So we have to prevent this manually. 353 if max_date_original: 354 posts = [post for post in posts if post["timestamp"] <= max_date_original] 355 356 max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S") 357 358 # except Exception as e: 359 # print(e) 360 # self.dataset.update_status("Reached the limit of the Tumblr API. 
Last timestamp: %s" % str(max_date)) 361 # self.api_limit_reached = True 362 # break 363 364 # Make sure the Tumblr API doesn't magically stop at an earlier date 365 if not posts: 366 367 date_retries += 1 368 369 # We're first gonna check carefully if there's small timegaps by 370 # decreasing by six hours. 371 # If that didn't result in any new posts, also dedicate 12 date_retries 372 # with reductions of six months, just to be sure there's no data from 373 # years earlier missing. 374 375 if date_retries < 96: 376 max_date -= 21600 # Decrease by six hours 377 self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing time search with 6 hours to %s to make sure this is really it (retry %s/96)" % (str(len(all_posts)), tag, max_date_str, str(date_retries),)) 378 elif date_retries <= self.max_date_retries: 379 max_date -= 604800 # Decrease by one week 380 retry_str = str(date_retries - 96) 381 self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - no new posts found with decreasing by 6 hours, decreasing with a week to %s instead (retry %s/150)" % (str(len(all_posts)), tag, max_date_str, str(retry_str),)) 382 383 # We can stop when the max date drops below the min date. 384 if min_date: 385 if max_date <= min_date: 386 break 387 388 continue 389 390 # Append posts to main list 391 else: 392 393 posts = self.parse_tumblr_posts(posts) 394 395 # Get all timestamps and sort them. 396 post_dates = sorted([post["timestamp"] for post in posts]) 397 398 # Get the lowest date and use it as the next "before" parameter. 399 max_date = post_dates[0] 400 401 # Tumblr's API is volatile - it doesn't neatly sort posts by date, 402 # so it can happen that there's suddenly huge jumps in time. 403 # Check if this is happening by extracting the difference between all consecutive dates. 404 time_difs = list() 405 post_dates.reverse() 406 407 for i, date in enumerate(post_dates): 408 409 if i == (len(post_dates) - 1): 410 break 411 412 # Calculate and add time differences 413 time_dif = date - post_dates[i + 1] 414 415 # After having collected 250 posts, check whether the time 416 # difference between posts far exceeds the average time difference 417 # between posts. If it's more than five times this amount, 418 # restart the query with the timestamp just before the gap, minus the 419 # average time difference up to this point - something might be up with Tumblr's API. 420 if len(all_posts) >= 250 and time_dif > (avg_time_dif * 5): 421 422 time_str = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M:%S") 423 self.dataset.update_status("Time difference of %s spotted, restarting query at %s" % (str(time_dif), time_str,)) 424 425 self.seen_ids.update([post["id"] for post in posts]) 426 posts = [post for post in posts if post["timestamp"] >= date] 427 if posts: 428 all_posts += posts 429 430 max_date = date 431 break 432 433 time_difs.append(time_dif) 434 435 # To start a new query 436 if not posts: 437 break 438 439 # Manually check if we have a lower date than the lowest allowed date already (min date). 440 # This functonality is not natively supported by Tumblr. 
441 if min_date: 442 if max_date < min_date: 443 444 # Get rid of all the posts that are earlier than the max_date timestamp 445 posts = [post for post in posts if post["timestamp"] >= min_date and post["timestamp"] <= max_date_original] 446 447 if posts: 448 all_posts += posts 449 self.seen_ids.update([post["id"] for post in posts]) 450 break 451 452 # We got a new post, so we can reset the retry counts. 453 date_retries = 0 454 retries = 0 455 456 # Add retrieved posts top the main list 457 all_posts += posts 458 459 # Add to seen ids 460 self.seen_ids.update([post["id"] for post in posts]) 461 462 # Add time differences and calculate new average time difference 463 all_time_difs += time_difs 464 465 # Make the average time difference a moving average, 466 # to be flexible with faster and slower post paces. 467 # Delete the first 100 posts every hundred or so items. 468 if (len(all_time_difs) - time_difs_len) > 100: 469 all_time_difs = all_time_difs[time_difs_len:] 470 if all_time_difs: 471 time_difs_len = len(all_time_difs) 472 avg_time_dif = sum(all_time_difs) / len(all_time_difs) 473 474 if len(all_posts) >= self.max_posts: 475 self.max_posts_reached = True 476 break 477 478 self.dataset.update_status("Collected %s posts for tag %s, now looking for posts before %s" % (str(len(all_posts)), tag, max_date_str,)) 479 480 return all_posts 481 482 def get_posts_by_blog(self, blog, max_date=None, min_date=None): 483 """ 484 Get Tumblr posts posts with a certain blog 485 :param tag, str: the name of the blog you want to look for 486 :param min_date: a unix timestamp, indicates posts should be min_date this date. 487 :param max_date: a unix timestamp, indicates posts should be max_date this date. 488 489 :returns: a dict created from the JSON response 490 """ 491 blog = blog + ".tumblr.com" 492 493 if not max_date: 494 max_date = int(time.time()) 495 496 # Store all posts in here 497 all_posts = [] 498 499 # Store notes here, if they exist and are requested 500 all_notes = [] 501 502 # Some retries to make sure the Tumblr API actually returns everything 503 retries = 0 504 self.max_retries = 48 # 2 days 505 506 # Get Tumblr posts until there's no more left. 507 while True: 508 if self.interrupted: 509 raise ProcessorInterruptedException("Interrupted while fetching blog posts from Tumblr") 510 511 # Stop min_date 20 retries 512 if retries >= self.max_retries: 513 self.dataset.update_status("No more posts") 514 break 515 516 try: 517 # Use the pytumblr library to make the API call 518 posts = self.client.posts(blog, before=max_date, limit=20, reblog_info=True, notes_info=True, filter="raw") 519 posts = posts["posts"] 520 521 #if (max_date - posts[0]["timestamp"]) > 500000: 522 #self.dataset.update_status("ALERT - DATES LIKELY SKIPPED") 523 #self.dataset.update_status([post["timestamp"] for post in posts]) 524 525 except Exception as e: 526 527 self.dataset.update_status("Reached the limit of the Tumblr API. 
Last timestamp: %s" % str(max_date)) 528 self.api_limit_reached = True 529 break 530 531 # Make sure the Tumblr API doesn't magically stop at an earlier date 532 if not posts or isinstance(posts, str): 533 retries += 1 534 max_date -= 3600 # Decrease by an hour 535 self.dataset.update_status("No posts returned by Tumblr - checking whether this is really all (retry %s/48)" % str(retries)) 536 continue 537 538 # Append posts to main list 539 else: 540 # Keep the notes, if so indicated 541 if self.parameters.get("fetch_reblogs"): 542 for post in posts: 543 if "notes" in post: 544 all_notes.append(post["notes"]) 545 546 posts = self.parse_tumblr_posts(posts) 547 548 # Get the lowest date 549 max_date = sorted([post["timestamp"] for post in posts])[0] 550 551 # Manually check if we have a lower date than the min date (`min_date`) already. 552 # This functonality is not natively supported by Tumblr. 553 if min_date: 554 if max_date < min_date: 555 556 # Get rid of all the posts that are earlier than the max_date timestamp 557 posts = [post for post in posts if post["timestamp"] >= min_date] 558 559 if posts: 560 all_posts += posts 561 break 562 563 retries = 0 564 565 all_posts += posts 566 567 #if (max_date - posts[len(posts) - 1]["timestamp"]) > 500000: 568 #self.dataset.update_status("ALERT - DATES LIKELY SKIPPED") 569 #self.dataset.update_status([post["timestamp"] for post in posts]) 570 571 if len(all_posts) >= self.max_posts: 572 self.max_posts_reached = True 573 break 574 575 self.dataset.update_status("Collected %s posts" % str(len(all_posts))) 576 577 return all_posts, all_notes 578 579 def get_post_notes(self, di_blogs_ids, only_text_reblogs=True): 580 """ 581 Gets the post notes. 582 :param di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values. 583 :param only_text_reblogs, bool: Whether to only keep notes that are text reblogs. 584 """ 585 # List of dict to get reblogs. Items are: [{"blog_name": post_id}] 586 text_reblogs = [] 587 588 max_date = None 589 590 # Do some counting 591 len_blogs = len(di_blogs_ids) 592 count = 0 593 594 # Stop trying to fetch the notes after this many retries 595 max_notes_retries = 10 596 notes_retries = 0 597 598 for key, value in di_blogs_ids.items(): 599 600 count += 1 601 602 if self.interrupted: 603 raise ProcessorInterruptedException("Interrupted while fetching post notes from Tumblr") 604 605 # First, get the blog names and post_ids from reblogs 606 # Keep digging till there's nothing left, or if we can fetch no new notes 607 while True: 608 609 # Requests a post's notes 610 notes = self.client.notes(key, id=value, before_timestamp=max_date) 611 612 if only_text_reblogs: 613 614 if "notes" in notes: 615 notes_retries = 0 616 617 for note in notes["notes"]: 618 # If it's a reblog, extract the data and save the rest of the posts for later 619 if note["type"] == "reblog": 620 if note.get("added_text"): 621 text_reblogs.append({note["blog_name"]: note["post_id"]}) 622 623 if notes.get("_links"): 624 max_date = notes["_links"]["next"]["query_params"]["before_timestamp"] 625 626 # If there's no `_links` key, that's all. 
627 else: 628 break 629 630 # If there's no "notes" key in the returned dict, something might be up 631 else: 632 self.dataset.update_status("Couldn't get notes for Tumblr request " + str(notes)) 633 notes_retries += 1 634 pass 635 636 if notes_retries > max_notes_retries: 637 self.failed_notes.append(key) 638 break 639 640 self.dataset.update_status("Identified %i text reblogs in %i/%i notes" % (len(text_reblogs), count, len_blogs)) 641 642 return text_reblogs 643 644 def get_post_by_id(self, blog_name, post_id): 645 """ 646 Fetch individual posts 647 :param blog_name, str: The blog's name 648 :param id, int: The post ID 649 650 returns result list, a list with a dictionary with the post's information 651 """ 652 if self.interrupted: 653 raise ProcessorInterruptedException("Interrupted while fetching post from Tumblr") 654 655 # Request the specific post. 656 post = self.client.posts(blog_name, id=post_id) 657 658 # Tumblr API can sometimes return with this kind of error: 659 # {'meta': {'status': 500, 'msg': 'Server Error'}, 'response': {'error': 'Malformed JSON or HTML was returned.'}} 660 if "posts" not in post: 661 return None 662 663 # Get the first element of the list - it's always one post. 664 result = post["posts"][0] 665 666 return result 667 668 @staticmethod 669 def get_tumbler_keys(user): 670 config_keys = [ 671 config.get("api.tumblr.consumer_key", user=user), 672 config.get("api.tumblr.consumer_secret", user=user), 673 config.get("api.tumblr.key", user=user), 674 config.get("api.tumblr.secret_key", user=user)] 675 if not all(config_keys): 676 raise ConfigException("Not all Tumblr API credentials are configured. Cannot query Tumblr API.") 677 return config_keys 678 679 def connect_to_tumblr(self): 680 """ 681 Returns a connection to the Tumblr API using the pytumblr library. 682 683 """ 684 # User input keys 685 config_keys = [self.parameters.get("consumer_key"), 686 self.parameters.get("consumer_secret"), 687 self.parameters.get("key"), 688 self.parameters.get("secret_key")] 689 if not all(config_keys): 690 # No user input keys; attempt to use 4CAT config keys 691 config_keys = self.get_tumbler_keys(self.owner) 692 693 self.client = pytumblr.TumblrRestClient(*config_keys) 694 695 client_info = self.client.info() 696 697 # Check if there's any errors 698 if client_info.get("meta"): 699 if client_info["meta"].get("status") == 429: 700 raise ConnectionRefusedError("Tumblr API timed out") 701 702 return self.client 703 704 def validate_query(query, request, user): 705 """ 706 Validate custom data input 707 708 Confirms that the uploaded file is a valid CSV file and, if so, returns 709 some metadata. 710 711 :param dict query: Query parameters, from client-side. 712 :param request: Flask request 713 :param User user: User object of user who has submitted the query 714 :return dict: Safe query parameters 715 """ 716 # no query 4 u 717 if not query.get("query", "").strip(): 718 raise QueryParametersException("You must provide a search query.") 719 720 # reformat queries to be a comma-separated list 721 items = query.get("query").replace("#","") 722 items = items.split("\n") 723 724 # Not more than 10 plox 725 if len(items) > 10: 726 raise QueryParametersException("Only query for ten or less tags or blogs." + str(len(items))) 727 728 # no query 4 u 729 if not items: 730 raise QueryParametersException("Search query cannot be empty.") 731 732 # So it shows nicely in the frontend. 
733 items = ", ".join([item.lstrip().rstrip() for item in items if item]) 734 735 # the dates need to make sense as a range to search within 736 query["min_date"], query["max_date"] = query.get("daterange") 737 if any(query.get("daterange")) and not all(query.get("daterange")): 738 raise QueryParametersException("When providing a date range, set both an upper and lower limit.") 739 740 del query["daterange"] 741 742 query["query"] = items 743 query["board"] = query.get("search_scope") + "s" # used in web interface 744 745 # if we made it this far, the query can be executed 746 return query 747 748 def parse_tumblr_posts(self, posts, reblog=False): 749 """ 750 Function to parse Tumblr posts into the same dict items. 751 Tumblr posts can be many different types, so some data processing is necessary. 752 753 :param posts, list: List of Tumblr posts as returned form the Tumblr API. 754 :param reblog, bool: Whether the post concerns a reblog of posts from the original dataset. 755 756 returns list processed_posts, a list with dictionary items of post info. 757 """ 758 759 # Store processed posts here 760 processed_posts = [] 761 762 media_tags = ["photo", "video", "audio"] 763 764 # Loop through all the posts and write a row for each of them. 765 for post in posts: 766 post_type = post["type"] 767 768 # The post's text is in different keys depending on the post type 769 if post_type in media_tags: 770 text = post["caption"] 771 elif post_type == "link": 772 text = post["description"] 773 elif post_type == "text" or post_type == "chat": 774 text = post["body"] 775 elif post_type == "answer": 776 text = post["question"] + "\n" + post["answer"] 777 else: 778 text = "" 779 780 # Different options for video types (YouTube- or Tumblr-hosted) 781 if post_type == "video": 782 783 video_source = post["video_type"] 784 # Use `get` since some videos are deleted 785 video_url = post.get("permalink_url") 786 787 if video_source == "youtube": 788 # There's no URL if the YouTube video is deleted 789 if video_url: 790 video_id = post["video"]["youtube"]["video_id"] 791 else: 792 video_id = "deleted" 793 else: 794 video_id = "unknown" 795 796 else: 797 video_source = None 798 video_id = None 799 video_url = None 800 801 # All the fields to write 802 processed_post = { 803 # General columns 804 "type": post_type, 805 "timestamp": post["timestamp"], 806 "is_reblog": reblog, 807 808 # Blog columns 809 "author": post["blog_name"], 810 "subject": post["blog"]["title"], 811 "blog_description": post["blog"]["description"], 812 "blog_url": post["blog"]["url"], 813 "blog_uuid": post["blog"]["uuid"], 814 "blog_last_updated": post["blog"]["updated"], 815 816 # Post columns 817 "id": post["id"], 818 "post_url": post["post_url"], 819 "post_slug": post["slug"], 820 "thread_id": post["reblog_key"], 821 "body": text.replace("\x00", ""), 822 "tags": ", ".join(post["tags"]) if post.get("tags") else None, 823 "notes": post["note_count"], 824 "urls": post.get("link_url"), 825 "images": ",".join([photo["original_size"]["url"] for photo in post["photos"]]) if post.get("photos") else None, 826 827 # Optional video columns 828 "video_source": video_source if post_type == "video" else None, 829 "video_url": video_url if post_type == "video" else None, 830 "video_id": video_id if post_type == "video" else None, 831 "video_thumb": post.get("thumbnail_url"), # Can be deleted 832 833 # Optional audio columns 834 "audio_type": post.get("audio_type"), 835 "audio_url": post.get("audio_source_url"), 836 "audio_plays": post.get("plays"), 837 
838 # Optional link columns 839 "link_author": post.get("link_author"), 840 "link_publisher": post.get("publisher"), 841 "link_image": post.get("link_image"), 842 843 # Optional answers columns 844 "asking_name": post.get("asking_name"), 845 "asking_url": post.get("asking_url"), 846 "question": post.get("question"), 847 "answer": post.get("answer"), 848 849 # Optional chat columns 850 "chat": post.get("dialogue") 851 } 852 853 # Store the processed post 854 processed_posts.append(processed_post) 855 856 return processed_posts 857 858 def after_process(self): 859 """ 860 Override of the same function in processor.py 861 Used to notify of potential API errors. 862 863 """ 864 super().after_process() 865 self.client = None 866 errors = [] 867 if len(self.failed_notes) > 0: 868 errors.append("API error(s) when fetching notes %s" % ", ".join(self.failed_notes)) 869 if len(self.failed_reblogs) > 0: 870 errors.append("API error(s) when fetching reblogs %s" % ", ".join(self.failed_reblogs)) 871 if errors: 872 self.dataset.log(";\n ".join(errors)) 873 self.dataset.update_status(f"Dataset completed but failed to capture some notes/reblogs; see log for details.")
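Two patterns in the source above are worth isolating. First, get_posts_by_tag compensates for the API's habit of prematurely returning zero posts by stepping the `before` timestamp backwards: six-hour steps for up to 96 retries, then one-week steps for up to 150 more. A simplified standalone sketch of that schedule follows; `fetch_before` is a hypothetical stand-in for client.tagged(tag, before=...), and the real code additionally caps the total number of posts.

    # Simplified sketch of the date back-off in get_posts_by_tag.
    # `fetch_before` is a hypothetical callable returning a list of post dicts.
    import time

    def collect_with_backoff(fetch_before, max_date=None, min_date=None):
        max_date = max_date or int(time.time())
        collected = []
        date_retries = 0
        while date_retries < 96 + 150:
            posts = fetch_before(max_date)
            if posts:
                collected += posts
                # Continue from the oldest post returned
                max_date = min(post["timestamp"] for post in posts)
                date_retries = 0
            else:
                date_retries += 1
                if date_retries < 96:
                    max_date -= 6 * 3600       # probe in six-hour steps (24 days total)
                else:
                    max_date -= 7 * 24 * 3600  # then one-week steps (~3 years total)
            if min_date and max_date <= min_date:
                break
        return collected

Second, get_post_notes pages through a post's notes by following the `_links` cursor that the API returns. A sketch of that loop, assuming `client` is an authenticated pytumblr.TumblrRestClient:

    # Sketch of cursor-based paging through a post's notes, as in get_post_notes.
    def iter_notes(client, blog_name, post_id):
        before = None
        while True:
            response = client.notes(blog_name, id=post_id, before_timestamp=before)
            if "notes" not in response:
                break  # error response; the real code retries a few times
            for note in response["notes"]:
                yield note
            links = response.get("_links")
            if not links:
                break  # no next page; all notes fetched
            before = links["next"]["query_params"]["before_timestamp"]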
23class SearchTumblr(Search): 24 """ 25 Tumblr data filter module. 26 """ 27 type = "tumblr-search" # job ID 28 category = "Search" # category 29 title = "Search Tumblr" # title displayed in UI 30 description = "Retrieve Tumblr posts by hashtag or blog." # description displayed in UI 31 extension = "csv" # extension of result file, used internally and in UI 32 is_local = False # Whether this datasource is locally scraped 33 is_static = False # Whether this datasource is still updated 34 35 # not available as a processor for existing datasets 36 accepts = [None] 37 38 max_workers = 1 39 max_retries = 3 # For API and connection retries. 40 max_date_retries = 96 + 150 # For checking dates. 96 time retries of -6 hours (24 days), plus 150 extra for 150 weeks (~3 years). 41 max_posts = 1000000 42 43 max_posts_reached = False 44 api_limit_reached = False 45 46 seen_ids = set() 47 client = None 48 failed_notes = [] 49 failed_reblogs = [] 50 51 config = { 52 # Tumblr API keys to use for data capturing 53 'api.tumblr.consumer_key': { 54 'type': UserInput.OPTION_TEXT, 55 'default': "", 56 'help': 'Tumblr Consumer Key', 57 'tooltip': "", 58 }, 59 'api.tumblr.consumer_secret': { 60 'type': UserInput.OPTION_TEXT, 61 'default': "", 62 'help': 'Tumblr Consumer Secret Key', 63 'tooltip': "", 64 }, 65 'api.tumblr.key': { 66 'type': UserInput.OPTION_TEXT, 67 'default': "", 68 'help': 'Tumblr API Key', 69 'tooltip': "", 70 }, 71 'api.tumblr.secret_key': { 72 'type': UserInput.OPTION_TEXT, 73 'default': "", 74 'help': 'Tumblr API Secret Key', 75 'tooltip': "", 76 }, 77 } 78 references = ["[Tumblr API documentation](https://www.tumblr.com/docs/en/api/v2)"] 79 80 @classmethod 81 def get_options(cls, parent_dataset=None, user=None): 82 """ 83 Check is Tumbler keys configured and if not, requests from User 84 """ 85 options = { 86 "intro": { 87 "type": UserInput.OPTION_INFO, 88 "help": "Retrieve any kind of Tumblr posts with specific tags or from specific blogs. Gets 100.000 posts " 89 "at max. Insert tags or names of blogs, one on each line. You may insert up to ten tags or " 90 "blogs.\n\nTumblr tags may include whitespace and commas. A `#` before the tag is optional.\n\n" 91 "Tag search only get posts explicitly associated with the exact tag you insert here. Querying " 92 "`gogh` will thus not get posts only tagged with `van gogh`. Keyword search is unfortunately not " 93 "allowed by the [Tumblr API](https://api.tumblr.com).\n\nIf 4CAT reached its Tumblr API rate " 94 "limit, try again 24 hours later." 95 }, 96 "search_scope": { 97 "type": UserInput.OPTION_CHOICE, 98 "help": "Search by", 99 "options": { 100 "tag": "Tag", 101 "blog": "Blog" 102 }, 103 "default": "tag" 104 }, 105 "query": { 106 "type": UserInput.OPTION_TEXT_LARGE, 107 "help": "Tags/blogs", 108 "tooltip": "Separate with commas or new lines." 109 }, 110 "fetch_reblogs": { 111 "type": UserInput.OPTION_TOGGLE, 112 "help": "Also fetch reblogs with text? (warning: slow)", 113 "default": False 114 } 115 } 116 117 try: 118 config_keys = SearchTumblr.get_tumbler_keys(user) 119 except ConfigException: 120 # No 4CAT set keys for user; let user input their own 121 options["key-info"] = { 122 "type": UserInput.OPTION_INFO, 123 "help": "In order to access the Tumblr API, you need to register an application. You can do so " 124 "[here](https://www.tumblr.com/oauth/apps) and use the keys below. 
You will first get the OAuth " 125 "Consumer Key and Secret, and then the User Token Key and Secret [after entering them here](ht" 126 "tps://api.tumblr.com/console/calls/user/info) and granting access." 127 } 128 options["consumer_key"] = { 129 "type": UserInput.OPTION_TEXT, 130 "sensitive": True, 131 "cache": True, 132 "help": "OAuth Consumer Key" 133 } 134 options["consumer_secret"] = { 135 "type": UserInput.OPTION_TEXT, 136 "sensitive": True, 137 "cache": True, 138 "help": "OAuth Consumer Secret" 139 } 140 options["key"] = { 141 "type": UserInput.OPTION_TEXT, 142 "sensitive": True, 143 "cache": True, 144 "help": "User Token Key" 145 } 146 options["secret_key"] = { 147 "type": UserInput.OPTION_TEXT, 148 "sensitive": True, 149 "cache": True, 150 "help": "User Token Secret" 151 } 152 153 options["divider"] = { 154 "type": UserInput.OPTION_DIVIDER 155 } 156 options["date-intro"] = { 157 "type": UserInput.OPTION_INFO, 158 "help": "**Note:** The [Tumblr API](https://api.tumblr.com) is volatile: when fetching sporadically used " 159 "tags, it may return zero posts, even though older posts exist. To mitigate this, 4CAT decreases " 160 "the date parameter (<code>before</code>) with six hours and sends the query again. This often " 161 "successfully returns older, un-fetched posts. If it didn't find new data after 96 retries (24 " 162 "days), it checks for data up to six years before the last date, decreasing 12 times by 6 months. " 163 "If that also results in nothing, it assumes the dataset is complete. Check the oldest post in " 164 "your dataset to see if it this is indeed the case and whether any odd time gaps exists." 165 } 166 options["daterange"] = { 167 "type": UserInput.OPTION_DATERANGE, 168 "help": "Date range" 169 } 170 171 return options 172 173 def get_items(self, query): 174 """ 175 Fetches data from Tumblr via its API. 
176 177 """ 178 179 # ready our parameters 180 parameters = self.dataset.get_parameters() 181 scope = parameters.get("search_scope", "") 182 queries = parameters.get("query").split(", ") 183 fetch_reblogs = parameters.get("fetch_reblogs", False) 184 185 # Store all info here 186 results = [] 187 188 # Store all notes from posts by blogs here 189 all_notes = [] 190 191 # Get date parameters 192 min_date = parameters.get("min_date", None) 193 max_date = parameters.get("max_date", None) 194 195 if min_date: 196 min_date = int(min_date) 197 if max_date: 198 max_date = int(max_date) 199 else: 200 max_date = int(time.time()) 201 202 # Connect to Tumblr API 203 try: 204 self.client = self.connect_to_tumblr() 205 except ConfigException as e: 206 self.log.warning(f"Could not connect to Tumblr API: API keys invalid or not set") 207 self.dataset.finish_with_error(f"Could not connect to Tumblr API: API keys invalid or not set") 208 return 209 except ConnectionRefusedError as e: 210 client_info = self.client.info() 211 self.log.warning(f"Could not connect to Tumblr API: {e}; client_info: {client_info}") 212 self.dataset.finish_with_error(f"Could not connect to Tumblr API: {client_info.get('meta', {}).get('status', '')} - {client_info.get('meta', {}).get('msg', '')}") 213 return 214 215 # for each tag or blog, get post 216 for query in queries: 217 218 # Get posts per tag 219 if scope == "tag": 220 new_results = self.get_posts_by_tag(query, max_date=max_date, min_date=min_date) 221 222 # Get posts per blog 223 elif scope == "blog": 224 new_results, notes = self.get_posts_by_blog(query, max_date=max_date, min_date=min_date) 225 all_notes.append(notes) 226 else: 227 self.dataset.update_status("Invalid scope") 228 break 229 230 results += new_results 231 232 if self.max_posts_reached: 233 self.dataset.update_status("Max posts exceeded") 234 break 235 if self.api_limit_reached: 236 self.dataset.update_status("API limit reached") 237 break 238 239 # If we also want the posts that reblogged the fetched posts: 240 if fetch_reblogs and not self.max_posts_reached and not self.api_limit_reached: 241 self.dataset.update_status("Getting notes from all posts") 242 243 # Reblog information is already returned for blog-level searches 244 if scope == "blog": 245 text_reblogs = [] 246 247 # Loop through and add the text reblogs that came with the results. 248 for post_notes in all_notes: 249 for post_note in post_notes: 250 for note in post_note: 251 if note["type"] == "reblog": 252 text_reblogs.append({note["blog_name"]: note["post_id"]}) 253 254 # Retrieving notes for tag-based posts should be done one-by-one. 255 # Fetching them all at once is not supported by the Tumblr API. 256 elif scope == "tag": 257 # Prepare dicts to pass to `get_post_notes` 258 posts_to_fetch = {result["author"]: result["id"] for result in results} 259 260 # First extract the notes of each post, and only keep text reblogs 261 text_reblogs = self.get_post_notes(posts_to_fetch) 262 263 # Get the full data for text reblogs. 
264 if text_reblogs: 265 connection_retries = 0 266 for i, text_reblog in enumerate(text_reblogs): 267 self.dataset.update_status("Got %i/%i text reblogs" % (i, len(text_reblogs))) 268 if connection_retries >= 5: 269 self.dataset.update_status("Multiple connection refused errors; unable to continue collection of reblogs.") 270 break 271 for key, value in text_reblog.items(): 272 if connection_retries >= 5: 273 break 274 try: 275 reblog_post = self.get_post_by_id(key, value) 276 except ConnectionRefusedError: 277 connection_retries += 1 278 self.failed_reblogs.append(key) 279 self.dataset.update_status(f"ConnectionRefused: Unable to collect reblogs for post {key}") 280 continue 281 if reblog_post: 282 reblog_post = self.parse_tumblr_posts([reblog_post], reblog=True) 283 results.append(reblog_post[0]) 284 285 self.job.finish() 286 return results 287 288 def get_posts_by_tag(self, tag, max_date=None, min_date=None): 289 """ 290 Get Tumblr posts posts with a certain tag 291 :param tag, str: the tag you want to look for 292 :param min_date: a unix timestamp, indicates posts should be min_date this date. 293 :param max_date: a unix timestamp, indicates posts should be max_date this date. 294 295 :returns: a dict created from the JSON response 296 """ 297 # Store all posts in here 298 all_posts = [] 299 300 # Some retries to make sure the Tumblr API actually returns everything. 301 retries = 0 302 date_retries = 0 303 304 # We're gonna change max_date, so store a copy for reference. 305 max_date_original = max_date 306 307 # We use the average time difference between posts to spot possible gaps in the data. 308 all_time_difs = [] 309 avg_time_dif = 0 310 time_difs_len = 0 311 312 # Get Tumblr posts until there's no more left. 313 while True: 314 if self.interrupted: 315 raise ProcessorInterruptedException("Interrupted while fetching tag posts from Tumblr") 316 317 # Stop after max for date reductions 318 if date_retries >= self.max_date_retries: 319 self.dataset.update_status("No more posts in this date range") 320 break 321 322 # Stop after max retries for API/connection stuff 323 if retries >= self.max_retries: 324 self.dataset.update_status("No more posts") 325 break 326 327 try: 328 # Use the pytumblr library to make the API call 329 posts = self.client.tagged(tag, before=max_date, limit=20, filter="raw") 330 except ConnectionError: 331 self.update_status("Encountered a connection error, waiting 10 seconds.") 332 time.sleep(10) 333 retries += 1 334 continue 335 336 # Get rid of posts that we already enountered, 337 # preventing Tumblr API shenanigans or double posts because of 338 # time reductions. Make sure it's no odd error string, though. 339 unseen_posts = [] 340 for check_post in posts: 341 # Sometimes the API repsonds just with "meta", "response", or "errors". 342 if isinstance(check_post, str): 343 self.dataset.update_status("Couldn't add post:", check_post) 344 retries += 1 345 break 346 else: 347 retries = 0 348 if check_post["id"] not in self.seen_ids: 349 unseen_posts.append(check_post) 350 posts = unseen_posts 351 352 # For no clear reason, the Tumblr API sometimes provides posts with a higher timestamp than requested. 353 # So we have to prevent this manually. 354 if max_date_original: 355 posts = [post for post in posts if post["timestamp"] <= max_date_original] 356 357 max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S") 358 359 # except Exception as e: 360 # print(e) 361 # self.dataset.update_status("Reached the limit of the Tumblr API. 
Last timestamp: %s" % str(max_date)) 362 # self.api_limit_reached = True 363 # break 364 365 # Make sure the Tumblr API doesn't magically stop at an earlier date 366 if not posts: 367 368 date_retries += 1 369 370 # We're first gonna check carefully if there's small timegaps by 371 # decreasing by six hours. 372 # If that didn't result in any new posts, also dedicate 12 date_retries 373 # with reductions of six months, just to be sure there's no data from 374 # years earlier missing. 375 376 if date_retries < 96: 377 max_date -= 21600 # Decrease by six hours 378 self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing time search with 6 hours to %s to make sure this is really it (retry %s/96)" % (str(len(all_posts)), tag, max_date_str, str(date_retries),)) 379 elif date_retries <= self.max_date_retries: 380 max_date -= 604800 # Decrease by one week 381 retry_str = str(date_retries - 96) 382 self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - no new posts found with decreasing by 6 hours, decreasing with a week to %s instead (retry %s/150)" % (str(len(all_posts)), tag, max_date_str, str(retry_str),)) 383 384 # We can stop when the max date drops below the min date. 385 if min_date: 386 if max_date <= min_date: 387 break 388 389 continue 390 391 # Append posts to main list 392 else: 393 394 posts = self.parse_tumblr_posts(posts) 395 396 # Get all timestamps and sort them. 397 post_dates = sorted([post["timestamp"] for post in posts]) 398 399 # Get the lowest date and use it as the next "before" parameter. 400 max_date = post_dates[0] 401 402 # Tumblr's API is volatile - it doesn't neatly sort posts by date, 403 # so it can happen that there's suddenly huge jumps in time. 404 # Check if this is happening by extracting the difference between all consecutive dates. 405 time_difs = list() 406 post_dates.reverse() 407 408 for i, date in enumerate(post_dates): 409 410 if i == (len(post_dates) - 1): 411 break 412 413 # Calculate and add time differences 414 time_dif = date - post_dates[i + 1] 415 416 # After having collected 250 posts, check whether the time 417 # difference between posts far exceeds the average time difference 418 # between posts. If it's more than five times this amount, 419 # restart the query with the timestamp just before the gap, minus the 420 # average time difference up to this point - something might be up with Tumblr's API. 421 if len(all_posts) >= 250 and time_dif > (avg_time_dif * 5): 422 423 time_str = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M:%S") 424 self.dataset.update_status("Time difference of %s spotted, restarting query at %s" % (str(time_dif), time_str,)) 425 426 self.seen_ids.update([post["id"] for post in posts]) 427 posts = [post for post in posts if post["timestamp"] >= date] 428 if posts: 429 all_posts += posts 430 431 max_date = date 432 break 433 434 time_difs.append(time_dif) 435 436 # To start a new query 437 if not posts: 438 break 439 440 # Manually check if we have a lower date than the lowest allowed date already (min date). 441 # This functonality is not natively supported by Tumblr. 
442 if min_date: 443 if max_date < min_date: 444 445 # Get rid of all the posts that are earlier than the max_date timestamp 446 posts = [post for post in posts if post["timestamp"] >= min_date and post["timestamp"] <= max_date_original] 447 448 if posts: 449 all_posts += posts 450 self.seen_ids.update([post["id"] for post in posts]) 451 break 452 453 # We got a new post, so we can reset the retry counts. 454 date_retries = 0 455 retries = 0 456 457 # Add retrieved posts top the main list 458 all_posts += posts 459 460 # Add to seen ids 461 self.seen_ids.update([post["id"] for post in posts]) 462 463 # Add time differences and calculate new average time difference 464 all_time_difs += time_difs 465 466 # Make the average time difference a moving average, 467 # to be flexible with faster and slower post paces. 468 # Delete the first 100 posts every hundred or so items. 469 if (len(all_time_difs) - time_difs_len) > 100: 470 all_time_difs = all_time_difs[time_difs_len:] 471 if all_time_difs: 472 time_difs_len = len(all_time_difs) 473 avg_time_dif = sum(all_time_difs) / len(all_time_difs) 474 475 if len(all_posts) >= self.max_posts: 476 self.max_posts_reached = True 477 break 478 479 self.dataset.update_status("Collected %s posts for tag %s, now looking for posts before %s" % (str(len(all_posts)), tag, max_date_str,)) 480 481 return all_posts 482 483 def get_posts_by_blog(self, blog, max_date=None, min_date=None): 484 """ 485 Get Tumblr posts posts with a certain blog 486 :param tag, str: the name of the blog you want to look for 487 :param min_date: a unix timestamp, indicates posts should be min_date this date. 488 :param max_date: a unix timestamp, indicates posts should be max_date this date. 489 490 :returns: a dict created from the JSON response 491 """ 492 blog = blog + ".tumblr.com" 493 494 if not max_date: 495 max_date = int(time.time()) 496 497 # Store all posts in here 498 all_posts = [] 499 500 # Store notes here, if they exist and are requested 501 all_notes = [] 502 503 # Some retries to make sure the Tumblr API actually returns everything 504 retries = 0 505 self.max_retries = 48 # 2 days 506 507 # Get Tumblr posts until there's no more left. 508 while True: 509 if self.interrupted: 510 raise ProcessorInterruptedException("Interrupted while fetching blog posts from Tumblr") 511 512 # Stop min_date 20 retries 513 if retries >= self.max_retries: 514 self.dataset.update_status("No more posts") 515 break 516 517 try: 518 # Use the pytumblr library to make the API call 519 posts = self.client.posts(blog, before=max_date, limit=20, reblog_info=True, notes_info=True, filter="raw") 520 posts = posts["posts"] 521 522 #if (max_date - posts[0]["timestamp"]) > 500000: 523 #self.dataset.update_status("ALERT - DATES LIKELY SKIPPED") 524 #self.dataset.update_status([post["timestamp"] for post in posts]) 525 526 except Exception as e: 527 528 self.dataset.update_status("Reached the limit of the Tumblr API. 
Last timestamp: %s" % str(max_date)) 529 self.api_limit_reached = True 530 break 531 532 # Make sure the Tumblr API doesn't magically stop at an earlier date 533 if not posts or isinstance(posts, str): 534 retries += 1 535 max_date -= 3600 # Decrease by an hour 536 self.dataset.update_status("No posts returned by Tumblr - checking whether this is really all (retry %s/48)" % str(retries)) 537 continue 538 539 # Append posts to main list 540 else: 541 # Keep the notes, if so indicated 542 if self.parameters.get("fetch_reblogs"): 543 for post in posts: 544 if "notes" in post: 545 all_notes.append(post["notes"]) 546 547 posts = self.parse_tumblr_posts(posts) 548 549 # Get the lowest date 550 max_date = sorted([post["timestamp"] for post in posts])[0] 551 552 # Manually check if we have a lower date than the min date (`min_date`) already. 553 # This functonality is not natively supported by Tumblr. 554 if min_date: 555 if max_date < min_date: 556 557 # Get rid of all the posts that are earlier than the max_date timestamp 558 posts = [post for post in posts if post["timestamp"] >= min_date] 559 560 if posts: 561 all_posts += posts 562 break 563 564 retries = 0 565 566 all_posts += posts 567 568 #if (max_date - posts[len(posts) - 1]["timestamp"]) > 500000: 569 #self.dataset.update_status("ALERT - DATES LIKELY SKIPPED") 570 #self.dataset.update_status([post["timestamp"] for post in posts]) 571 572 if len(all_posts) >= self.max_posts: 573 self.max_posts_reached = True 574 break 575 576 self.dataset.update_status("Collected %s posts" % str(len(all_posts))) 577 578 return all_posts, all_notes 579 580 def get_post_notes(self, di_blogs_ids, only_text_reblogs=True): 581 """ 582 Gets the post notes. 583 :param di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values. 584 :param only_text_reblogs, bool: Whether to only keep notes that are text reblogs. 585 """ 586 # List of dict to get reblogs. Items are: [{"blog_name": post_id}] 587 text_reblogs = [] 588 589 max_date = None 590 591 # Do some counting 592 len_blogs = len(di_blogs_ids) 593 count = 0 594 595 # Stop trying to fetch the notes after this many retries 596 max_notes_retries = 10 597 notes_retries = 0 598 599 for key, value in di_blogs_ids.items(): 600 601 count += 1 602 603 if self.interrupted: 604 raise ProcessorInterruptedException("Interrupted while fetching post notes from Tumblr") 605 606 # First, get the blog names and post_ids from reblogs 607 # Keep digging till there's nothing left, or if we can fetch no new notes 608 while True: 609 610 # Requests a post's notes 611 notes = self.client.notes(key, id=value, before_timestamp=max_date) 612 613 if only_text_reblogs: 614 615 if "notes" in notes: 616 notes_retries = 0 617 618 for note in notes["notes"]: 619 # If it's a reblog, extract the data and save the rest of the posts for later 620 if note["type"] == "reblog": 621 if note.get("added_text"): 622 text_reblogs.append({note["blog_name"]: note["post_id"]}) 623 624 if notes.get("_links"): 625 max_date = notes["_links"]["next"]["query_params"]["before_timestamp"] 626 627 # If there's no `_links` key, that's all. 
                        else:
                            break

                    # If there's no "notes" key in the returned dict, something might be up
                    else:
                        self.dataset.update_status("Couldn't get notes for Tumblr request " + str(notes))
                        notes_retries += 1

                if notes_retries > max_notes_retries:
                    self.failed_notes.append(key)
                    break

            self.dataset.update_status("Identified %i text reblogs in %i/%i notes" % (len(text_reblogs), count, len_blogs))

        return text_reblogs

    def get_post_by_id(self, blog_name, post_id):
        """
        Fetch an individual post.
        :param blog_name, str: The blog's name
        :param post_id, int: The post ID

        returns the post's information as a dict, or None if the API errored.
        """
        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching post from Tumblr")

        # Request the specific post.
        post = self.client.posts(blog_name, id=post_id)

        # The Tumblr API can sometimes return this kind of error:
        # {'meta': {'status': 500, 'msg': 'Server Error'}, 'response': {'error': 'Malformed JSON or HTML was returned.'}}
        if "posts" not in post:
            return None

        # Get the first element of the list - it's always one post.
        result = post["posts"][0]

        return result

    @staticmethod
    def get_tumbler_keys(user):
        """
        Get the Tumblr API credentials from the 4CAT configuration.
        Raises a ConfigException if they are not all set.
        """
        config_keys = [
            config.get("api.tumblr.consumer_key", user=user),
            config.get("api.tumblr.consumer_secret", user=user),
            config.get("api.tumblr.key", user=user),
            config.get("api.tumblr.secret_key", user=user)]
        if not all(config_keys):
            raise ConfigException("Not all Tumblr API credentials are configured. Cannot query Tumblr API.")
        return config_keys

    def connect_to_tumblr(self):
        """
        Returns a connection to the Tumblr API using the pytumblr library.
        Raises a ConnectionRefusedError if the API reports a rate limit (status 429).
        """
        # Keys the user entered in the query form
        config_keys = [self.parameters.get("consumer_key"),
                       self.parameters.get("consumer_secret"),
                       self.parameters.get("key"),
                       self.parameters.get("secret_key")]
        if not all(config_keys):
            # No user-provided keys; attempt to use the 4CAT config keys
            config_keys = self.get_tumbler_keys(self.owner)

        self.client = pytumblr.TumblrRestClient(*config_keys)

        client_info = self.client.info()

        # Check if there are any errors
        if client_info.get("meta"):
            if client_info["meta"].get("status") == 429:
                raise ConnectionRefusedError("Tumblr API rate limit reached")

        return self.client

    @staticmethod
    def validate_query(query, request, user):
        """
        Validate custom data input

        Confirms that the query parameters are valid and, if so, returns a
        sanitised version of them.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        # reformat queries to be a comma-separated list
        items = query.get("query").replace("#", "")
        items = [item.strip() for item in items.split("\n") if item.strip()]

        # Not more than 10 plox
        if len(items) > 10:
            raise QueryParametersException("Only query for ten or fewer tags or blogs (you entered %i)." % len(items))

        # no query 4 u
        if not items:
            raise QueryParametersException("Search query cannot be empty.")

        # So it shows nicely in the frontend.
        items = ", ".join(items)

        # the dates need to make sense as a range to search within
        query["min_date"], query["max_date"] = query.get("daterange")
        if any(query.get("daterange")) and not all(query.get("daterange")):
            raise QueryParametersException("When providing a date range, set both an upper and lower limit.")

        del query["daterange"]

        query["query"] = items
        query["board"] = query.get("search_scope") + "s"  # used in web interface

        # if we made it this far, the query can be executed
        return query

    def parse_tumblr_posts(self, posts, reblog=False):
        """
        Function to parse Tumblr posts into uniform dict items.
        Tumblr posts can be of many different types, so some data processing is necessary.

        :param posts, list: List of Tumblr posts as returned from the Tumblr API.
        :param reblog, bool: Whether the posts concern reblogs of posts from the original dataset.

        returns list processed_posts, a list with dictionary items of post info.
        """

        # Store processed posts here
        processed_posts = []

        media_tags = ["photo", "video", "audio"]

        # Loop through all the posts and write a row for each of them.
        for post in posts:
            post_type = post["type"]

            # The post's text is in different keys depending on the post type
            if post_type in media_tags:
                text = post["caption"]
            elif post_type == "link":
                text = post["description"]
            elif post_type == "text" or post_type == "chat":
                text = post["body"]
            elif post_type == "answer":
                text = post["question"] + "\n" + post["answer"]
            else:
                text = ""

            # Different options for video types (YouTube- or Tumblr-hosted)
            if post_type == "video":

                video_source = post["video_type"]
                # Use `get` since some videos are deleted
                video_url = post.get("permalink_url")

                if video_source == "youtube":
                    # There's no URL if the YouTube video is deleted
                    if video_url:
                        video_id = post["video"]["youtube"]["video_id"]
                    else:
                        video_id = "deleted"
                else:
                    video_id = "unknown"

            else:
                video_source = None
                video_id = None
                video_url = None

            # All the fields to write
            processed_post = {
                # General columns
                "type": post_type,
                "timestamp": post["timestamp"],
                "is_reblog": reblog,

                # Blog columns
                "author": post["blog_name"],
                "subject": post["blog"]["title"],
                "blog_description": post["blog"]["description"],
                "blog_url": post["blog"]["url"],
                "blog_uuid": post["blog"]["uuid"],
                "blog_last_updated": post["blog"]["updated"],

                # Post columns
                "id": post["id"],
                "post_url": post["post_url"],
                "post_slug": post["slug"],
                "thread_id": post["reblog_key"],
                "body": text.replace("\x00", ""),
                "tags": ", ".join(post["tags"]) if post.get("tags") else None,
                "notes": post["note_count"],
                "urls": post.get("link_url"),
                "images": ",".join([photo["original_size"]["url"] for photo in post["photos"]]) if post.get("photos") else None,

                # Optional video columns
                "video_source": video_source if post_type == "video" else None,
                "video_url": video_url if post_type == "video" else None,
                "video_id": video_id if post_type == "video" else None,
                "video_thumb": post.get("thumbnail_url"),  # Can be deleted

                # Optional audio columns
                "audio_type": post.get("audio_type"),
                "audio_url": post.get("audio_source_url"),
                "audio_plays": post.get("plays"),

                # Optional link columns
                "link_author": post.get("link_author"),
                "link_publisher": post.get("publisher"),
                "link_image": post.get("link_image"),

                # Optional answer columns
                "asking_name": post.get("asking_name"),
                "asking_url": post.get("asking_url"),
                "question": post.get("question"),
                "answer": post.get("answer"),

                # Optional chat columns
                "chat": post.get("dialogue")
            }

            # Store the processed post
            processed_posts.append(processed_post)

        return processed_posts

    def after_process(self):
        """
        Override of the same function in processor.py.
        Used to notify of potential API errors.
        """
        super().after_process()
        self.client = None
        errors = []
        if len(self.failed_notes) > 0:
            errors.append("API error(s) when fetching notes for %s" % ", ".join(self.failed_notes))
        if len(self.failed_reblogs) > 0:
            errors.append("API error(s) when fetching reblogs from %s" % ", ".join(self.failed_reblogs))
        if errors:
            self.dataset.log(";\n ".join(errors))
            self.dataset.update_status("Dataset completed, but failed to capture some notes/reblogs; see log for details.")
Tumblr data filter module.
    @classmethod
    def get_options(cls, parent_dataset=None, user=None):
        """
        Check if Tumblr API keys are configured and, if not, request them from the user.
        """
        options = {
            "intro": {
                "type": UserInput.OPTION_INFO,
                "help": "Retrieve any kind of Tumblr posts with specific tags or from specific blogs. Gets 100,000 posts "
                        "at most. Insert tags or names of blogs, one on each line. You may insert up to ten tags or "
                        "blogs.\n\nTumblr tags may include whitespace and commas. A `#` before the tag is optional.\n\n"
                        "Tag search only gets posts explicitly associated with the exact tag you insert here. Querying "
                        "`gogh` will thus not get posts only tagged with `van gogh`. Keyword search is unfortunately not "
                        "allowed by the [Tumblr API](https://api.tumblr.com).\n\nIf 4CAT reached its Tumblr API rate "
                        "limit, try again 24 hours later."
            },
            "search_scope": {
                "type": UserInput.OPTION_CHOICE,
                "help": "Search by",
                "options": {
                    "tag": "Tag",
                    "blog": "Blog"
                },
                "default": "tag"
            },
            "query": {
                "type": UserInput.OPTION_TEXT_LARGE,
                "help": "Tags/blogs",
                "tooltip": "Separate with commas or new lines."
            },
            "fetch_reblogs": {
                "type": UserInput.OPTION_TOGGLE,
                "help": "Also fetch reblogs with text? (warning: slow)",
                "default": False
            }
        }

        try:
            SearchTumblr.get_tumbler_keys(user)
        except ConfigException:
            # No 4CAT-configured keys for this user; let the user input their own
            options["key-info"] = {
                "type": UserInput.OPTION_INFO,
                "help": "In order to access the Tumblr API, you need to register an application. You can do so "
                        "[here](https://www.tumblr.com/oauth/apps) and use the keys below. You will first get the OAuth "
                        "Consumer Key and Secret, and then the User Token Key and Secret "
                        "[after entering them here](https://api.tumblr.com/console/calls/user/info) and granting access."
            }
            options["consumer_key"] = {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "OAuth Consumer Key"
            }
            options["consumer_secret"] = {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "OAuth Consumer Secret"
            }
            options["key"] = {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "User Token Key"
            }
            options["secret_key"] = {
                "type": UserInput.OPTION_TEXT,
                "sensitive": True,
                "cache": True,
                "help": "User Token Secret"
            }

        options["divider"] = {
            "type": UserInput.OPTION_DIVIDER
        }
        options["date-intro"] = {
            "type": UserInput.OPTION_INFO,
            "help": "**Note:** The [Tumblr API](https://api.tumblr.com) is volatile: when fetching sporadically used "
                    "tags, it may return zero posts, even though older posts exist. To mitigate this, 4CAT decreases "
                    "the date parameter (<code>before</code>) by six hours and sends the query again. This often "
                    "successfully returns older, un-fetched posts. If it hasn't found new data after 96 such retries "
                    "(24 days), it decreases the date by one week instead, for up to 150 further retries (roughly "
                    "three years). If that also results in nothing, it assumes the dataset is complete. Check the "
                    "oldest post in your dataset to see if this is indeed the case and whether any odd time gaps exist."
        }
        options["daterange"] = {
            "type": UserInput.OPTION_DATERANGE,
            "help": "Date range"
        }

        return options
Check if Tumblr API keys are configured and, if not, request them from the user.
    def get_items(self, query):
        """
        Fetches data from Tumblr via its API.
        """

        # ready our parameters
        parameters = self.dataset.get_parameters()
        scope = parameters.get("search_scope", "")
        queries = parameters.get("query").split(", ")
        fetch_reblogs = parameters.get("fetch_reblogs", False)

        # Store all info here
        results = []

        # Store all notes from posts by blogs here
        all_notes = []

        # Get date parameters
        min_date = parameters.get("min_date", None)
        max_date = parameters.get("max_date", None)

        if min_date:
            min_date = int(min_date)
        if max_date:
            max_date = int(max_date)
        else:
            max_date = int(time.time())

        # Connect to the Tumblr API
        try:
            self.client = self.connect_to_tumblr()
        except ConfigException:
            self.log.warning("Could not connect to Tumblr API: API keys invalid or not set")
            self.dataset.finish_with_error("Could not connect to Tumblr API: API keys invalid or not set")
            return
        except ConnectionRefusedError as e:
            client_info = self.client.info()
            self.log.warning(f"Could not connect to Tumblr API: {e}; client_info: {client_info}")
            self.dataset.finish_with_error(f"Could not connect to Tumblr API: {client_info.get('meta', {}).get('status', '')} - {client_info.get('meta', {}).get('msg', '')}")
            return

        # For each tag or blog, get posts
        for query in queries:

            # Get posts per tag
            if scope == "tag":
                new_results = self.get_posts_by_tag(query, max_date=max_date, min_date=min_date)

            # Get posts per blog
            elif scope == "blog":
                new_results, notes = self.get_posts_by_blog(query, max_date=max_date, min_date=min_date)
                all_notes.append(notes)
            else:
                self.dataset.update_status("Invalid scope")
                break

            results += new_results

            if self.max_posts_reached:
                self.dataset.update_status("Max posts exceeded")
                break
            if self.api_limit_reached:
                self.dataset.update_status("API limit reached")
                break

        # If we also want the posts that reblogged the fetched posts:
        if fetch_reblogs and not self.max_posts_reached and not self.api_limit_reached:
            self.dataset.update_status("Getting notes from all posts")

            # Reblog information is already returned for blog-level searches
            if scope == "blog":
                text_reblogs = []

                # Loop through and add the text reblogs that came with the results.
                for blog_notes in all_notes:
                    for post_notes in blog_notes:
                        for note in post_notes:
                            if note["type"] == "reblog":
                                text_reblogs.append({note["blog_name"]: note["post_id"]})

            # Retrieving notes for tag-based posts has to be done one by one;
            # fetching them all at once is not supported by the Tumblr API.
            elif scope == "tag":
                # Prepare a dict to pass to `get_post_notes`
                posts_to_fetch = {result["author"]: result["id"] for result in results}

                # First extract the notes of each post, and only keep text reblogs
                text_reblogs = self.get_post_notes(posts_to_fetch)

            # Get the full data for text reblogs.
            if text_reblogs:
                connection_retries = 0
                for i, text_reblog in enumerate(text_reblogs):
                    self.dataset.update_status("Got %i/%i text reblogs" % (i, len(text_reblogs)))
                    if connection_retries >= 5:
                        self.dataset.update_status("Multiple connection refused errors; unable to continue collecting reblogs.")
                        break
                    for key, value in text_reblog.items():
                        if connection_retries >= 5:
                            break
                        try:
                            reblog_post = self.get_post_by_id(key, value)
                        except ConnectionRefusedError:
                            connection_retries += 1
                            self.failed_reblogs.append(key)
                            self.dataset.update_status(f"ConnectionRefused: unable to collect reblogs for post {key}")
                            continue
                        if reblog_post:
                            reblog_post = self.parse_tumblr_posts([reblog_post], reblog=True)
                            results.append(reblog_post[0])

        self.job.finish()
        return results
Fetches data from Tumblr via its API.
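For reference, a minimal sketch of the dataset parameters this method reads; the key names come from the code above, while the values are purely illustrative:

# Hypothetical example of the parameters get_items() reads from the dataset.
example_parameters = {
    "search_scope": "tag",            # "tag" or "blog"
    "query": "van gogh, vincent",     # comma-separated, as produced by validate_query()
    "fetch_reblogs": False,           # whether to also fetch text reblogs (slow)
    "min_date": 1577836800,           # unix timestamp, or None for no lower bound
    "max_date": 1609459200,           # unix timestamp; defaults to "now" when unset
}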
    def get_posts_by_tag(self, tag, max_date=None, min_date=None):
        """
        Get Tumblr posts with a certain tag.
        :param tag, str: the tag you want to look for
        :param min_date: a unix timestamp; only fetch posts after this date.
        :param max_date: a unix timestamp; only fetch posts before this date.

        :returns: a list of posts, parsed into dictionaries
        """
        # Store all posts in here
        all_posts = []

        # Some retries to make sure the Tumblr API actually returns everything.
        retries = 0
        date_retries = 0

        # We're going to change max_date, so store a copy for reference.
        max_date_original = max_date

        # We use the average time difference between posts to spot possible gaps in the data.
        all_time_difs = []
        avg_time_dif = 0
        time_difs_len = 0

        # Get Tumblr posts until there are no more left.
        while True:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching tag posts from Tumblr")

            # Stop after the maximum amount of date reductions
            if date_retries >= self.max_date_retries:
                self.dataset.update_status("No more posts in this date range")
                break

            # Stop after the maximum amount of API/connection retries
            if retries >= self.max_retries:
                self.dataset.update_status("No more posts")
                break

            try:
                # Use the pytumblr library to make the API call
                posts = self.client.tagged(tag, before=max_date, limit=20, filter="raw")
            except ConnectionError:
                self.dataset.update_status("Encountered a connection error, waiting 10 seconds.")
                time.sleep(10)
                retries += 1
                continue

            # Get rid of posts that we already encountered, preventing
            # Tumblr API shenanigans or double posts because of time
            # reductions. Make sure it's no odd error string, though.
            unseen_posts = []
            for check_post in posts:
                # Sometimes the API responds with just "meta", "response", or "errors".
                if isinstance(check_post, str):
                    self.dataset.update_status("Couldn't add post: " + check_post)
                    retries += 1
                    break
                else:
                    retries = 0
                    if check_post["id"] not in self.seen_ids:
                        unseen_posts.append(check_post)
            posts = unseen_posts

            # For no clear reason, the Tumblr API sometimes provides posts with a higher timestamp than requested.
            # So we have to prevent this manually.
            if max_date_original:
                posts = [post for post in posts if post["timestamp"] <= max_date_original]

            max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S")

            # Make sure the Tumblr API doesn't magically stop at an earlier date
            if not posts:

                date_retries += 1

                # First, check carefully for small time gaps by decreasing the
                # date by six hours. If that doesn't result in any new posts,
                # also dedicate 150 retries to reductions of one week, just to
                # be sure there's no data from years earlier missing.

                if date_retries < 96:
                    max_date -= 21600  # Decrease by six hours
                    max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S")
                    self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing the date by six hours to %s to make sure this is really it (retry %s/96)" % (str(len(all_posts)), tag, max_date_str, str(date_retries),))
                elif date_retries <= self.max_date_retries:
                    max_date -= 604800  # Decrease by one week
                    max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S")
                    retry_str = str(date_retries - 96)
                    self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing by six hours found nothing, decreasing by a week to %s instead (retry %s/150)" % (str(len(all_posts)), tag, max_date_str, retry_str,))

                # We can stop when the max date drops below the min date.
                if min_date:
                    if max_date <= min_date:
                        break

                continue

            # Append posts to the main list
            else:

                posts = self.parse_tumblr_posts(posts)

                # Get all timestamps and sort them.
                post_dates = sorted([post["timestamp"] for post in posts])

                # Get the lowest date and use it as the next "before" parameter.
                max_date = post_dates[0]

                # Tumblr's API is volatile - it doesn't neatly sort posts by date,
                # so it can happen that there are suddenly huge jumps in time.
                # Check if this is happening by extracting the differences between all consecutive dates.
                time_difs = list()
                post_dates.reverse()

                for i, date in enumerate(post_dates):

                    if i == (len(post_dates) - 1):
                        break

                    # Calculate and add time differences
                    time_dif = date - post_dates[i + 1]

                    # After having collected 250 posts, check whether the time
                    # difference between posts far exceeds the average time difference
                    # between posts. If it's more than five times this amount,
                    # restart the query with the timestamp just before the gap, minus the
                    # average time difference up to this point - something might be up with Tumblr's API.
                    if len(all_posts) >= 250 and time_dif > (avg_time_dif * 5):

                        time_str = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M:%S")
                        self.dataset.update_status("Time difference of %s spotted, restarting query at %s" % (str(time_dif), time_str,))

                        self.seen_ids.update([post["id"] for post in posts])
                        posts = [post for post in posts if post["timestamp"] >= date]
                        if posts:
                            all_posts += posts

                        max_date = date
                        break

                    time_difs.append(time_dif)

                # To start a new query
                if not posts:
                    break

                # Manually check if we already have a lower date than the lowest allowed date (min date).
                # This functionality is not natively supported by Tumblr.
                if min_date:
                    if max_date < min_date:

                        # Get rid of all posts that fall outside the requested date range
                        posts = [post for post in posts if min_date <= post["timestamp"] <= max_date_original]

                        if posts:
                            all_posts += posts
                            self.seen_ids.update([post["id"] for post in posts])
                        break

                # We got new posts, so we can reset the retry counts.
                date_retries = 0
                retries = 0

                # Add the retrieved posts to the main list
                all_posts += posts

                # Add to seen ids
                self.seen_ids.update([post["id"] for post in posts])

                # Add time differences and calculate the new average time difference
                all_time_difs += time_difs

                # Make the average time difference a moving average,
                # to be flexible with faster and slower post paces.
                # Delete the first 100 or so items every hundred new posts.
                if (len(all_time_difs) - time_difs_len) > 100:
                    all_time_difs = all_time_difs[time_difs_len:]
                if all_time_difs:
                    time_difs_len = len(all_time_difs)
                    avg_time_dif = sum(all_time_difs) / len(all_time_difs)

            if len(all_posts) >= self.max_posts:
                self.max_posts_reached = True
                break

            self.dataset.update_status("Collected %s posts for tag %s, now looking for posts before %s" % (str(len(all_posts)), tag, datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S"),))

        return all_posts
Get Tumblr posts with a certain tag.

Parameters
- tag, str: the tag you want to look for
- min_date: a unix timestamp; only fetch posts after this date.
- max_date: a unix timestamp; only fetch posts before this date.

Returns
a list of posts, parsed into dictionaries
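To make the retry schedule concrete, here is a small standalone sketch (not part of the class) that generates the sequence of `before` timestamps tried when a tag query keeps returning nothing: 96 six-hour reductions followed by 150 one-week reductions, matching `max_date_retries = 96 + 150`:

def backoff_timestamps(start, hour_steps=96, week_steps=150):
    """Yield the successive `before` values used when a tag search returns no posts."""
    before = start
    for _ in range(hour_steps):
        before -= 6 * 3600          # six hours
        yield before
    for _ in range(week_steps):
        before -= 7 * 24 * 3600     # one week
        yield before

steps = list(backoff_timestamps(1609459200))
print(len(steps))                        # 246 retries in total
print((1609459200 - steps[-1]) / 86400)  # reaches back ~1074 days (24 days + 150 weeks)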
    def get_posts_by_blog(self, blog, max_date=None, min_date=None):
        """
        Get Tumblr posts from a certain blog.
        :param blog, str: the name of the blog you want to look for
        :param min_date: a unix timestamp; only fetch posts after this date.
        :param max_date: a unix timestamp; only fetch posts before this date.

        :returns: a tuple of two lists: the posts, parsed into dictionaries, and their notes
        """
        blog = blog + ".tumblr.com"

        if not max_date:
            max_date = int(time.time())

        # Store all posts in here
        all_posts = []

        # Store notes here, if they exist and are requested
        all_notes = []

        # Some retries to make sure the Tumblr API actually returns everything
        retries = 0
        self.max_retries = 48  # 48 one-hour date reductions, i.e. two days

        # Get Tumblr posts until there are no more left.
        while True:
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching blog posts from Tumblr")

            # Stop after the maximum amount of retries
            if retries >= self.max_retries:
                self.dataset.update_status("No more posts")
                break

            try:
                # Use the pytumblr library to make the API call
                posts = self.client.posts(blog, before=max_date, limit=20, reblog_info=True, notes_info=True, filter="raw")
                posts = posts["posts"]
            except Exception:
                self.dataset.update_status("Reached the limit of the Tumblr API. Last timestamp: %s" % str(max_date))
                self.api_limit_reached = True
                break

            # Make sure the Tumblr API doesn't magically stop at an earlier date
            if not posts or isinstance(posts, str):
                retries += 1
                max_date -= 3600  # Decrease by an hour
                self.dataset.update_status("No posts returned by Tumblr - checking whether this is really all (retry %s/48)" % str(retries))
                continue

            # Append posts to the main list
            else:
                # Keep the notes, if so indicated
                if self.parameters.get("fetch_reblogs"):
                    for post in posts:
                        if "notes" in post:
                            all_notes.append(post["notes"])

                posts = self.parse_tumblr_posts(posts)

                # Get the lowest date and use it as the next "before" parameter
                max_date = sorted([post["timestamp"] for post in posts])[0]

                # Manually check if we already have a lower date than the min date (`min_date`).
                # This functionality is not natively supported by Tumblr.
                if min_date:
                    if max_date < min_date:

                        # Get rid of all posts that are earlier than the min_date timestamp
                        posts = [post for post in posts if post["timestamp"] >= min_date]

                        if posts:
                            all_posts += posts
                        break

                retries = 0

                all_posts += posts

            if len(all_posts) >= self.max_posts:
                self.max_posts_reached = True
                break

            self.dataset.update_status("Collected %s posts" % str(len(all_posts)))

        return all_posts, all_notes
Get Tumblr posts from a certain blog.

Parameters
- blog, str: the name of the blog you want to look for
- min_date: a unix timestamp; only fetch posts after this date.
- max_date: a unix timestamp; only fetch posts before this date.

Returns
a tuple of two lists: the posts, parsed into dictionaries, and their notes
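A minimal sketch of the cursoring strategy used above, assuming an authenticated `pytumblr.TumblrRestClient` named `client`: fetch 20 posts at a time and move the `before` parameter to the oldest timestamp in each batch.

import time

def iterate_blog_posts(client, blog, before=None):
    """Yield a blog's posts, moving the `before` cursor backwards in time."""
    before = before or int(time.time())
    while True:
        response = client.posts(blog + ".tumblr.com", before=before, limit=20, filter="raw")
        posts = response.get("posts", [])
        if not posts:
            break
        for post in posts:
            yield post
        # Continue from the oldest post in this batch
        before = min(post["timestamp"] for post in posts)

Note that this sketch stops at the first empty page; as the method above shows, a production version also needs retry logic, because the API sometimes returns an empty page even when older posts exist.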
    def get_post_notes(self, di_blogs_ids, only_text_reblogs=True):
        """
        Gets the post notes.
        :param di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values.
        :param only_text_reblogs, bool: Whether to only keep notes that are text reblogs.

        returns a list of text reblogs, as dicts of {blog_name: post_id}.
        """
        # List of dicts to get reblogs. Items are: [{"blog_name": post_id}]
        text_reblogs = []

        max_date = None

        # Do some counting
        len_blogs = len(di_blogs_ids)
        count = 0

        # Stop trying to fetch the notes after this many retries
        max_notes_retries = 10
        notes_retries = 0

        for key, value in di_blogs_ids.items():

            count += 1

            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while fetching post notes from Tumblr")

            # First, get the blog names and post_ids from reblogs.
            # Keep digging till there's nothing left, or till we can fetch no new notes.
            while True:

                # Request a post's notes
                notes = self.client.notes(key, id=value, before_timestamp=max_date)

                if only_text_reblogs:

                    if "notes" in notes:
                        notes_retries = 0

                        for note in notes["notes"]:
                            # If it's a reblog, extract the data and save the rest of the posts for later
                            if note["type"] == "reblog":
                                if note.get("added_text"):
                                    text_reblogs.append({note["blog_name"]: note["post_id"]})

                        if notes.get("_links"):
                            max_date = notes["_links"]["next"]["query_params"]["before_timestamp"]

                        # If there's no `_links` key, that's all.
                        else:
                            break

                    # If there's no "notes" key in the returned dict, something might be up
                    else:
                        self.dataset.update_status("Couldn't get notes for Tumblr request " + str(notes))
                        notes_retries += 1

                if notes_retries > max_notes_retries:
                    self.failed_notes.append(key)
                    break

            self.dataset.update_status("Identified %i text reblogs in %i/%i notes" % (len(text_reblogs), count, len_blogs))

        return text_reblogs
Gets the post notes.

Parameters
- di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values.
- only_text_reblogs, bool: Whether to only keep notes that are text reblogs.

Returns
a list of text reblogs, as dicts of {blog_name: post_id}
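The notes endpoint paginates through a `_links` object rather than a plain offset; a sketch of walking that cursor, again assuming an authenticated `client`:

def iterate_notes(client, blog_name, post_id):
    """Yield all notes on a post by following Tumblr's `_links` pagination."""
    before_timestamp = None
    while True:
        response = client.notes(blog_name, id=post_id, before_timestamp=before_timestamp)
        if "notes" not in response:
            break  # API error or empty response
        for note in response["notes"]:
            yield note
        links = response.get("_links")
        if not links:
            break  # no next page; we've seen all notes
        before_timestamp = links["next"]["query_params"]["before_timestamp"]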
    def get_post_by_id(self, blog_name, post_id):
        """
        Fetch an individual post.
        :param blog_name, str: The blog's name
        :param post_id, int: The post ID

        returns the post's information as a dict, or None if the API errored.
        """
        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching post from Tumblr")

        # Request the specific post.
        post = self.client.posts(blog_name, id=post_id)

        # The Tumblr API can sometimes return this kind of error:
        # {'meta': {'status': 500, 'msg': 'Server Error'}, 'response': {'error': 'Malformed JSON or HTML was returned.'}}
        if "posts" not in post:
            return None

        # Get the first element of the list - it's always one post.
        result = post["posts"][0]

        return result
Fetch an individual post.

Parameters
- blog_name, str: The blog's name
- post_id, int: The post ID

Returns
the post's information as a dictionary, or None if the API errored
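Usage is straightforward. Given a SearchTumblr instance `search` with a connected client (the blog name and post ID below are made up):

post = search.get_post_by_id("staff", 186964640194)  # hypothetical blog/ID
if post is None:
    print("The API returned an error for this post")
else:
    print(post["post_url"])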
    @staticmethod
    def get_tumbler_keys(user):
        """
        Get the Tumblr API credentials from the 4CAT configuration.
        Raises a ConfigException if they are not all set.
        """
        config_keys = [
            config.get("api.tumblr.consumer_key", user=user),
            config.get("api.tumblr.consumer_secret", user=user),
            config.get("api.tumblr.key", user=user),
            config.get("api.tumblr.secret_key", user=user)]
        if not all(config_keys):
            raise ConfigException("Not all Tumblr API credentials are configured. Cannot query Tumblr API.")
        return config_keys
    def connect_to_tumblr(self):
        """
        Returns a connection to the Tumblr API using the pytumblr library.
        Raises a ConnectionRefusedError if the API reports a rate limit (status 429).
        """
        # Keys the user entered in the query form
        config_keys = [self.parameters.get("consumer_key"),
                       self.parameters.get("consumer_secret"),
                       self.parameters.get("key"),
                       self.parameters.get("secret_key")]
        if not all(config_keys):
            # No user-provided keys; attempt to use the 4CAT config keys
            config_keys = self.get_tumbler_keys(self.owner)

        self.client = pytumblr.TumblrRestClient(*config_keys)

        client_info = self.client.info()

        # Check if there are any errors
        if client_info.get("meta"):
            if client_info["meta"].get("status") == 429:
                raise ConnectionRefusedError("Tumblr API rate limit reached")

        return self.client
Returns a connection to the Tumblr API using the pytumblr library.
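For reference, the pytumblr handshake this method wraps, with placeholder keys; the 429 check mirrors the rate-limit detection above:

import pytumblr

client = pytumblr.TumblrRestClient(
    "OAUTH_CONSUMER_KEY",      # placeholder values - use your own app's keys
    "OAUTH_CONSUMER_SECRET",
    "USER_TOKEN_KEY",
    "USER_TOKEN_SECRET",
)

client_info = client.info()
if client_info.get("meta", {}).get("status") == 429:
    raise ConnectionRefusedError("Tumblr API rate limit reached")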
    @staticmethod
    def validate_query(query, request, user):
        """
        Validate custom data input

        Confirms that the query parameters are valid and, if so, returns a
        sanitised version of them.

        :param dict query: Query parameters, from client-side.
        :param request: Flask request
        :param User user: User object of user who has submitted the query
        :return dict: Safe query parameters
        """
        # no query 4 u
        if not query.get("query", "").strip():
            raise QueryParametersException("You must provide a search query.")

        # reformat queries to be a comma-separated list
        items = query.get("query").replace("#", "")
        items = [item.strip() for item in items.split("\n") if item.strip()]

        # Not more than 10 plox
        if len(items) > 10:
            raise QueryParametersException("Only query for ten or fewer tags or blogs (you entered %i)." % len(items))

        # no query 4 u
        if not items:
            raise QueryParametersException("Search query cannot be empty.")

        # So it shows nicely in the frontend.
        items = ", ".join(items)

        # the dates need to make sense as a range to search within
        query["min_date"], query["max_date"] = query.get("daterange")
        if any(query.get("daterange")) and not all(query.get("daterange")):
            raise QueryParametersException("When providing a date range, set both an upper and lower limit.")

        del query["daterange"]

        query["query"] = items
        query["board"] = query.get("search_scope") + "s"  # used in web interface

        # if we made it this far, the query can be executed
        return query
Validate custom data input
Confirms that the query parameters are valid and, if so, returns a sanitised version of them.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
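To illustrate, a hypothetical client-side input and the sanitised parameters it would return:

query = {
    "query": "#van gogh\nvincent",
    "search_scope": "tag",
    "fetch_reblogs": False,
    "daterange": (1577836800, 1609459200),
}

# After validate_query(), the safe parameters look like:
# {
#     "query": "van gogh, vincent",
#     "search_scope": "tag",
#     "fetch_reblogs": False,
#     "min_date": 1577836800,
#     "max_date": 1609459200,
#     "board": "tags",
# }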
    def parse_tumblr_posts(self, posts, reblog=False):
        """
        Function to parse Tumblr posts into uniform dict items.
        Tumblr posts can be of many different types, so some data processing is necessary.

        :param posts, list: List of Tumblr posts as returned from the Tumblr API.
        :param reblog, bool: Whether the posts concern reblogs of posts from the original dataset.

        returns list processed_posts, a list with dictionary items of post info.
        """

        # Store processed posts here
        processed_posts = []

        media_tags = ["photo", "video", "audio"]

        # Loop through all the posts and write a row for each of them.
        for post in posts:
            post_type = post["type"]

            # The post's text is in different keys depending on the post type
            if post_type in media_tags:
                text = post["caption"]
            elif post_type == "link":
                text = post["description"]
            elif post_type == "text" or post_type == "chat":
                text = post["body"]
            elif post_type == "answer":
                text = post["question"] + "\n" + post["answer"]
            else:
                text = ""

            # Different options for video types (YouTube- or Tumblr-hosted)
            if post_type == "video":

                video_source = post["video_type"]
                # Use `get` since some videos are deleted
                video_url = post.get("permalink_url")

                if video_source == "youtube":
                    # There's no URL if the YouTube video is deleted
                    if video_url:
                        video_id = post["video"]["youtube"]["video_id"]
                    else:
                        video_id = "deleted"
                else:
                    video_id = "unknown"

            else:
                video_source = None
                video_id = None
                video_url = None

            # All the fields to write
            processed_post = {
                # General columns
                "type": post_type,
                "timestamp": post["timestamp"],
                "is_reblog": reblog,

                # Blog columns
                "author": post["blog_name"],
                "subject": post["blog"]["title"],
                "blog_description": post["blog"]["description"],
                "blog_url": post["blog"]["url"],
                "blog_uuid": post["blog"]["uuid"],
                "blog_last_updated": post["blog"]["updated"],

                # Post columns
                "id": post["id"],
                "post_url": post["post_url"],
                "post_slug": post["slug"],
                "thread_id": post["reblog_key"],
                "body": text.replace("\x00", ""),
                "tags": ", ".join(post["tags"]) if post.get("tags") else None,
                "notes": post["note_count"],
                "urls": post.get("link_url"),
                "images": ",".join([photo["original_size"]["url"] for photo in post["photos"]]) if post.get("photos") else None,

                # Optional video columns
                "video_source": video_source if post_type == "video" else None,
                "video_url": video_url if post_type == "video" else None,
                "video_id": video_id if post_type == "video" else None,
                "video_thumb": post.get("thumbnail_url"),  # Can be deleted

                # Optional audio columns
                "audio_type": post.get("audio_type"),
                "audio_url": post.get("audio_source_url"),
                "audio_plays": post.get("plays"),

                # Optional link columns
                "link_author": post.get("link_author"),
                "link_publisher": post.get("publisher"),
                "link_image": post.get("link_image"),

                # Optional answer columns
                "asking_name": post.get("asking_name"),
                "asking_url": post.get("asking_url"),
                "question": post.get("question"),
                "answer": post.get("answer"),

                # Optional chat columns
                "chat": post.get("dialogue")
            }

            # Store the processed post
            processed_posts.append(processed_post)

        return processed_posts
Function to parse Tumblr posts into uniform dict items. Tumblr posts can be of many different types, so some data processing is necessary.
Parameters
- posts, list: List of Tumblr posts as returned from the Tumblr API.
- reblog, bool: Whether the posts concern reblogs of posts from the original dataset.
Returns
a list of dictionary items with post info
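As an illustration, an abbreviated raw API post and the flattened row it maps to; all values are invented:

raw_post = {  # abbreviated, hypothetical API response
    "type": "text",
    "timestamp": 1600000000,
    "blog_name": "exampleblog",
    "blog": {"title": "Example", "description": "", "url": "https://exampleblog.tumblr.com",
             "uuid": "t:abc123", "updated": 1600000001},
    "id": 42, "post_url": "https://exampleblog.tumblr.com/post/42", "slug": "hello",
    "reblog_key": "AbCdEf", "body": "Hello world", "tags": ["example"], "note_count": 3,
}

rows = search.parse_tumblr_posts([raw_post])  # given a SearchTumblr instance `search`
assert rows[0]["body"] == "Hello world"
assert rows[0]["tags"] == "example"
assert rows[0]["is_reblog"] is False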
    def after_process(self):
        """
        Override of the same function in processor.py.
        Used to notify of potential API errors.
        """
        super().after_process()
        self.client = None
        errors = []
        if len(self.failed_notes) > 0:
            errors.append("API error(s) when fetching notes for %s" % ", ".join(self.failed_notes))
        if len(self.failed_reblogs) > 0:
            errors.append("API error(s) when fetching reblogs from %s" % ", ".join(self.failed_reblogs))
        if errors:
            self.dataset.log(";\n ".join(errors))
            self.dataset.update_status("Dataset completed, but failed to capture some notes/reblogs; see log for details.")
Override of the same function in processor.py. Used to notify of potential API errors.
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor