
datasources.tumblr.search_tumblr

Search Tumblr via its API

Can fetch posts from specific blogs or with specific hashtags
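For orientation, below is a minimal sketch (not part of the module itself) of the pytumblr calls this datasource builds on; the credentials are placeholders for keys obtained by registering a Tumblr application:

import pytumblr

# Hypothetical credentials - register an application at
# https://www.tumblr.com/oauth/apps to obtain real ones
client = pytumblr.TumblrRestClient("consumer_key", "consumer_secret", "token_key", "token_secret")

# Up to 20 posts with a given tag, walking back from a unix timestamp
tagged_posts = client.tagged("van gogh", before=1500000000, limit=20, filter="raw")

# Up to 20 posts from a given blog, including reblog and note metadata
blog_posts = client.posts("staff.tumblr.com", before=1500000000, limit=20, reblog_info=True, notes_info=True, filter="raw")

The full source of the module follows.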

"""
Search Tumblr via its API

Can fetch posts from specific blogs or with specific hashtags
"""

import time
import pytumblr
from requests.exceptions import ConnectionError
from datetime import datetime

from common.config_manager import config
from backend.lib.search import Search
from common.lib.helpers import UserInput
from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, ConfigException

__author__ = "Sal Hagen"
__credits__ = ["Sal Hagen", "Tumblr API (api.tumblr.com)"]
__maintainer__ = "Sal Hagen"
__email__ = "4cat@oilab.eu"

class SearchTumblr(Search):
	"""
	Tumblr data filter module.
	"""
	type = "tumblr-search"  # job ID
	category = "Search"  # category
	title = "Search Tumblr"  # title displayed in UI
	description = "Retrieve Tumblr posts by hashtag or blog."  # description displayed in UI
	extension = "csv"  # extension of result file, used internally and in UI
	is_local = False  # Whether this datasource is locally scraped
	is_static = False  # Whether this datasource is still updated

	# not available as a processor for existing datasets
	accepts = [None]

	max_workers = 1
	max_retries = 3  # For API and connection retries.
	max_date_retries = 96 + 150  # For checking dates: 96 retries of -6 hours (24 days), plus 150 extra retries of -1 week (~3 years).
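	# Worked out: 96 * 6 hours = 24 days, then 150 * 1 week = 1050 days (~2.9 years).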
	max_posts = 1000000

	max_posts_reached = False
	api_limit_reached = False

	seen_ids = set()
	client = None
	failed_notes = []
	failed_reblogs = []

	config = {
		# Tumblr API keys to use for data capturing
		'api.tumblr.consumer_key': {
			'type': UserInput.OPTION_TEXT,
			'default': "",
			'help': 'Tumblr Consumer Key',
			'tooltip': "",
		},
		'api.tumblr.consumer_secret': {
			'type': UserInput.OPTION_TEXT,
			'default': "",
			'help': 'Tumblr Consumer Secret Key',
			'tooltip': "",
		},
		'api.tumblr.key': {
			'type': UserInput.OPTION_TEXT,
			'default': "",
			'help': 'Tumblr API Key',
			'tooltip': "",
		},
		'api.tumblr.secret_key': {
			'type': UserInput.OPTION_TEXT,
			'default': "",
			'help': 'Tumblr API Secret Key',
			'tooltip': "",
		},
	}
	references = ["[Tumblr API documentation](https://www.tumblr.com/docs/en/api/v2)"]

	@classmethod
	def get_options(cls, parent_dataset=None, user=None):
		"""
		Check if Tumblr API keys are configured and, if not, request them from the user.
		"""
		options = {
			"intro": {
				"type": UserInput.OPTION_INFO,
				"help": "Retrieve any kind of Tumblr posts with specific tags or from specific blogs. Gets at most one "
						"million posts. Insert tags or names of blogs, one on each line. You may insert up to ten tags "
						"or blogs.\n\nTumblr tags may include whitespace and commas. A `#` before the tag is optional.\n\n"
						"Tag search only gets posts explicitly associated with the exact tag you insert here. Querying "
						"`gogh` will thus not get posts only tagged with `van gogh`. Keyword search is unfortunately not "
						"allowed by the [Tumblr API](https://api.tumblr.com).\n\nIf 4CAT has reached its Tumblr API rate "
						"limit, try again 24 hours later."
			},
			"search_scope": {
				"type": UserInput.OPTION_CHOICE,
				"help": "Search by",
				"options": {
					"tag": "Tag",
					"blog": "Blog"
				},
				"default": "tag"
			},
			"query": {
				"type": UserInput.OPTION_TEXT_LARGE,
				"help": "Tags/blogs",
				"tooltip": "Separate with commas or new lines."
			},
			"fetch_reblogs": {
				"type": UserInput.OPTION_TOGGLE,
				"help": "Also fetch reblogs with text? (warning: slow)",
				"default": False
			}
		}

		try:
			SearchTumblr.get_tumbler_keys(user)
		except ConfigException:
			# No 4CAT-wide keys set; let the user input their own
			options["key-info"] = {
				"type": UserInput.OPTION_INFO,
				"help": "In order to access the Tumblr API, you need to register an application. You can do so "
						"[here](https://www.tumblr.com/oauth/apps) and use the keys below. You will first get the OAuth "
						"Consumer Key and Secret, and then the User Token Key and Secret "
						"[after entering them here](https://api.tumblr.com/console/calls/user/info) and granting access."
			}
			options["consumer_key"] = {
				"type": UserInput.OPTION_TEXT,
				"sensitive": True,
				"cache": True,
				"help": "OAuth Consumer Key"
			}
			options["consumer_secret"] = {
				"type": UserInput.OPTION_TEXT,
				"sensitive": True,
				"cache": True,
				"help": "OAuth Consumer Secret"
			}
			options["key"] = {
				"type": UserInput.OPTION_TEXT,
				"sensitive": True,
				"cache": True,
				"help": "User Token Key"
			}
			options["secret_key"] = {
				"type": UserInput.OPTION_TEXT,
				"sensitive": True,
				"cache": True,
				"help": "User Token Secret"
			}

		options["divider"] = {
			"type": UserInput.OPTION_DIVIDER
		}
		options["date-intro"] = {
			"type": UserInput.OPTION_INFO,
			"help": "**Note:** The [Tumblr API](https://api.tumblr.com) is volatile: when fetching sporadically used "
					"tags, it may return zero posts, even though older posts exist. To mitigate this, 4CAT decreases "
					"the date parameter (<code>before</code>) by six hours and sends the query again. This often "
					"successfully returns older, un-fetched posts. If no new data is found after 96 of these retries "
					"(24 days), it decreases the date 150 more times by one week, checking for posts up to roughly "
					"three years before the last date. If that also results in nothing, it assumes the dataset is "
					"complete. Check the oldest post in your dataset to see if this is indeed the case and whether "
					"any odd time gaps exist."
		}
		options["daterange"] = {
			"type": UserInput.OPTION_DATERANGE,
			"help": "Date range"
		}

		return options

	def get_items(self, query):
		"""
		Fetches data from Tumblr via its API.
		"""

		# ready our parameters
		parameters = self.dataset.get_parameters()
		scope = parameters.get("search_scope", "")
		queries = parameters.get("query").split(", ")
		fetch_reblogs = parameters.get("fetch_reblogs", False)

		# Store all info here
		results = []

		# Store all notes from posts by blogs here
		all_notes = []

		# Get date parameters
		min_date = parameters.get("min_date", None)
		max_date = parameters.get("max_date", None)

		if min_date:
			min_date = int(min_date)
		if max_date:
			max_date = int(max_date)
		else:
			max_date = int(time.time())

		# Connect to Tumblr API
		try:
			self.client = self.connect_to_tumblr()
		except ConfigException:
			self.log.warning("Could not connect to Tumblr API: API keys invalid or not set")
			self.dataset.finish_with_error("Could not connect to Tumblr API: API keys invalid or not set")
			return
		except ConnectionRefusedError as e:
			client_info = self.client.info()
			self.log.warning(f"Could not connect to Tumblr API: {e}; client_info: {client_info}")
			self.dataset.finish_with_error(f"Could not connect to Tumblr API: {client_info.get('meta', {}).get('status', '')} - {client_info.get('meta', {}).get('msg', '')}")
			return

		# For each tag or blog, get posts
		for query in queries:

			# Get posts per tag
			if scope == "tag":
				new_results = self.get_posts_by_tag(query, max_date=max_date, min_date=min_date)

			# Get posts per blog
			elif scope == "blog":
				new_results, notes = self.get_posts_by_blog(query, max_date=max_date, min_date=min_date)
				all_notes.append(notes)
			else:
				self.dataset.update_status("Invalid scope")
				break

			results += new_results

			if self.max_posts_reached:
				self.dataset.update_status("Max posts exceeded")
				break
			if self.api_limit_reached:
				self.dataset.update_status("API limit reached")
				break

		# If we also want the posts that reblogged the fetched posts:
		if fetch_reblogs and not self.max_posts_reached and not self.api_limit_reached:
			self.dataset.update_status("Getting notes from all posts")

			# Posts that reblogged with added text; filled per scope below
			text_reblogs = []

			# Reblog information is already returned for blog-level searches
			if scope == "blog":

				# Loop through and add the text reblogs that came with the results.
				for post_notes in all_notes:
					for post_note in post_notes:
						for note in post_note:
							if note["type"] == "reblog":
								text_reblogs.append({note["blog_name"]: note["post_id"]})

			# Retrieving notes for tag-based posts should be done one-by-one.
			# Fetching them all at once is not supported by the Tumblr API.
			elif scope == "tag":
				# Prepare a dict to pass to `get_post_notes`
				posts_to_fetch = {result["author"]: result["id"] for result in results}
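				# e.g. {"someblog": "190123456789", ...} (hypothetical values),
				# mapping each collected author to one of their post IDs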

				# First extract the notes of each post, and only keep text reblogs
				text_reblogs = self.get_post_notes(posts_to_fetch)

			# Get the full data for text reblogs.
			if text_reblogs:
				connection_retries = 0
				for i, text_reblog in enumerate(text_reblogs):
					self.dataset.update_status("Got %i/%i text reblogs" % (i, len(text_reblogs)))
					if connection_retries >= 5:
						self.dataset.update_status("Multiple connection refused errors; unable to continue collection of reblogs.")
						break
					for key, value in text_reblog.items():
						if connection_retries >= 5:
							break
						try:
							reblog_post = self.get_post_by_id(key, value)
						except ConnectionRefusedError:
							connection_retries += 1
							self.failed_reblogs.append(key)
							self.dataset.update_status(f"ConnectionRefused: Unable to collect reblogs for post {key}")
							continue
						if reblog_post:
							reblog_post = self.parse_tumblr_posts([reblog_post], reblog=True)
							results.append(reblog_post[0])

		self.job.finish()
		return results

	def get_posts_by_tag(self, tag, max_date=None, min_date=None):
		"""
		Get Tumblr posts with a certain tag.
		:param tag, str: the tag you want to look for
		:param min_date: a unix timestamp; posts should be posted after this date.
		:param max_date: a unix timestamp; posts should be posted before this date.

		:returns: a list of posts parsed from the JSON response
		"""
		# Store all posts in here
		all_posts = []

		# Some retries to make sure the Tumblr API actually returns everything.
		retries = 0
		date_retries = 0

		# We're gonna change max_date, so store a copy for reference.
		max_date_original = max_date

		# We use the average time difference between posts to spot possible gaps in the data.
		all_time_difs = []
		avg_time_dif = 0
		time_difs_len = 0

		# Get Tumblr posts until there's no more left.
		while True:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching tag posts from Tumblr")

			# Stop after the max amount of date reductions
			if date_retries >= self.max_date_retries:
				self.dataset.update_status("No more posts in this date range")
				break

			# Stop after max retries for API/connection stuff
			if retries >= self.max_retries:
				self.dataset.update_status("No more posts")
				break

			try:
				# Use the pytumblr library to make the API call
				posts = self.client.tagged(tag, before=max_date, limit=20, filter="raw")
			except ConnectionError:
				self.dataset.update_status("Encountered a connection error, waiting 10 seconds.")
				time.sleep(10)
				retries += 1
				continue

			# Get rid of posts that we already encountered,
			# preventing Tumblr API shenanigans or double posts because of
			# time reductions. Make sure it's no odd error string, though.
			unseen_posts = []
			for check_post in posts:
				# Sometimes the API responds just with "meta", "response", or "errors".
				if isinstance(check_post, str):
					self.dataset.update_status("Couldn't add post: %s" % check_post)
					retries += 1
					break
				else:
					retries = 0
					if check_post["id"] not in self.seen_ids:
						unseen_posts.append(check_post)
			posts = unseen_posts

			# For no clear reason, the Tumblr API sometimes provides posts with a higher timestamp than requested.
			# So we have to prevent this manually.
			if max_date_original:
				posts = [post for post in posts if post["timestamp"] <= max_date_original]

			max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S")

			# Make sure the Tumblr API doesn't magically stop at an earlier date
			if not posts:

				date_retries += 1

				# We're first gonna check carefully if there's small time gaps by
				# decreasing by six hours.
				# If that didn't result in any new posts, also dedicate 150 date_retries
				# with reductions of one week, just to be sure there's no data from
				# years earlier missing.

				if date_retries < 96:
					max_date -= 21600  # Decrease by six hours
					self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing the date parameter by 6 hours to %s to make sure this is really it (retry %s/96)" % (str(len(all_posts)), tag, max_date_str, str(date_retries),))
				elif date_retries <= self.max_date_retries:
					max_date -= 604800  # Decrease by one week
					retry_str = str(date_retries - 96)
					self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - nothing found when decreasing by 6 hours, decreasing by one week to %s instead (retry %s/150)" % (str(len(all_posts)), tag, max_date_str, retry_str,))

				# We can stop when the max date drops below the min date.
				if min_date:
					if max_date <= min_date:
						break

				continue

			# Append posts to main list
			else:

				posts = self.parse_tumblr_posts(posts)

				# Get all timestamps and sort them.
				post_dates = sorted([post["timestamp"] for post in posts])

				# Get the lowest date and use it as the next "before" parameter.
				max_date = post_dates[0]

				# Tumblr's API is volatile - it doesn't neatly sort posts by date,
				# so it can happen that there's suddenly huge jumps in time.
				# Check if this is happening by extracting the difference between all consecutive dates.
				time_difs = list()
				post_dates.reverse()

				for i, date in enumerate(post_dates):

					if i == (len(post_dates) - 1):
						break

					# Calculate and add time differences
					time_dif = date - post_dates[i + 1]

					# After having collected 250 posts, check whether the time
					# difference between posts far exceeds the average time difference
					# between posts. If it's more than five times this amount,
					# restart the query with the timestamp just before the gap, minus the
					# average time difference up to this point - something might be up with Tumblr's API.
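					# Example: with posts averaging 60 seconds apart, a jump of
					# more than 300 seconds between two consecutive posts counts
					# as a suspicious gap.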
					if len(all_posts) >= 250 and time_dif > (avg_time_dif * 5):

						time_str = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M:%S")
						self.dataset.update_status("Time difference of %s spotted, restarting query at %s" % (str(time_dif), time_str,))

						self.seen_ids.update([post["id"] for post in posts])
						posts = [post for post in posts if post["timestamp"] >= date]
						if posts:
							all_posts += posts

						max_date = date
						break

					time_difs.append(time_dif)

				# To start a new query
				if not posts:
					break

				# Manually check if we already have a lower date than the lowest allowed date (min date).
				# This functionality is not natively supported by Tumblr.
				if min_date:
					if max_date < min_date:

						# Get rid of all the posts that fall outside the min_date/max_date range
						posts = [post for post in posts if post["timestamp"] >= min_date and post["timestamp"] <= max_date_original]

						if posts:
							all_posts += posts
							self.seen_ids.update([post["id"] for post in posts])
						break

				# We got a new post, so we can reset the retry counts.
				date_retries = 0
				retries = 0

				# Add retrieved posts to the main list
				all_posts += posts

				# Add to seen ids
				self.seen_ids.update([post["id"] for post in posts])

				# Add time differences and calculate new average time difference
				all_time_difs += time_difs

				# Make the average time difference a moving average,
				# to be flexible with faster and slower post paces.
				# Drop the oldest time differences every hundred or so items.
				if (len(all_time_difs) - time_difs_len) > 100:
					all_time_difs = all_time_difs[time_difs_len:]
				if all_time_difs:
					time_difs_len = len(all_time_difs)
					avg_time_dif = sum(all_time_difs) / len(all_time_difs)

			if len(all_posts) >= self.max_posts:
				self.max_posts_reached = True
				break

			self.dataset.update_status("Collected %s posts for tag %s, now looking for posts before %s" % (str(len(all_posts)), tag, max_date_str,))

		return all_posts

	def get_posts_by_blog(self, blog, max_date=None, min_date=None):
		"""
		Get Tumblr posts from a certain blog.
		:param blog, str: the name of the blog you want to look for
		:param min_date: a unix timestamp; posts should be posted after this date.
		:param max_date: a unix timestamp; posts should be posted before this date.

		:returns: a tuple of posts and notes parsed from the JSON response
		"""
		blog = blog + ".tumblr.com"

		if not max_date:
			max_date = int(time.time())

		# Store all posts in here
		all_posts = []

		# Store notes here, if they exist and are requested
		all_notes = []

		# Some retries to make sure the Tumblr API actually returns everything
		retries = 0
		self.max_retries = 48  # 2 days of hourly reductions

		# Get Tumblr posts until there's no more left.
		while True:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching blog posts from Tumblr")

			# Stop after the max amount of retries
			if retries >= self.max_retries:
				self.dataset.update_status("No more posts")
				break

			try:
				# Use the pytumblr library to make the API call
				posts = self.client.posts(blog, before=max_date, limit=20, reblog_info=True, notes_info=True, filter="raw")
				posts = posts["posts"]

			except Exception:

				self.dataset.update_status("Reached the limit of the Tumblr API. Last timestamp: %s" % str(max_date))
				self.api_limit_reached = True
				break

			# Make sure the Tumblr API doesn't magically stop at an earlier date
			if not posts or isinstance(posts, str):
				retries += 1
				max_date -= 3600  # Decrease by an hour
				self.dataset.update_status("No posts returned by Tumblr - checking whether this is really all (retry %s/48)" % str(retries))
				continue

			# Append posts to main list
			else:
				# Keep the notes, if so indicated
				if self.parameters.get("fetch_reblogs"):
					for post in posts:
						if "notes" in post:
							all_notes.append(post["notes"])

				posts = self.parse_tumblr_posts(posts)

				# Get the lowest date
				max_date = sorted([post["timestamp"] for post in posts])[0]

				# Manually check if we already have a lower date than the min date (`min_date`).
				# This functionality is not natively supported by Tumblr.
				if min_date:
					if max_date < min_date:

						# Get rid of all the posts that are earlier than the min_date timestamp
						posts = [post for post in posts if post["timestamp"] >= min_date]

						if posts:
							all_posts += posts
						break

				retries = 0

				all_posts += posts

			if len(all_posts) >= self.max_posts:
				self.max_posts_reached = True
				break

			self.dataset.update_status("Collected %s posts" % str(len(all_posts)))

		return all_posts, all_notes

	def get_post_notes(self, di_blogs_ids, only_text_reblogs=True):
		"""
		Gets the post notes.
		:param di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values.
		:param only_text_reblogs, bool: Whether to only keep notes that are text reblogs.
		"""
		# List of dicts to get reblogs. Items are: [{"blog_name": post_id}]
		text_reblogs = []

		max_date = None

		# Do some counting
		len_blogs = len(di_blogs_ids)
		count = 0

		# Stop trying to fetch the notes after this many retries
		max_notes_retries = 10
		notes_retries = 0

		for key, value in di_blogs_ids.items():

			count += 1

			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching post notes from Tumblr")

			# First, get the blog names and post_ids from reblogs.
			# Keep digging till there's nothing left, or until we can fetch no new notes.
			while True:

				# Request a post's notes
				notes = self.client.notes(key, id=value, before_timestamp=max_date)

				if only_text_reblogs:

					if "notes" in notes:
						notes_retries = 0

						for note in notes["notes"]:
							# If it's a reblog, extract the data and save the rest of the posts for later
							if note["type"] == "reblog":
								if note.get("added_text"):
									text_reblogs.append({note["blog_name"]: note["post_id"]})

						if notes.get("_links"):
							max_date = notes["_links"]["next"]["query_params"]["before_timestamp"]
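							# (`_links.next.query_params.before_timestamp` is Tumblr's
							# pagination pointer: feeding it back as `before_timestamp`
							# resumes where this page of notes ended.)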

						# If there's no `_links` key, that's all.
						else:
							break

					# If there's no "notes" key in the returned dict, something might be up
					else:
						self.dataset.update_status("Couldn't get notes for Tumblr request " + str(notes))
						notes_retries += 1

					if notes_retries > max_notes_retries:
						self.failed_notes.append(key)
						break

			self.dataset.update_status("Identified %i text reblogs in %i/%i notes" % (len(text_reblogs), count, len_blogs))

		return text_reblogs

	def get_post_by_id(self, blog_name, post_id):
		"""
		Fetch an individual post.
		:param blog_name, str: The blog's name
		:param post_id, int: The post ID

		:returns: dict, the post's information, or None on API errors
		"""
		if self.interrupted:
			raise ProcessorInterruptedException("Interrupted while fetching post from Tumblr")

		# Request the specific post.
		post = self.client.posts(blog_name, id=post_id)

		# The Tumblr API can sometimes return this kind of error:
		# {'meta': {'status': 500, 'msg': 'Server Error'}, 'response': {'error': 'Malformed JSON or HTML was returned.'}}
		if "posts" not in post:
			return None

		# Get the first element of the list - it's always one post.
		result = post["posts"][0]

		return result

	@staticmethod
	def get_tumbler_keys(user):
		config_keys = [
			config.get("api.tumblr.consumer_key", user=user),
			config.get("api.tumblr.consumer_secret", user=user),
			config.get("api.tumblr.key", user=user),
			config.get("api.tumblr.secret_key", user=user)]
		if not all(config_keys):
			raise ConfigException("Not all Tumblr API credentials are configured. Cannot query Tumblr API.")
		return config_keys

	def connect_to_tumblr(self):
		"""
		Returns a connection to the Tumblr API using the pytumblr library.
		"""
		# User input keys
		config_keys = [self.parameters.get("consumer_key"),
			self.parameters.get("consumer_secret"),
			self.parameters.get("key"),
			self.parameters.get("secret_key")]
		if not all(config_keys):
			# No user input keys; attempt to use 4CAT config keys
			config_keys = self.get_tumbler_keys(self.owner)

		self.client = pytumblr.TumblrRestClient(*config_keys)

		client_info = self.client.info()

		# Check if there are any errors
		if client_info.get("meta"):
			if client_info["meta"].get("status") == 429:
				raise ConnectionRefusedError("Tumblr API rate limit reached (429)")

		return self.client

	def validate_query(query, request, user):
		"""
		Validate custom data input

		Confirms that the user-submitted query parameters are valid and, if so,
		returns a sanitised version of them.

		:param dict query:  Query parameters, from client-side.
		:param request:  Flask request
		:param User user:  User object of user who has submitted the query
		:return dict:  Safe query parameters
		"""
		# no query 4 u
		if not query.get("query", "").strip():
			raise QueryParametersException("You must provide a search query.")

		# reformat queries to be a comma-separated list
		items = query.get("query").replace("#", "")
		items = items.split("\n")

		# Not more than 10 plox
		if len(items) > 10:
			raise QueryParametersException("You can query at most ten tags or blogs (you entered %i)." % len(items))

		# no query 4 u
		if not items:
			raise QueryParametersException("Search query cannot be empty.")

		# So it shows nicely in the frontend.
		items = ", ".join([item.strip() for item in items if item])

		# the dates need to make sense as a range to search within
		query["min_date"], query["max_date"] = query.get("daterange")
		if any(query.get("daterange")) and not all(query.get("daterange")):
			raise QueryParametersException("When providing a date range, set both an upper and lower limit.")

		del query["daterange"]

		query["query"] = items
		query["board"] = query.get("search_scope") + "s"  # used in web interface

		# if we made it this far, the query can be executed
		return query

	def parse_tumblr_posts(self, posts, reblog=False):
		"""
		Function to parse Tumblr posts into the same dict items.
		Tumblr posts can be of many different types, so some data processing is necessary.

		:param posts, list: List of Tumblr posts as returned from the Tumblr API.
		:param reblog, bool: Whether the posts concern reblogs of posts from the original dataset.

		:returns: list, a list with dictionary items of post info.
		"""

		# Store processed posts here
		processed_posts = []

		media_tags = ["photo", "video", "audio"]

		# Loop through all the posts and write a row for each of them.
		for post in posts:
			post_type = post["type"]

			# The post's text is in different keys depending on the post type
			if post_type in media_tags:
				text = post["caption"]
			elif post_type == "link":
				text = post["description"]
			elif post_type == "text" or post_type == "chat":
				text = post["body"]
			elif post_type == "answer":
				text = post["question"] + "\n" + post["answer"]
			else:
				text = ""

			# Different options for video types (YouTube- or Tumblr-hosted)
			if post_type == "video":

				video_source = post["video_type"]
				# Use `get` since some videos are deleted
				video_url = post.get("permalink_url")

				if video_source == "youtube":
					# There's no URL if the YouTube video is deleted
					if video_url:
						video_id = post["video"]["youtube"]["video_id"]
					else:
						video_id = "deleted"
				else:
					video_id = "unknown"

			else:
				video_source = None
				video_id = None
				video_url = None

			# All the fields to write
			processed_post = {
				# General columns
				"type": post_type,
				"timestamp": post["timestamp"],
				"is_reblog": reblog,

				# Blog columns
				"author": post["blog_name"],
				"subject": post["blog"]["title"],
				"blog_description": post["blog"]["description"],
				"blog_url": post["blog"]["url"],
				"blog_uuid": post["blog"]["uuid"],
				"blog_last_updated": post["blog"]["updated"],

				# Post columns
				"id": post["id"],
				"post_url": post["post_url"],
				"post_slug": post["slug"],
				"thread_id": post["reblog_key"],
				"body": text.replace("\x00", ""),
				"tags": ", ".join(post["tags"]) if post.get("tags") else None,
				"notes": post["note_count"],
				"urls": post.get("link_url"),
				"images": ",".join([photo["original_size"]["url"] for photo in post["photos"]]) if post.get("photos") else None,

				# Optional video columns
				"video_source": video_source if post_type == "video" else None,
				"video_url": video_url if post_type == "video" else None,
				"video_id": video_id if post_type == "video" else None,
				"video_thumb": post.get("thumbnail_url"),  # Can be deleted

				# Optional audio columns
				"audio_type": post.get("audio_type"),
				"audio_url": post.get("audio_source_url"),
				"audio_plays": post.get("plays"),

				# Optional link columns
				"link_author": post.get("link_author"),
				"link_publisher": post.get("publisher"),
				"link_image": post.get("link_image"),

				# Optional answers columns
				"asking_name": post.get("asking_name"),
				"asking_url": post.get("asking_url"),
				"question": post.get("question"),
				"answer": post.get("answer"),

				# Optional chat columns
				"chat": post.get("dialogue")
			}

			# Store the processed post
			processed_posts.append(processed_post)

		return processed_posts

	def after_process(self):
		"""
		Override of the same function in processor.py.
		Used to notify of potential API errors.
		"""
		super().after_process()
		self.client = None
		errors = []
		if len(self.failed_notes) > 0:
			errors.append("API error(s) when fetching notes %s" % ", ".join(self.failed_notes))
		if len(self.failed_reblogs) > 0:
			errors.append("API error(s) when fetching reblogs %s" % ", ".join(self.failed_reblogs))
		if errors:
			self.dataset.log(";\n ".join(errors))
			self.dataset.update_status("Dataset completed but failed to capture some notes/reblogs; see log for details.")
class SearchTumblr(backend.lib.search.Search):
 23class SearchTumblr(Search):
 24	"""
 25	Tumblr data filter module.
 26	"""
 27	type = "tumblr-search"  # job ID
 28	category = "Search"  # category
 29	title = "Search Tumblr"  # title displayed in UI
 30	description = "Retrieve Tumblr posts by hashtag or blog."  # description displayed in UI
 31	extension = "csv"  # extension of result file, used internally and in UI
 32	is_local = False	# Whether this datasource is locally scraped
 33	is_static = False	# Whether this datasource is still updated
 34
 35	# not available as a processor for existing datasets
 36	accepts = [None]
 37
 38	max_workers = 1
 39	max_retries = 3 # For API and connection retries.
 40	max_date_retries = 96 + 150 # For checking dates. 96 time retries of -6 hours (24 days), plus 150 extra for 150 weeks (~3 years).
 41	max_posts = 1000000
 42
 43	max_posts_reached = False
 44	api_limit_reached = False
 45
 46	seen_ids = set()
 47	client = None
 48	failed_notes = []
 49	failed_reblogs = []
 50
 51	config = {
 52		# Tumblr API keys to use for data capturing
 53		'api.tumblr.consumer_key': {
 54			'type': UserInput.OPTION_TEXT,
 55			'default': "",
 56			'help': 'Tumblr Consumer Key',
 57			'tooltip': "",
 58		},
 59		'api.tumblr.consumer_secret': {
 60			'type': UserInput.OPTION_TEXT,
 61			'default': "",
 62			'help': 'Tumblr Consumer Secret Key',
 63			'tooltip': "",
 64		},
 65		'api.tumblr.key': {
 66			'type': UserInput.OPTION_TEXT,
 67			'default': "",
 68			'help': 'Tumblr API Key',
 69			'tooltip': "",
 70		},
 71		'api.tumblr.secret_key': {
 72			'type': UserInput.OPTION_TEXT,
 73			'default': "",
 74			'help': 'Tumblr API Secret Key',
 75			'tooltip': "",
 76		},
 77	}
 78	references = ["[Tumblr API documentation](https://www.tumblr.com/docs/en/api/v2)"]
 79
 80	@classmethod
 81	def get_options(cls, parent_dataset=None, user=None):
 82		"""
 83		Check is Tumbler keys configured and if not, requests from User
 84		"""
 85		options = {
 86			"intro": {
 87				"type": UserInput.OPTION_INFO,
 88				"help": "Retrieve any kind of Tumblr posts with specific tags or from specific blogs. Gets 100.000 posts "
 89						"at max. Insert tags or names of blogs, one on each line. You may insert up to ten tags or "
 90						"blogs.\n\nTumblr tags may include whitespace and commas. A `#` before the tag is optional.\n\n"
 91						"Tag search only get posts explicitly associated with the exact tag you insert here. Querying "
 92						"`gogh` will thus not get posts only tagged with `van gogh`. Keyword search is unfortunately not "
 93						"allowed by the [Tumblr API](https://api.tumblr.com).\n\nIf 4CAT reached its Tumblr API rate "
 94						"limit, try again 24 hours later."
 95			},
 96			"search_scope": {
 97				"type": UserInput.OPTION_CHOICE,
 98				"help": "Search by",
 99				"options": {
100					"tag": "Tag",
101					"blog": "Blog"
102				},
103				"default": "tag"
104			},
105			"query": {
106				"type": UserInput.OPTION_TEXT_LARGE,
107				"help": "Tags/blogs",
108				"tooltip": "Separate with commas or new lines."
109			},
110			"fetch_reblogs": {
111				"type": UserInput.OPTION_TOGGLE,
112				"help": "Also fetch reblogs with text? (warning: slow)",
113				"default": False
114			}
115		}
116
117		try:
118			config_keys = SearchTumblr.get_tumbler_keys(user)
119		except ConfigException:
120			# No 4CAT set keys for user; let user input their own
121			options["key-info"] = {
122				"type": UserInput.OPTION_INFO,
123				"help": "In order to access the Tumblr API, you need to register an application. You can do so "
124						"[here](https://www.tumblr.com/oauth/apps) and use the keys below. You will first get the OAuth "
125						"Consumer Key and Secret, and then the User Token Key and Secret [after entering them here](ht"
126									  "tps://api.tumblr.com/console/calls/user/info) and granting access."
127			}
128			options["consumer_key"] = {
129				"type": UserInput.OPTION_TEXT,
130				"sensitive": True,
131				"cache": True,
132				"help": "OAuth Consumer Key"
133			}
134			options["consumer_secret"] = {
135				"type": UserInput.OPTION_TEXT,
136				"sensitive": True,
137				"cache": True,
138				"help": "OAuth Consumer Secret"
139			}
140			options["key"] = {
141				"type": UserInput.OPTION_TEXT,
142				"sensitive": True,
143				"cache": True,
144				"help": "User Token Key"
145			}
146			options["secret_key"] = {
147				"type": UserInput.OPTION_TEXT,
148				"sensitive": True,
149				"cache": True,
150				"help": "User Token Secret"
151			}
152
153		options["divider"] = {
154				"type": UserInput.OPTION_DIVIDER
155			}
156		options["date-intro"] = {
157				"type": UserInput.OPTION_INFO,
158				"help": "**Note:** The [Tumblr API](https://api.tumblr.com) is volatile: when fetching sporadically used "
159						"tags, it may return zero posts, even though older posts exist. To mitigate this, 4CAT decreases "
160						"the date parameter (<code>before</code>) with six hours and sends the query again. This often "
161						"successfully returns older, un-fetched posts. If it didn't find new data after 96 retries (24 "
162						"days), it checks for data up to six years before the last date, decreasing 12 times by 6 months. "
163						"If that also results in nothing, it assumes the dataset is complete. Check the oldest post in "
164						"your dataset to see if it this is indeed the case and whether any odd time gaps exists."
165			}
166		options["daterange"] = {
167				"type": UserInput.OPTION_DATERANGE,
168				"help": "Date range"
169			}
170
171		return options
172
173	def get_items(self, query):
174		"""
175		Fetches data from Tumblr via its API.
176
177		"""
178
179		# ready our parameters
180		parameters = self.dataset.get_parameters()
181		scope = parameters.get("search_scope", "")
182		queries = parameters.get("query").split(", ")
183		fetch_reblogs = parameters.get("fetch_reblogs", False)
184
185		# Store all info here
186		results = []
187
188		# Store all notes from posts by blogs here
189		all_notes = []
190
191		# Get date parameters
192		min_date = parameters.get("min_date", None)
193		max_date = parameters.get("max_date", None)
194
195		if min_date:
196			min_date = int(min_date)
197		if max_date:
198			max_date = int(max_date)
199		else:
200			max_date = int(time.time())
201
202		# Connect to Tumblr API
203		try:
204			self.client = self.connect_to_tumblr()
205		except ConfigException as e:
206			self.log.warning(f"Could not connect to Tumblr API: API keys invalid or not set")
207			self.dataset.finish_with_error(f"Could not connect to Tumblr API: API keys invalid or not set")
208			return
209		except ConnectionRefusedError as e:
210			client_info = self.client.info()
211			self.log.warning(f"Could not connect to Tumblr API: {e}; client_info: {client_info}")
212			self.dataset.finish_with_error(f"Could not connect to Tumblr API: {client_info.get('meta', {}).get('status', '')} - {client_info.get('meta', {}).get('msg', '')}")
213			return
214
215		# for each tag or blog, get post
216		for query in queries:
217
218				# Get posts per tag
219				if scope == "tag":
220					new_results = self.get_posts_by_tag(query, max_date=max_date, min_date=min_date)
221
222				# Get posts per blog
223				elif scope == "blog":
224					new_results, notes = self.get_posts_by_blog(query, max_date=max_date, min_date=min_date)
225					all_notes.append(notes)
226				else:
227					self.dataset.update_status("Invalid scope")
228					break
229
230				results += new_results
231
232				if self.max_posts_reached:
233					self.dataset.update_status("Max posts exceeded")
234					break
235				if self.api_limit_reached:
236					self.dataset.update_status("API limit reached")
237					break
238
239		# If we also want the posts that reblogged the fetched posts:
240		if fetch_reblogs and not self.max_posts_reached and not self.api_limit_reached:
241			self.dataset.update_status("Getting notes from all posts")
242
243			# Reblog information is already returned for blog-level searches
244			if scope == "blog":
245				text_reblogs = []
246
247				# Loop through and add the text reblogs that came with the results.
248				for post_notes in all_notes:
249					for post_note in post_notes:
250						for note in post_note:
251							if note["type"] == "reblog":
252								text_reblogs.append({note["blog_name"]: note["post_id"]})
253
254			# Retrieving notes for tag-based posts should be done one-by-one.
255			# Fetching them all at once is not supported by the Tumblr API.
256			elif scope == "tag":
257				# Prepare dicts to pass to `get_post_notes`
258				posts_to_fetch = {result["author"]: result["id"] for result in results}
259
260				# First extract the notes of each post, and only keep text reblogs
261				text_reblogs = self.get_post_notes(posts_to_fetch)
262
263			# Get the full data for text reblogs.
264			if text_reblogs:
265				connection_retries = 0
266				for i, text_reblog in enumerate(text_reblogs):
267					self.dataset.update_status("Got %i/%i text reblogs" % (i, len(text_reblogs)))
268					if connection_retries >= 5:
269						self.dataset.update_status("Multiple connection refused errors; unable to continue collection of reblogs.")
270						break
271					for key, value in text_reblog.items():
272						if connection_retries >= 5:
273							break
274						try:
275							reblog_post = self.get_post_by_id(key, value)
276						except ConnectionRefusedError:
277							connection_retries += 1
278							self.failed_reblogs.append(key)
279							self.dataset.update_status(f"ConnectionRefused: Unable to collect reblogs for post {key}")
280							continue
281						if reblog_post:
282							reblog_post = self.parse_tumblr_posts([reblog_post], reblog=True)
283							results.append(reblog_post[0])
284
285		self.job.finish()
286		return results
287
288	def get_posts_by_tag(self, tag, max_date=None, min_date=None):
289		"""
290		Get Tumblr posts posts with a certain tag
291		:param tag, str: the tag you want to look for
292		:param min_date: a unix timestamp, indicates posts should be min_date this date.
293		:param max_date: a unix timestamp, indicates posts should be max_date this date.
294
295		:returns: a dict created from the JSON response
296		"""
297		# Store all posts in here
298		all_posts = []
299
300		# Some retries to make sure the Tumblr API actually returns everything.
301		retries = 0
302		date_retries = 0
303
304		# We're gonna change max_date, so store a copy for reference.
305		max_date_original = max_date
306
307		# We use the average time difference between posts to spot possible gaps in the data.
308		all_time_difs = []
309		avg_time_dif = 0
310		time_difs_len = 0
311
312		# Get Tumblr posts until there's no more left.
313		while True:
314			if self.interrupted:
315				raise ProcessorInterruptedException("Interrupted while fetching tag posts from Tumblr")
316
317			# Stop after max for date reductions
318			if date_retries >= self.max_date_retries:
319				self.dataset.update_status("No more posts in this date range")
320				break
321
322			# Stop after max retries for API/connection stuff
323			if retries >= self.max_retries:
324				self.dataset.update_status("No more posts")
325				break
326
327			try:
328				# Use the pytumblr library to make the API call
329				posts = self.client.tagged(tag, before=max_date, limit=20, filter="raw")
330			except ConnectionError:
331				self.update_status("Encountered a connection error, waiting 10 seconds.")
332				time.sleep(10)
333				retries += 1
334				continue
335
336			# Get rid of posts that we already enountered,
337			# preventing Tumblr API shenanigans or double posts because of
338			# time reductions. Make sure it's no odd error string, though.
339			unseen_posts = []
340			for check_post in posts:
341				# Sometimes the API repsonds just with "meta", "response", or "errors".
342				if isinstance(check_post, str):
343					self.dataset.update_status("Couldn't add post:", check_post)
344					retries += 1
345					break
346				else:
347					retries = 0
348					if check_post["id"] not in self.seen_ids:
349						unseen_posts.append(check_post)
350			posts = unseen_posts
351
352			# For no clear reason, the Tumblr API sometimes provides posts with a higher timestamp than requested.
353			# So we have to prevent this manually.
354			if max_date_original:
355				posts = [post for post in posts if post["timestamp"] <= max_date_original]
356
357			max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S")
358
359			# except Exception as e:
360			# 	print(e)
361			# 	self.dataset.update_status("Reached the limit of the Tumblr API. Last timestamp: %s" % str(max_date))
362			# 	self.api_limit_reached = True
363			# 	break
364
365			# Make sure the Tumblr API doesn't magically stop at an earlier date
366			if not posts:
367
368				date_retries += 1
369
370				# We're first gonna check carefully if there's small timegaps by
371				# decreasing by six hours.
372				# If that didn't result in any new posts, also dedicate 12 date_retries
373				# with reductions of six months, just to be sure there's no data from
374				# years earlier missing.
375
376				if date_retries < 96:
377					max_date -= 21600 # Decrease by six hours
378					self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing time search with 6 hours to %s to make sure this is really it (retry %s/96)" % (str(len(all_posts)), tag, max_date_str, str(date_retries),))
379				elif date_retries <= self.max_date_retries:
380					max_date -= 604800 # Decrease by one week
381					retry_str = str(date_retries - 96)
382					self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - no new posts found with decreasing by 6 hours, decreasing with a week to %s instead (retry %s/150)" % (str(len(all_posts)), tag, max_date_str, str(retry_str),))
383
384				# We can stop when the max date drops below the min date.
385				if min_date:
386					if max_date <= min_date:
387						break
388
389				continue
390
391			# Append posts to main list
392			else:
393
394				posts = self.parse_tumblr_posts(posts)
395
396				# Get all timestamps and sort them.
397				post_dates = sorted([post["timestamp"] for post in posts])
398
399				# Get the lowest date and use it as the next "before" parameter.
400				max_date = post_dates[0]
401
402				# Tumblr's API is volatile - it doesn't neatly sort posts by date,
403				# so it can happen that there's suddenly huge jumps in time.
404				# Check if this is happening by extracting the difference between all consecutive dates.
405				time_difs = list()
406				post_dates.reverse()
407
408				for i, date in enumerate(post_dates):
409
410					if i == (len(post_dates) - 1):
411						break
412
413					# Calculate and add time differences
414					time_dif = date - post_dates[i + 1]
415
416					# After having collected 250 posts, check whether the time
417					# difference between posts far exceeds the average time difference
418					# between posts. If it's more than five times this amount,
419					# restart the query with the timestamp just before the gap, minus the
420					# average time difference up to this point - something might be up with Tumblr's API.
421					if len(all_posts) >= 250 and time_dif > (avg_time_dif * 5):
422
423						time_str = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M:%S")
424						self.dataset.update_status("Time difference of %s spotted, restarting query at %s" % (str(time_dif), time_str,))
425
426						self.seen_ids.update([post["id"] for post in posts])
427						posts = [post for post in posts if post["timestamp"] >= date]
428						if posts:
429							all_posts += posts
430
431						max_date = date
432						break
433
434					time_difs.append(time_dif)
435
436				# To start a new query
437				if not posts:
438					break
439
440				# Manually check if we have a lower date than the lowest allowed date already (min date).
441				# This functonality is not natively supported by Tumblr.
442				if min_date:
443					if max_date < min_date:
444
445						# Get rid of all the posts that are earlier than the max_date timestamp
446						posts = [post for post in posts if post["timestamp"] >= min_date and post["timestamp"] <= max_date_original]
447
448						if posts:
449							all_posts += posts
450							self.seen_ids.update([post["id"] for post in posts])
451						break
452
453				# We got a new post, so we can reset the retry counts.
454				date_retries = 0
455				retries = 0
456
457				# Add retrieved posts top the main list
458				all_posts += posts
459
460				# Add to seen ids
461				self.seen_ids.update([post["id"] for post in posts])
462
463				# Add time differences and calculate new average time difference
464				all_time_difs += time_difs
465
466				# Make the average time difference a moving average,
467				# to be flexible with faster and slower post paces.
468				# Delete the first 100 posts every hundred or so items.
469				if (len(all_time_difs) - time_difs_len) > 100:
470					all_time_difs = all_time_difs[time_difs_len:]
471				if all_time_difs:
472					time_difs_len = len(all_time_difs)
473					avg_time_dif = sum(all_time_difs) / len(all_time_difs)
474
475			if len(all_posts) >= self.max_posts:
476				self.max_posts_reached = True
477				break
478
479			self.dataset.update_status("Collected %s posts for tag %s, now looking for posts before %s" % (str(len(all_posts)), tag, max_date_str,))
480
481		return all_posts
482
483	def get_posts_by_blog(self, blog, max_date=None, min_date=None):
484		"""
485		Get Tumblr posts posts with a certain blog
486		:param tag, str: the name of the blog you want to look for
487		:param min_date: a unix timestamp, indicates posts should be min_date this date.
488	    :param max_date: a unix timestamp, indicates posts should be max_date this date.
489
490	    :returns: a dict created from the JSON response
491		"""
492		blog = blog + ".tumblr.com"
493
494		if not max_date:
495			max_date = int(time.time())
496
497		# Store all posts in here
498		all_posts = []
499
500		# Store notes here, if they exist and are requested
501		all_notes = []
502
503		# Some retries to make sure the Tumblr API actually returns everything
504		retries = 0
505		self.max_retries = 48 # 2 days
506
507		# Get Tumblr posts until there's no more left.
508		while True:
509			if self.interrupted:
510				raise ProcessorInterruptedException("Interrupted while fetching blog posts from Tumblr")
511
512			# Stop min_date 20 retries
513			if retries >= self.max_retries:
514				self.dataset.update_status("No more posts")
515				break
516
517			try:
518				# Use the pytumblr library to make the API call
519				posts = self.client.posts(blog, before=max_date, limit=20, reblog_info=True, notes_info=True, filter="raw")
520				posts = posts["posts"]
521
522				#if (max_date - posts[0]["timestamp"]) > 500000:
523					#self.dataset.update_status("ALERT - DATES LIKELY SKIPPED")
524					#self.dataset.update_status([post["timestamp"] for post in posts])
525
526			except Exception as e:
527
528				self.dataset.update_status("Reached the limit of the Tumblr API. Last timestamp: %s" % str(max_date))
529				self.api_limit_reached = True
530				break
531
532			# Make sure the Tumblr API doesn't magically stop at an earlier date
533			if not posts or isinstance(posts, str):
534				retries += 1
535				max_date -= 3600 # Decrease by an hour
536				self.dataset.update_status("No posts returned by Tumblr - checking whether this is really all (retry %s/48)" % str(retries))
537				continue
538
539			# Append posts to main list
540			else:
541				# Keep the notes, if so indicated
542				if self.parameters.get("fetch_reblogs"):
543					for post in posts:
544						if "notes" in post:
545							all_notes.append(post["notes"])
546
547				posts = self.parse_tumblr_posts(posts)
548
549				# Get the lowest date
550				max_date = sorted([post["timestamp"] for post in posts])[0]
551
552				# Manually check if we have a lower date than the min date (`min_date`) already.
553				# This functonality is not natively supported by Tumblr.
554				if min_date:
555					if max_date < min_date:
556
557						# Get rid of all the posts that are earlier than the max_date timestamp
558						posts = [post for post in posts if post["timestamp"] >= min_date]
559
560						if posts:
561							all_posts += posts
562						break
563
564				retries = 0
565
566				all_posts += posts
567
568				#if (max_date - posts[len(posts) - 1]["timestamp"]) > 500000:
569					#self.dataset.update_status("ALERT - DATES LIKELY SKIPPED")
570					#self.dataset.update_status([post["timestamp"] for post in posts])
571
572			if len(all_posts) >= self.max_posts:
573				self.max_posts_reached = True
574				break
575
576			self.dataset.update_status("Collected %s posts" % str(len(all_posts)))
577
578		return all_posts, all_notes
579
580	def get_post_notes(self, di_blogs_ids, only_text_reblogs=True):
581		"""
582		Gets the post notes.
583		:param di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values.
584		:param only_text_reblogs, bool: Whether to only keep notes that are text reblogs.
585		"""
586		# List of dict to get reblogs. Items are: [{"blog_name": post_id}]
587		text_reblogs = []
588
589		max_date = None
590
591		# Do some counting
592		len_blogs = len(di_blogs_ids)
593		count = 0
594
595		# Stop trying to fetch the notes after this many retries
596		max_notes_retries = 10
597		notes_retries = 0
598
599		for key, value in di_blogs_ids.items():
600
601			count += 1
602
603			if self.interrupted:
604				raise ProcessorInterruptedException("Interrupted while fetching post notes from Tumblr")
605
606			# First, get the blog names and post_ids from reblogs
607			# Keep digging till there's nothing left, or if we can fetch no new notes
608			while True:
609
610				# Requests a post's notes
611				notes = self.client.notes(key, id=value, before_timestamp=max_date)
612
613				if only_text_reblogs:
614
615					if "notes" in notes:
616						notes_retries = 0
617
618						for note in notes["notes"]:
619							# If it's a reblog, extract the data and save the rest of the posts for later
620							if note["type"] == "reblog":
621								if note.get("added_text"):
622									text_reblogs.append({note["blog_name"]: note["post_id"]})
623
624						if notes.get("_links"):
625							max_date = notes["_links"]["next"]["query_params"]["before_timestamp"]
626
627						# If there's no `_links` key, that's all.
628						else:
629							break
630
631					# If there's no "notes" key in the returned dict, something might be up
632					else:
633						self.dataset.update_status("Couldn't get notes for Tumblr request " + str(notes))
634						notes_retries += 1
635						pass
636
637					if notes_retries > max_notes_retries:
638						self.failed_notes.append(key)
639						break
640
641			self.dataset.update_status("Identified %i text reblogs in %i/%i notes" % (len(text_reblogs), count, len_blogs))
642
643		return text_reblogs
644
645	def get_post_by_id(self, blog_name, post_id):
646		"""
647		Fetch individual posts
648		:param blog_name, str: The blog's name
649		:param id, int: The post ID
650
651		returns result list, a list with a dictionary with the post's information
652		"""
653		if self.interrupted:
654			raise ProcessorInterruptedException("Interrupted while fetching post from Tumblr")
655
656		# Request the specific post.
657		post = self.client.posts(blog_name, id=post_id)
658
659		# Tumblr API can sometimes return with this kind of error:
660		# {'meta': {'status': 500, 'msg': 'Server Error'}, 'response': {'error': 'Malformed JSON or HTML was returned.'}}
661		if "posts" not in post:
662			return None
663
664		# Get the first element of the list - it's always one post.
665		result = post["posts"][0]
666
667		return result
668
669	@staticmethod
670	def get_tumbler_keys(user):
671		config_keys = [
672			config.get("api.tumblr.consumer_key", user=user),
673			config.get("api.tumblr.consumer_secret", user=user),
674			config.get("api.tumblr.key", user=user),
675			config.get("api.tumblr.secret_key", user=user)]
676		if not all(config_keys):
677			raise ConfigException("Not all Tumblr API credentials are configured. Cannot query Tumblr API.")
678		return config_keys
679
680	def connect_to_tumblr(self):
681		"""
682		Returns a connection to the Tumblr API using the pytumblr library.
683
684		"""
685		# User input keys
686		config_keys = [self.parameters.get("consumer_key"),
687			self.parameters.get("consumer_secret"),
688			self.parameters.get("key"),
689			self.parameters.get("secret_key")]
690		if not all(config_keys):
691			# No user input keys; attempt to use 4CAT config keys
692			config_keys = self.get_tumbler_keys(self.owner)
693
694		self.client = pytumblr.TumblrRestClient(*config_keys)
695
696		client_info = self.client.info()
697
698		# Check if there's any errors
699		if client_info.get("meta"):
700			if client_info["meta"].get("status") == 429:
701				raise ConnectionRefusedError("Tumblr API timed out")
702
703		return self.client
704
705	def validate_query(query, request, user):
706		"""
707		Validate custom data input
708
709		Confirms that the query parameters are valid and, if so, returns a
710		sanitised version of them.
711
712		:param dict query:  Query parameters, from client-side.
713		:param request:  	Flask request
714		:param User user:  	User object of user who has submitted the query
715		:return dict:  		Safe query parameters
716		"""
717		# an empty query is not allowed
718		if not query.get("query", "").strip():
719			raise QueryParametersException("You must provide a search query.")
720
721		# reformat queries to be a comma-separated list
722		items = query.get("query").replace("#","")
723		items = items.split("\n")
724
725		# no more than ten items allowed
726		if len(items) > 10:
727			raise QueryParametersException("Please query for ten or fewer tags or blogs (you entered %s)." % len(items))
728
729		# an empty list of items is not allowed
730		if not items:
731			raise QueryParametersException("Search query cannot be empty.")
732
733		# So it shows nicely in the frontend.
734		items = ", ".join([item.strip() for item in items if item])
735
736		# the dates need to make sense as a range to search within
737		query["min_date"], query["max_date"] = query.get("daterange")
738		if any(query.get("daterange")) and not all(query.get("daterange")):
739			raise QueryParametersException("When providing a date range, set both an upper and lower limit.")
740
741		del query["daterange"]
742
743		query["query"] = items
744		query["board"] = query.get("search_scope") + "s"  # used in web interface
745
746		# if we made it this far, the query can be executed
747		return query
748
749	def parse_tumblr_posts(self, posts, reblog=False):
750		"""
751		Function to parse Tumblr posts into the same dict items.
752		Tumblr posts can be many different types, so some data processing is necessary.
753
754		:param posts, list: List of Tumblr posts as returned from the Tumblr API.
755		:param reblog, bool: Whether the post concerns a reblog of posts from the original dataset.
756
757		returns list processed_posts, a list with dictionary items of post info.
758		"""
759
760		# Store processed posts here
761		processed_posts = []
762
763		media_tags = ["photo", "video", "audio"]
764
765		# Loop through all the posts and write a row for each of them.
766		for post in posts:
767			post_type = post["type"]
768
769			# The post's text is in different keys depending on the post type
770			if post_type in media_tags:
771				text = post["caption"]
772			elif post_type == "link":
773				text = post["description"]
774			elif post_type == "text" or post_type == "chat":
775				text = post["body"]
776			elif post_type == "answer":
777				text = post["question"] + "\n" + post["answer"]
778			else:
779				text = ""
780
781			# Different options for video types (YouTube- or Tumblr-hosted)
782			if post_type == "video":
783
784				video_source = post["video_type"]
785				# Use `get` since some videos are deleted
786				video_url = post.get("permalink_url")
787
788				if video_source == "youtube":
789					# There's no URL if the YouTube video is deleted
790					if video_url:
791						video_id = post["video"]["youtube"]["video_id"]
792					else:
793						video_id = "deleted"
794				else:
795					video_id = "unknown"
796
797			else:
798				video_source = None
799				video_id = None
800				video_url = None
801
802			# All the fields to write
803			processed_post = {
804				# General columns
805				"type": post_type,
806				"timestamp": post["timestamp"],
807				"is_reblog": reblog,
808
809				# Blog columns
810				"author": post["blog_name"],
811				"subject": post["blog"]["title"],
812				"blog_description": post["blog"]["description"],
813				"blog_url": post["blog"]["url"],
814				"blog_uuid": post["blog"]["uuid"],
815				"blog_last_updated": post["blog"]["updated"],
816
817				# Post columns
818				"id": post["id"],
819				"post_url": post["post_url"],
820				"post_slug": post["slug"],
821				"thread_id": post["reblog_key"],
822				"body": text.replace("\x00", ""),
823				"tags": ", ".join(post["tags"]) if post.get("tags") else None,
824				"notes": post["note_count"],
825				"urls": post.get("link_url"),
826				"images": ",".join([photo["original_size"]["url"] for photo in post["photos"]]) if post.get("photos") else None,
827
828				# Optional video columns
829				"video_source": video_source if post_type == "video" else None,
830				"video_url": video_url if post_type == "video" else None,
831				"video_id": video_id if post_type == "video" else None,
832				"video_thumb": post.get("thumbnail_url"), # Can be deleted
833
834				# Optional audio columns
835				"audio_type": post.get("audio_type"),
836				"audio_url": post.get("audio_source_url"),
837				"audio_plays": post.get("plays"),
838
839				# Optional link columns
840				"link_author": post.get("link_author"),
841				"link_publisher": post.get("publisher"),
842				"link_image": post.get("link_image"),
843
844				# Optional answers columns
845				"asking_name": post.get("asking_name"),
846				"asking_url": post.get("asking_url"),
847				"question": post.get("question"),
848				"answer": post.get("answer"),
849
850				# Optional chat columns
851				"chat": post.get("dialogue")
852			}
853
854			# Store the processed post
855			processed_posts.append(processed_post)
856
857		return processed_posts
858
859	def after_process(self):
860		"""
861		Override of the same function in processor.py
862		Used to notify of potential API errors.
863
864		"""
865		super().after_process()
866		self.client = None
867		errors = []
868		if len(self.failed_notes) > 0:
869			errors.append("API error(s) when fetching notes %s" % ", ".join(self.failed_notes))
870		if len(self.failed_reblogs) > 0:
871			errors.append("API error(s) when fetching reblogs %s" % ", ".join(self.failed_reblogs))
872		if errors:
873			self.dataset.log(";\n ".join(errors))
874			self.dataset.update_status("Dataset completed but failed to capture some notes/reblogs; see log for details.")

Tumblr data filter module.

type = 'tumblr-search'
category = 'Search'
title = 'Search Tumblr'
description = 'Retrieve Tumblr posts by hashtag or blog.'
extension = 'csv'
is_local = False
is_static = False
accepts = [None]
max_workers = 1
max_retries = 3
max_date_retries = 246
max_posts = 1000000
max_posts_reached = False
api_limit_reached = False
seen_ids = set()
client = None
failed_notes = []
failed_reblogs = []
config = {
    'api.tumblr.consumer_key': {'type': 'string', 'default': '', 'help': 'Tumblr Consumer Key', 'tooltip': ''},
    'api.tumblr.consumer_secret': {'type': 'string', 'default': '', 'help': 'Tumblr Consumer Secret Key', 'tooltip': ''},
    'api.tumblr.key': {'type': 'string', 'default': '', 'help': 'Tumblr API Key', 'tooltip': ''},
    'api.tumblr.secret_key': {'type': 'string', 'default': '', 'help': 'Tumblr API Secret Key', 'tooltip': ''}
}
references = ['[Tumblr API documentation](https://www.tumblr.com/docs/en/api/v2)']
@classmethod
def get_options(cls, parent_dataset=None, user=None):
 80	@classmethod
 81	def get_options(cls, parent_dataset=None, user=None):
 82		"""
 83		Check if Tumblr keys are configured and, if not, request them from the user
 84		"""
 85		options = {
 86			"intro": {
 87				"type": UserInput.OPTION_INFO,
 88				"help": "Retrieve any kind of Tumblr posts with specific tags or from specific blogs. Gets 100,000 posts "
 89						"at most. Insert tags or names of blogs, one on each line. You may insert up to ten tags or "
 90						"blogs.\n\nTumblr tags may include whitespace and commas. A `#` before the tag is optional.\n\n"
 91						"Tag search only gets posts explicitly associated with the exact tag you insert here. Querying "
 92						"`gogh` will thus not get posts only tagged with `van gogh`. Keyword search is unfortunately not "
 93						"allowed by the [Tumblr API](https://api.tumblr.com).\n\nIf 4CAT reached its Tumblr API rate "
 94						"limit, try again 24 hours later."
 95			},
 96			"search_scope": {
 97				"type": UserInput.OPTION_CHOICE,
 98				"help": "Search by",
 99				"options": {
100					"tag": "Tag",
101					"blog": "Blog"
102				},
103				"default": "tag"
104			},
105			"query": {
106				"type": UserInput.OPTION_TEXT_LARGE,
107				"help": "Tags/blogs",
108				"tooltip": "One tag or blog per line."
109			},
110			"fetch_reblogs": {
111				"type": UserInput.OPTION_TOGGLE,
112				"help": "Also fetch reblogs with text? (warning: slow)",
113				"default": False
114			}
115		}
116
117		try:
118			config_keys = SearchTumblr.get_tumbler_keys(user)
119		except ConfigException:
120			# No 4CAT set keys for user; let user input their own
121			options["key-info"] = {
122				"type": UserInput.OPTION_INFO,
123				"help": "In order to access the Tumblr API, you need to register an application. You can do so "
124						"[here](https://www.tumblr.com/oauth/apps) and use the keys below. You will first get the OAuth "
125						"Consumer Key and Secret, and then the User Token Key and Secret [after entering them here](ht"
126									  "tps://api.tumblr.com/console/calls/user/info) and granting access."
127			}
128			options["consumer_key"] = {
129				"type": UserInput.OPTION_TEXT,
130				"sensitive": True,
131				"cache": True,
132				"help": "OAuth Consumer Key"
133			}
134			options["consumer_secret"] = {
135				"type": UserInput.OPTION_TEXT,
136				"sensitive": True,
137				"cache": True,
138				"help": "OAuth Consumer Secret"
139			}
140			options["key"] = {
141				"type": UserInput.OPTION_TEXT,
142				"sensitive": True,
143				"cache": True,
144				"help": "User Token Key"
145			}
146			options["secret_key"] = {
147				"type": UserInput.OPTION_TEXT,
148				"sensitive": True,
149				"cache": True,
150				"help": "User Token Secret"
151			}
152
153		options["divider"] = {
154				"type": UserInput.OPTION_DIVIDER
155			}
156		options["date-intro"] = {
157				"type": UserInput.OPTION_INFO,
158				"help": "**Note:** The [Tumblr API](https://api.tumblr.com) is volatile: when fetching sporadically used "
159						"tags, it may return zero posts, even though older posts exist. To mitigate this, 4CAT decreases "
160						"the date parameter (<code>before</code>) by six hours and sends the query again. This often "
161						"successfully returns older, un-fetched posts. If no new data is found after 96 such retries (24 "
162						"days), it checks for posts up to roughly three years before the last date, decreasing 150 times "
163						"by one week. If that also results in nothing, it assumes the dataset is complete. Check the "
164						"oldest post in your dataset to see if this is indeed the case and whether any odd time gaps exist."
165			}
166		options["daterange"] = {
167				"type": UserInput.OPTION_DATERANGE,
168				"help": "Date range"
169			}
170
171		return options

Check if Tumblr keys are configured and, if not, request them from the user

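For reference, a hypothetical sketch of the raw query a front-end builds from these options; the keys mirror the option names defined above, the values are illustrative, and `validate_query` (further below) sanitises this dict before `get_items` runs:

    # Hypothetical example of a raw query built from the options above
    query = {
        "search_scope": "tag",                  # "tag" or "blog"
        "query": "#gogh\n#van gogh",            # one tag or blog per line; "#" optional
        "fetch_reblogs": False,                 # also fetch text reblogs (slow)
        "daterange": (1577836800, 1609459200),  # (min, max) unix timestamps, or (None, None)
    }
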
def get_items(self, query):
173	def get_items(self, query):
174		"""
175		Fetches data from Tumblr via its API.
176
177		"""
178
179		# ready our parameters
180		parameters = self.dataset.get_parameters()
181		scope = parameters.get("search_scope", "")
182		queries = parameters.get("query").split(", ")
183		fetch_reblogs = parameters.get("fetch_reblogs", False)
184
185		# Store all info here
186		results = []
187
188		# Store all notes from posts by blogs here
189		all_notes = []
190
191		# Get date parameters
192		min_date = parameters.get("min_date", None)
193		max_date = parameters.get("max_date", None)
194
195		if min_date:
196			min_date = int(min_date)
197		if max_date:
198			max_date = int(max_date)
199		else:
200			max_date = int(time.time())
201
202		# Connect to Tumblr API
203		try:
204			self.client = self.connect_to_tumblr()
205		except ConfigException as e:
206			self.log.warning("Could not connect to Tumblr API: API keys invalid or not set")
207			self.dataset.finish_with_error("Could not connect to Tumblr API: API keys invalid or not set")
208			return
209		except ConnectionRefusedError as e:
210			client_info = self.client.info()
211			self.log.warning(f"Could not connect to Tumblr API: {e}; client_info: {client_info}")
212			self.dataset.finish_with_error(f"Could not connect to Tumblr API: {client_info.get('meta', {}).get('status', '')} - {client_info.get('meta', {}).get('msg', '')}")
213			return
214
215		# for each tag or blog, get post
216		for query in queries:
217
218				# Get posts per tag
219				if scope == "tag":
220					new_results = self.get_posts_by_tag(query, max_date=max_date, min_date=min_date)
221
222				# Get posts per blog
223				elif scope == "blog":
224					new_results, notes = self.get_posts_by_blog(query, max_date=max_date, min_date=min_date)
225					all_notes.append(notes)
226				else:
227					self.dataset.update_status("Invalid scope")
228					break
229
230				results += new_results
231
232				if self.max_posts_reached:
233					self.dataset.update_status("Max posts exceeded")
234					break
235				if self.api_limit_reached:
236					self.dataset.update_status("API limit reached")
237					break
238
239		# If we also want the posts that reblogged the fetched posts:
240		if fetch_reblogs and not self.max_posts_reached and not self.api_limit_reached:
241			self.dataset.update_status("Getting notes from all posts")
242
243			# Reblog information is already returned for blog-level searches
244			if scope == "blog":
245				text_reblogs = []
246
247				# Loop through and add the text reblogs that came with the results.
248				for post_notes in all_notes:
249					for post_note in post_notes:
250						for note in post_note:
251							if note["type"] == "reblog":
252								text_reblogs.append({note["blog_name"]: note["post_id"]})
253
254			# Retrieving notes for tag-based posts should be done one-by-one.
255			# Fetching them all at once is not supported by the Tumblr API.
256			elif scope == "tag":
257				# Prepare dicts to pass to `get_post_notes`
258				posts_to_fetch = {result["author"]: result["id"] for result in results}
259
260				# First extract the notes of each post, and only keep text reblogs
261				text_reblogs = self.get_post_notes(posts_to_fetch)
262
263			# Get the full data for text reblogs.
264			if text_reblogs:
265				connection_retries = 0
266				for i, text_reblog in enumerate(text_reblogs):
267					self.dataset.update_status("Got %i/%i text reblogs" % (i, len(text_reblogs)))
268					if connection_retries >= 5:
269						self.dataset.update_status("Multiple connection refused errors; unable to continue collection of reblogs.")
270						break
271					for key, value in text_reblog.items():
272						if connection_retries >= 5:
273							break
274						try:
275							reblog_post = self.get_post_by_id(key, value)
276						except ConnectionRefusedError:
277							connection_retries += 1
278							self.failed_reblogs.append(key)
279							self.dataset.update_status(f"ConnectionRefused: Unable to collect reblogs for post {key}")
280							continue
281						if reblog_post:
282							reblog_post = self.parse_tumblr_posts([reblog_post], reblog=True)
283							results.append(reblog_post[0])
284
285		self.job.finish()
286		return results

Fetches data from Tumblr via its API.

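The parameters this method reads from the dataset are the sanitised ones produced by `validate_query`; a minimal sketch of the expected shape (values illustrative):

    # Illustrative only: the parameter dict get_items() reads from the dataset
    parameters = {
        "search_scope": "tag",        # or "blog"
        "query": "gogh, van gogh",    # comma-separated, as built by validate_query()
        "fetch_reblogs": False,       # whether to also collect text reblogs
        "min_date": None,             # unix timestamp or None
        "max_date": 1609459200,       # unix timestamp; falls back to "now" when unset
    }
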
def get_posts_by_tag(self, tag, max_date=None, min_date=None):
288	def get_posts_by_tag(self, tag, max_date=None, min_date=None):
289		"""
290		Get Tumblr posts with a certain tag
291		:param tag, str: the tag you want to look for
292		:param min_date: a unix timestamp; only posts after this date are returned.
293		:param max_date: a unix timestamp; only posts before this date are returned.
294
295		:returns: a list of posts parsed from the JSON responses
296		"""
297		# Store all posts in here
298		all_posts = []
299
300		# Some retries to make sure the Tumblr API actually returns everything.
301		retries = 0
302		date_retries = 0
303
304		# We're gonna change max_date, so store a copy for reference.
305		max_date_original = max_date
306
307		# We use the average time difference between posts to spot possible gaps in the data.
308		all_time_difs = []
309		avg_time_dif = 0
310		time_difs_len = 0
311
312		# Get Tumblr posts until there's no more left.
313		while True:
314			if self.interrupted:
315				raise ProcessorInterruptedException("Interrupted while fetching tag posts from Tumblr")
316
317			# Stop after the max number of date reductions
318			if date_retries >= self.max_date_retries:
319				self.dataset.update_status("No more posts in this date range")
320				break
321
322			# Stop after max retries for API/connection stuff
323			if retries >= self.max_retries:
324				self.dataset.update_status("No more posts")
325				break
326
327			try:
328				# Use the pytumblr library to make the API call
329				posts = self.client.tagged(tag, before=max_date, limit=20, filter="raw")
330			except ConnectionError:
331				self.dataset.update_status("Encountered a connection error, waiting 10 seconds.")
332				time.sleep(10)
333				retries += 1
334				continue
335
336			# Get rid of posts that we already encountered,
337			# preventing Tumblr API shenanigans or double posts because of
338			# time reductions. Make sure it's no odd error string, though.
339			unseen_posts = []
340			for check_post in posts:
341				# Sometimes the API responds just with "meta", "response", or "errors".
342				if isinstance(check_post, str):
343					self.dataset.update_status("Couldn't add post: %s" % check_post)
344					retries += 1
345					break
346				else:
347					retries = 0
348					if check_post["id"] not in self.seen_ids:
349						unseen_posts.append(check_post)
350			posts = unseen_posts
351
352			# For no clear reason, the Tumblr API sometimes provides posts with a higher timestamp than requested.
353			# So we have to filter these out manually.
354			if max_date_original:
355				posts = [post for post in posts if post["timestamp"] <= max_date_original]
356
357			max_date_str = datetime.fromtimestamp(max_date).strftime("%Y-%m-%d %H:%M:%S")
358
359			# except Exception as e:
360			# 	print(e)
361			# 	self.dataset.update_status("Reached the limit of the Tumblr API. Last timestamp: %s" % str(max_date))
362			# 	self.api_limit_reached = True
363			# 	break
364
365			# Make sure the Tumblr API doesn't magically stop at an earlier date
366			if not posts:
367
368				date_retries += 1
369
370				# We're first gonna check carefully if there's small timegaps by
371				# decreasing by six hours.
372			# If that didn't result in any new posts, also dedicate 150 date_retries
373			# with reductions of one week, just to be sure there's no data from
374				# years earlier missing.
375
376				if date_retries < 96:
377					max_date -= 21600 # Decrease by six hours
378					self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - decreasing the date parameter by 6 hours to %s to make sure this is really it (retry %s/96)" % (str(len(all_posts)), tag, max_date_str, str(date_retries),))
379				elif date_retries <= self.max_date_retries:
380					max_date -= 604800 # Decrease by one week
381					retry_str = str(date_retries - 96)
382					self.dataset.update_status("Collected %s posts for tag %s, but no new posts returned - none found when decreasing by 6 hours, so decreasing by one week to %s instead (retry %s/150)" % (str(len(all_posts)), tag, max_date_str, str(retry_str),))
383
384				# We can stop when the max date drops below the min date.
385				if min_date:
386					if max_date <= min_date:
387						break
388
389				continue
390
391			# Append posts to main list
392			else:
393
394				posts = self.parse_tumblr_posts(posts)
395
396				# Get all timestamps and sort them.
397				post_dates = sorted([post["timestamp"] for post in posts])
398
399				# Get the lowest date and use it as the next "before" parameter.
400				max_date = post_dates[0]
401
402				# Tumblr's API is volatile - it doesn't neatly sort posts by date,
403				# so it can happen that there's suddenly huge jumps in time.
404				# Check if this is happening by extracting the difference between all consecutive dates.
405				time_difs = list()
406				post_dates.reverse()
407
408				for i, date in enumerate(post_dates):
409
410					if i == (len(post_dates) - 1):
411						break
412
413					# Calculate and add time differences
414					time_dif = date - post_dates[i + 1]
415
416					# After having collected 250 posts, check whether the time
417					# difference between posts far exceeds the average time difference
418					# between posts. If it's more than five times this amount,
419					# restart the query with the timestamp just before the gap, minus the
420					# average time difference up to this point - something might be up with Tumblr's API.
421					if len(all_posts) >= 250 and time_dif > (avg_time_dif * 5):
422
423						time_str = datetime.fromtimestamp(date).strftime("%Y-%m-%d %H:%M:%S")
424						self.dataset.update_status("Time difference of %s spotted, restarting query at %s" % (str(time_dif), time_str,))
425
426						self.seen_ids.update([post["id"] for post in posts])
427						posts = [post for post in posts if post["timestamp"] >= date]
428						if posts:
429							all_posts += posts
430
431						max_date = date
432						break
433
434					time_difs.append(time_dif)
435
436				# To start a new query
437				if not posts:
438					break
439
440				# Manually check if we have a lower date than the lowest allowed date already (min date).
441				# This functionality is not natively supported by Tumblr.
442				if min_date:
443					if max_date < min_date:
444
445						# Get rid of all the posts that are earlier than the max_date timestamp
446						posts = [post for post in posts if post["timestamp"] >= min_date and post["timestamp"] <= max_date_original]
447
448						if posts:
449							all_posts += posts
450							self.seen_ids.update([post["id"] for post in posts])
451						break
452
453				# We got a new post, so we can reset the retry counts.
454				date_retries = 0
455				retries = 0
456
457				# Add retrieved posts to the main list
458				all_posts += posts
459
460				# Add to seen ids
461				self.seen_ids.update([post["id"] for post in posts])
462
463				# Add time differences and calculate new average time difference
464				all_time_difs += time_difs
465
466				# Make the average time difference a moving average,
467				# to be flexible with faster and slower post paces.
468				# Drop the oldest time differences every hundred or so items.
469				if (len(all_time_difs) - time_difs_len) > 100:
470					all_time_difs = all_time_difs[time_difs_len:]
471				if all_time_difs:
472					time_difs_len = len(all_time_difs)
473					avg_time_dif = sum(all_time_difs) / len(all_time_difs)
474
475			if len(all_posts) >= self.max_posts:
476				self.max_posts_reached = True
477				break
478
479			self.dataset.update_status("Collected %s posts for tag %s, now looking for posts before %s" % (str(len(all_posts)), tag, max_date_str,))
480
481		return all_posts

Get Tumblr posts with a certain tag

Parameters
  • tag, str: the tag you want to look for
  • min_date: a unix timestamp; only posts after this date are returned.
  • max_date: a unix timestamp; only posts before this date are returned.

:returns: a list of posts parsed from the JSON responses

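At its core, the loop above combines pytumblr's `tagged` endpoint with backwards date-stepping. A stripped-down sketch of that pagination strategy, assuming valid credentials (the deduplication, gap heuristic and error handling of the real method are omitted):

    import time
    import pytumblr

    # Placeholder credentials; see connect_to_tumblr() below
    client = pytumblr.TumblrRestClient("consumer_key", "consumer_secret",
                                       "token", "token_secret")

    max_date = int(time.time())
    all_posts, date_retries = [], 0

    while date_retries < (96 + 150):
        posts = client.tagged("some-tag", before=max_date, limit=20, filter="raw")
        if not posts:
            # Nothing returned: step the window back (6 hours first, then a week)
            max_date -= 21600 if date_retries < 96 else 604800
            date_retries += 1
            continue
        date_retries = 0
        all_posts += posts
        # Continue from the oldest timestamp in this batch
        max_date = min(post["timestamp"] for post in posts)
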
def get_posts_by_blog(self, blog, max_date=None, min_date=None):
483	def get_posts_by_blog(self, blog, max_date=None, min_date=None):
484		"""
485		Get Tumblr posts from a certain blog
486		:param blog, str: the name of the blog you want to look for
487		:param min_date: a unix timestamp; only posts after this date are returned.
488		:param max_date: a unix timestamp; only posts before this date are returned.
489
490		:returns: a list of posts and a list of their notes
491		"""
492		blog = blog + ".tumblr.com"
493
494		if not max_date:
495			max_date = int(time.time())
496
497		# Store all posts in here
498		all_posts = []
499
500		# Store notes here, if they exist and are requested
501		all_notes = []
502
503		# Some retries to make sure the Tumblr API actually returns everything
504		retries = 0
505		self.max_retries = 48 # 2 days
506
507		# Get Tumblr posts until there's no more left.
508		while True:
509			if self.interrupted:
510				raise ProcessorInterruptedException("Interrupted while fetching blog posts from Tumblr")
511
512			# Stop after the maximum number of retries
513			if retries >= self.max_retries:
514				self.dataset.update_status("No more posts")
515				break
516
517			try:
518				# Use the pytumblr library to make the API call
519				posts = self.client.posts(blog, before=max_date, limit=20, reblog_info=True, notes_info=True, filter="raw")
520				posts = posts["posts"]
521
522				#if (max_date - posts[0]["timestamp"]) > 500000:
523					#self.dataset.update_status("ALERT - DATES LIKELY SKIPPED")
524					#self.dataset.update_status([post["timestamp"] for post in posts])
525
526			except Exception as e:
527
528				self.dataset.update_status("Reached the limit of the Tumblr API. Last timestamp: %s" % str(max_date))
529				self.api_limit_reached = True
530				break
531
532			# Make sure the Tumblr API doesn't magically stop at an earlier date
533			if not posts or isinstance(posts, str):
534				retries += 1
535				max_date -= 3600 # Decrease by an hour
536				self.dataset.update_status("No posts returned by Tumblr - checking whether this is really all (retry %s/48)" % str(retries))
537				continue
538
539			# Append posts to main list
540			else:
541				# Keep the notes, if so indicated
542				if self.parameters.get("fetch_reblogs"):
543					for post in posts:
544						if "notes" in post:
545							all_notes.append(post["notes"])
546
547				posts = self.parse_tumblr_posts(posts)
548
549				# Get the lowest date
550				max_date = sorted([post["timestamp"] for post in posts])[0]
551
552				# Manually check if we have a lower date than the min date (`min_date`) already.
553				# This functionality is not natively supported by Tumblr.
554				if min_date:
555					if max_date < min_date:
556
557						# Get rid of all the posts that are earlier than the max_date timestamp
558						posts = [post for post in posts if post["timestamp"] >= min_date]
559
560						if posts:
561							all_posts += posts
562						break
563
564				retries = 0
565
566				all_posts += posts
567
568				#if (max_date - posts[len(posts) - 1]["timestamp"]) > 500000:
569					#self.dataset.update_status("ALERT - DATES LIKELY SKIPPED")
570					#self.dataset.update_status([post["timestamp"] for post in posts])
571
572			if len(all_posts) >= self.max_posts:
573				self.max_posts_reached = True
574				break
575
576			self.dataset.update_status("Collected %s posts" % str(len(all_posts)))
577
578		return all_posts, all_notes

Get Tumblr posts from a certain blog

Parameters
  • blog, str: the name of the blog you want to look for
  • min_date: a unix timestamp; only posts after this date are returned.
  • max_date: a unix timestamp; only posts before this date are returned.

:returns: a list of posts and a list of their notes

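Blog search paginates the same way, but through the `posts` endpoint with `reblog_info` and `notes_info` enabled; a condensed sketch under the same assumptions as the tag example above:

    import time
    import pytumblr

    client = pytumblr.TumblrRestClient("consumer_key", "consumer_secret",
                                       "token", "token_secret")  # placeholders

    blog = "staff.tumblr.com"  # example blog
    max_date = int(time.time())
    all_posts, all_notes, retries = [], [], 0

    while retries < 48:
        response = client.posts(blog, before=max_date, limit=20,
                                reblog_info=True, notes_info=True, filter="raw")
        posts = response.get("posts")
        if not posts:
            retries += 1
            max_date -= 3600  # step back one hour and try again
            continue
        retries = 0
        all_notes += [post["notes"] for post in posts if "notes" in post]
        all_posts += posts
        max_date = min(post["timestamp"] for post in posts)
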
def get_post_notes(self, di_blogs_ids, only_text_reblogs=True):
580	def get_post_notes(self, di_blogs_ids, only_text_reblogs=True):
581		"""
582		Gets the post notes.
583		:param di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values.
584		:param only_text_reblogs, bool: Whether to only keep notes that are text reblogs.
585		"""
586		# List of dict to get reblogs. Items are: [{"blog_name": post_id}]
587		text_reblogs = []
588
589		max_date = None
590
591		# Do some counting
592		len_blogs = len(di_blogs_ids)
593		count = 0
594
595		# Stop trying to fetch the notes after this many retries
596		max_notes_retries = 10
597		notes_retries = 0
598
599		for key, value in di_blogs_ids.items():
600
601			count += 1
602
603			if self.interrupted:
604				raise ProcessorInterruptedException("Interrupted while fetching post notes from Tumblr")
605
606			# First, get the blog names and post_ids from reblogs
607			# Keep digging till there's nothing left, or until we can fetch no new notes
608			while True:
609
610				# Requests a post's notes
611				notes = self.client.notes(key, id=value, before_timestamp=max_date)
612
613				if only_text_reblogs:
614
615					if "notes" in notes:
616						notes_retries = 0
617
618						for note in notes["notes"]:
619							# If it's a reblog, extract the data and save the rest of the posts for later
620							if note["type"] == "reblog":
621								if note.get("added_text"):
622									text_reblogs.append({note["blog_name"]: note["post_id"]})
623
624						if notes.get("_links"):
625							max_date = notes["_links"]["next"]["query_params"]["before_timestamp"]
626
627						# If there's no `_links` key, that's all.
628						else:
629							break
630
631					# If there's no "notes" key in the returned dict, something might be up
632					else:
633						self.dataset.update_status("Couldn't get notes for Tumblr request " + str(notes))
634						notes_retries += 1
635						pass
636
637					if notes_retries > max_notes_retries:
638						self.failed_notes.append(key)
639						break
640
641			self.dataset.update_status("Identified %i text reblogs in %i/%i notes" % (len(text_reblogs), count, len_blogs))
642
643		return text_reblogs

Gets the post notes.

Parameters
  • di_blogs_ids, dict: A dictionary with blog names as keys and post IDs as values.
  • only_text_reblogs, bool: Whether to only keep notes that are text reblogs.
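
Note pagination follows the `_links.next` cursor in each response. A minimal sketch of walking it for a single post, assuming a `client` as in the earlier sketches (the retry bookkeeping of the real method is omitted; blog name and post ID are made up):

    text_reblogs, before = [], None

    while True:
        notes = client.notes("some-blog", id=123456789, before_timestamp=before)
        if "notes" not in notes:
            break  # error response; the real method retries a few times
        for note in notes["notes"]:
            # Only keep reblogs that actually added text
            if note["type"] == "reblog" and note.get("added_text"):
                text_reblogs.append({note["blog_name"]: note["post_id"]})
        if not notes.get("_links"):
            break  # no next page: all notes fetched
        before = notes["_links"]["next"]["query_params"]["before_timestamp"]
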
def get_post_by_id(self, blog_name, post_id):
645	def get_post_by_id(self, blog_name, post_id):
646		"""
647		Fetch individual posts
648		:param blog_name, str: The blog's name
649		:param post_id, int: The post ID
650
651		returns result, a dictionary with the post's information
652		"""
653		if self.interrupted:
654			raise ProcessorInterruptedException("Interrupted while fetching post from Tumblr")
655
656		# Request the specific post.
657		post = self.client.posts(blog_name, id=post_id)
658
659		# Tumblr API can sometimes return with this kind of error:
660		# {'meta': {'status': 500, 'msg': 'Server Error'}, 'response': {'error': 'Malformed JSON or HTML was returned.'}}
661		if "posts" not in post:
662			return None
663
664		# Get the first element of the list - it's always one post.
665		result = post["posts"][0]
666
667		return result

Fetch individual posts

Parameters
  • blog_name, str: The blog's name
  • post_id, int: The post ID

returns result, a dictionary with the post's information

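Because the API occasionally answers with an error body instead of a post, callers should treat a missing `posts` key as a soft failure, as this method does. A short sketch (again assuming a `client`; the wrapper name is hypothetical):

    def fetch_single_post(client, blog_name, post_id):
        """Hypothetical wrapper mirroring get_post_by_id()."""
        response = client.posts(blog_name, id=post_id)
        # e.g. {'meta': {'status': 500, 'msg': 'Server Error'}, ...}
        if "posts" not in response:
            return None
        return response["posts"][0]
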
@staticmethod
def get_tumbler_keys(user):
669	@staticmethod
670	def get_tumbler_keys(user):
671		config_keys = [
672			config.get("api.tumblr.consumer_key", user=user),
673			config.get("api.tumblr.consumer_secret", user=user),
674			config.get("api.tumblr.key", user=user),
675			config.get("api.tumblr.secret_key", user=user)]
676		if not all(config_keys):
677			raise ConfigException("Not all Tumblr API credentials are configured. Cannot query Tumblr API.")
678		return config_keys
def connect_to_tumblr(self):
680	def connect_to_tumblr(self):
681		"""
682		Returns a connection to the Tumblr API using the pytumblr library.
683
684		"""
685		# User input keys
686		config_keys = [self.parameters.get("consumer_key"),
687			self.parameters.get("consumer_secret"),
688			self.parameters.get("key"),
689			self.parameters.get("secret_key")]
690		if not all(config_keys):
691			# No user input keys; attempt to use 4CAT config keys
692			config_keys = self.get_tumbler_keys(self.owner)
693
694		self.client = pytumblr.TumblrRestClient(*config_keys)
695
696		client_info = self.client.info()
697
698		# Check if there are any errors
699		if client_info.get("meta"):
700			if client_info["meta"].get("status") == 429:
701				raise ConnectionRefusedError("Tumblr API rate limit reached")
702
703		return self.client

Returns a connection to the Tumblr API using the pytumblr library.

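For reference, a minimal sketch of the connection check this method performs; the four keys are placeholders, and a `meta` status of 429 indicates the rate limit was hit:

    import pytumblr

    client = pytumblr.TumblrRestClient(
        "consumer_key", "consumer_secret",  # OAuth consumer pair (placeholders)
        "token", "token_secret",            # user token pair (placeholders)
    )

    client_info = client.info()
    # On success this holds user info; when rate-limited, meta.status is 429
    if client_info.get("meta", {}).get("status") == 429:
        raise ConnectionRefusedError("Tumblr API rate limit reached")
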
def validate_query(query, request, user):
705	def validate_query(query, request, user):
706		"""
707		Validate custom data input
708
709		Confirms that the query parameters are valid and, if so, returns a
710		sanitised version of them.
711
712		:param dict query:  Query parameters, from client-side.
713		:param request:  	Flask request
714		:param User user:  	User object of user who has submitted the query
715		:return dict:  		Safe query parameters
716		"""
717		# an empty query is not allowed
718		if not query.get("query", "").strip():
719			raise QueryParametersException("You must provide a search query.")
720
721		# reformat queries to be a comma-separated list
722		items = query.get("query").replace("#","")
723		items = items.split("\n")
724
725		# no more than ten items allowed
726		if len(items) > 10:
727			raise QueryParametersException("Please query for ten or fewer tags or blogs (you entered %s)." % len(items))
728
729		# an empty list of items is not allowed
730		if not items:
731			raise QueryParametersException("Search query cannot be empty.")
732
733		# So it shows nicely in the frontend.
734		items = ", ".join([item.strip() for item in items if item])
735
736		# the dates need to make sense as a range to search within
737		query["min_date"], query["max_date"] = query.get("daterange")
738		if any(query.get("daterange")) and not all(query.get("daterange")):
739			raise QueryParametersException("When providing a date range, set both an upper and lower limit.")
740
741		del query["daterange"]
742
743		query["query"] = items
744		query["board"] = query.get("search_scope") + "s"  # used in web interface
745
746		# if we made it this far, the query can be executed
747		return query

Validate custom data input

Confirms that the query parameters are valid and, if so, returns a sanitised version of them.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns
       Safe query parameters
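
A usage sketch: the raw query dict a front-end submits and the sanitised version handed back (since the method body does not use `request` or `user`, `None` stands in for both here):

    raw_query = {
        "query": "#gogh\n#van gogh",
        "search_scope": "tag",
        "fetch_reblogs": False,
        "daterange": (None, None),
    }

    safe = SearchTumblr.validate_query(raw_query, None, None)
    # safe["query"] == "gogh, van gogh"
    # safe["board"] == "tags"; "daterange" is replaced by min_date/max_date
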
def parse_tumblr_posts(self, posts, reblog=False):
749	def parse_tumblr_posts(self, posts, reblog=False):
750		"""
751		Function to parse Tumblr posts into the same dict items.
752		Tumblr posts can be many different types, so some data processing is necessary.
753
754		:param posts, list: List of Tumblr posts as returned from the Tumblr API.
755		:param reblog, bool: Whether the post concerns a reblog of posts from the original dataset.
756
757		returns list processed_posts, a list with dictionary items of post info.
758		"""
759
760		# Store processed posts here
761		processed_posts = []
762
763		media_tags = ["photo", "video", "audio"]
764
765		# Loop through all the posts and write a row for each of them.
766		for post in posts:
767			post_type = post["type"]
768
769			# The post's text is in different keys depending on the post type
770			if post_type in media_tags:
771				text = post["caption"]
772			elif post_type == "link":
773				text = post["description"]
774			elif post_type == "text" or post_type == "chat":
775				text = post["body"]
776			elif post_type == "answer":
777				text = post["question"] + "\n" + post["answer"]
778			else:
779				text = ""
780
781			# Different options for video types (YouTube- or Tumblr-hosted)
782			if post_type == "video":
783
784				video_source = post["video_type"]
785				# Use `get` since some videos are deleted
786				video_url = post.get("permalink_url")
787
788				if video_source == "youtube":
789					# There's no URL if the YouTube video is deleted
790					if video_url:
791						video_id = post["video"]["youtube"]["video_id"]
792					else:
793						video_id = "deleted"
794				else:
795					video_id = "unknown"
796
797			else:
798				video_source = None
799				video_id = None
800				video_url = None
801
802			# All the fields to write
803			processed_post = {
804				# General columns
805				"type": post_type,
806				"timestamp": post["timestamp"],
807				"is_reblog": reblog,
808
809				# Blog columns
810				"author": post["blog_name"],
811				"subject": post["blog"]["title"],
812				"blog_description": post["blog"]["description"],
813				"blog_url": post["blog"]["url"],
814				"blog_uuid": post["blog"]["uuid"],
815				"blog_last_updated": post["blog"]["updated"],
816
817				# Post columns
818				"id": post["id"],
819				"post_url": post["post_url"],
820				"post_slug": post["slug"],
821				"thread_id": post["reblog_key"],
822				"body": text.replace("\x00", ""),
823				"tags": ", ".join(post["tags"]) if post.get("tags") else None,
824				"notes": post["note_count"],
825				"urls": post.get("link_url"),
826				"images": ",".join([photo["original_size"]["url"] for photo in post["photos"]]) if post.get("photos") else None,
827
828				# Optional video columns
829				"video_source": video_source if post_type == "video" else None,
830				"video_url": video_url if post_type == "video" else None,
831				"video_id": video_id if post_type == "video" else None,
832				"video_thumb": post.get("thumbnail_url"), # Can be deleted
833
834				# Optional audio columns
835				"audio_type": post.get("audio_type"),
836				"audio_url": post.get("audio_source_url"),
837				"audio_plays": post.get("plays"),
838
839				# Optional link columns
840				"link_author": post.get("link_author"),
841				"link_publisher": post.get("publisher"),
842				"link_image": post.get("link_image"),
843
844				# Optional answers columns
845				"asking_name": post.get("asking_name"),
846				"asking_url": post.get("asking_url"),
847				"question": post.get("question"),
848				"answer": post.get("answer"),
849
850				# Optional chat columns
851				"chat": post.get("dialogue")
852			}
853
854			# Store the processed post
855			processed_posts.append(processed_post)
856
857		return processed_posts

Function to parse Tumblr posts into the same dict items. Tumblr posts can be many different types, so some data processing is necessary.

Parameters
  • posts, list: List of Tumblr posts as returned from the Tumblr API.
  • reblog, bool: Whether the post concerns a reblog of posts from the original dataset.

returns list processed_posts, a list with dictionary items of post info.

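To illustrate the flattening, a minimal hypothetical raw text post and the row it becomes; only keys this parser actually reads are included, and `searcher` is assumed to be an initialised SearchTumblr worker:

    raw_post = {
        "type": "text",
        "timestamp": 1577836800,
        "blog_name": "example-blog",
        "blog": {"title": "Example", "description": "", "uuid": "t:abc123",
                 "url": "https://example-blog.tumblr.com", "updated": 1577836800},
        "id": 123456789,
        "post_url": "https://example-blog.tumblr.com/post/123456789",
        "slug": "hello-world",
        "reblog_key": "AbCdEf",
        "body": "Hello world",
        "tags": ["hello", "world"],
        "note_count": 2,
    }

    rows = searcher.parse_tumblr_posts([raw_post])
    # rows[0]["body"] == "Hello world"; rows[0]["tags"] == "hello, world"
    # video/audio/link/answer/chat columns fall back to None for text posts
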
def after_process(self):
859	def after_process(self):
860		"""
861		Override of the same function in processor.py
862		Used to notify of potential API errors.
863
864		"""
865		super().after_process()
866		self.client = None
867		errors = []
868		if len(self.failed_notes) > 0:
869			errors.append("API error(s) when fetching notes %s" % ", ".join(self.failed_notes))
870		if len(self.failed_reblogs) > 0:
871			errors.append("API error(s) when fetching reblogs %s" % ", ".join(self.failed_reblogs))
872		if errors:
873			self.dataset.log(";\n ".join(errors))
874			self.dataset.update_status("Dataset completed but failed to capture some notes/reblogs; see log for details.")

Override of the same function in processor.py. Used to notify of potential API errors.