
datasources.reddit.search_reddit

import requests
import json
import time
import re

from backend.lib.search import Search
from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, QueryNeedsExplicitConfirmationException
from common.lib.helpers import UserInput, timify_long

from common.config_manager import config


class SearchReddit(Search):
	"""
	Search Reddit

	Defines methods to fetch Reddit data on demand
	"""
	type = "reddit-search"  # job ID
	category = "Search"  # category
	title = "Reddit Search"  # title displayed in UI
	description = "Query the Pushshift API to retrieve Reddit posts and threads matching the search parameters"  # description displayed in UI
	extension = "csv"  # extension of result file, used internally and in UI
	is_local = False  # whether this datasource is locally scraped
	is_static = False  # whether this datasource is still updated

	references = [
		"[API documentation](https://github.com/pushshift/api)",
		"[r/pushshift](https://www.reddit.com/r/pushshift/)",
		"[Baumgartner, J., Zannettou, S., Keegan, B., Squire, M., & Blackburn, J. (2020). The Pushshift Reddit Dataset. *Proceedings of the International AAAI Conference on Web and Social Media*, 14(1), 830-839.](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)"
	]

	# not available as a processor for existing datasets
	accepts = [None]

	max_workers = 1
	max_retries = 5

	rate_limit = 0
	request_timestamps = []

	config = {
		"reddit-search.can_query_without_keyword": {
			"type": UserInput.OPTION_TOGGLE,
			"help": "Can query without keyword",
			"default": False,
			"tooltip": "Allows users to query Pushshift without specifying a keyword. This can lead to HUGE datasets!"
		}
	}

	# These change depending on the API type used,
	# but should be globally accessible.
	submission_endpoint = None
	comment_endpoint = None
	api_type = None
	since = "since"
	after = "after"

	@classmethod
	def get_options(cls, parent_dataset=None, user=None):
		"""
		Determine if the user needs to see the 'careful with wildcard
		queries!' warning

		:param parent_dataset:
		:param user:
		:return dict:  Options definition
		"""
		options = {
			"wildcard-warning": {
				"type": UserInput.OPTION_INFO,
				"help": "The requirement for searching by keyword has been lifted for your account; you can search by "
						"date range only. This can potentially return hundreds of millions of posts, so **please be "
						"careful** when using this privilege."
			},
			"pushshift_track": {
				"type": UserInput.OPTION_CHOICE,
				"help": "API version",
				"options": {
					"beta": "Beta (new version)",
					"regular": "Regular"
				},
				"default": "beta",
				"tooltip": "The beta version retrieves more comments per request but may be incomplete."
			},
			"board": {
				"type": UserInput.OPTION_TEXT,
				"help": "Subreddit(s)",
				"tooltip": "Comma-separated"
			},
			"divider": {
				"type": UserInput.OPTION_DIVIDER
			},
			"intro": {
				"type": UserInput.OPTION_INFO,
				"help": "Reddit data is retrieved from [Pushshift](https://pushshift.io) (see also [this "
						"paper](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)). Note that Pushshift's dataset "
						"*may not be complete* depending on the parameters used,"
						" data from the last few days might not be there yet,"
						" and post scores can be out of date. "
						"See [this paper](https://arxiv.org/pdf/1803.05046.pdf) for an overview of the gaps in the data. "
						"Double-check manually or via the official Reddit API if completeness is a concern. Check the "
						"documentation ([beta](https://beta.pushshift.io/redoc), [regular](https://github.com/pushshift/api)) for "
						"more information (e.g. query syntax)."
			},
			"body_match": {
				"type": UserInput.OPTION_TEXT,
				"help": "Message search",
				"tooltip": "Matches anything in the body of a comment or post."
			},
			"subject_match": {
				"type": UserInput.OPTION_TEXT,
				"help": "Subject search",
				"tooltip": "Matches anything in the title of a post."
			},
			"subject_url": {
				"type": UserInput.OPTION_TEXT,
				"help": "URL/domain in post",
				"tooltip": "Regular API only; filters for posts that link to certain sites or domains (e.g. only posts linking to reddit.com)",
			},
			"divider-2": {
				"type": UserInput.OPTION_DIVIDER
			},
			"daterange": {
				"type": UserInput.OPTION_DATERANGE,
				"help": "Date range"
			},
			"search_scope": {
				"type": UserInput.OPTION_CHOICE,
				"help": "Search scope",
				"options": {
					"op-only": "Opening posts only (no replies/comments)",
					"posts-only": "All matching posts",
				},
				"default": "posts-only"
			}
		}

		# this warning isn't needed if the user can't search for everything
		# anyway
		if not config.get("reddit-search.can_query_without_keyword"):
			del options["wildcard-warning"]

		return options

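To show how this options definition might be consumed, here is a minimal sketch that iterates over the dict returned by `get_options`. The loop itself is hypothetical; 4CAT's actual form rendering lives elsewhere and is not part of this file.

	# hypothetical illustration: walk the options definition and print a
	# plain-text summary of each form field
	options = SearchReddit.get_options()

	for name, definition in options.items():
		label = definition.get("help", name)
		default = definition.get("default", "")
		print("%s: %s (default: %r)" % (name, label, default))
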
	@staticmethod
	def build_query(query):
		"""
		Determine API call parameters

		Decides which endpoints to call and with which parameters, based on
		the parameters provided by the user. There is some complexity here
		because we support two versions of the API, each with its own
		protocol.

		:param dict query:  Query parameters, as part of the DataSet object
		:return tuple:  Tuple of tuples. The first tuple is (submissions
		endpoint, submission parameters), the second the same but for
		replies.
		"""
		api_type = query.get("pushshift_track", "beta")

		# first, build the request parameters
		if api_type == "regular":
			submission_endpoint = "https://api.pushshift.io/reddit/submission/search"
			post_endpoint = "https://api.pushshift.io/reddit/comment/search"

			post_parameters = {
				"order": "asc",
				"sort_type": "created_utc",
				"size": 100,  # max value
				"metadata": True
			}
			since = "after"
			until = "before"

		# beta fields are a bit different
		elif api_type == "beta":
			submission_endpoint = "https://beta.pushshift.io/reddit/search/submissions"
			post_endpoint = "https://beta.pushshift.io/reddit/search/comments"

			# For beta requests, we sort by creation time in ascending order
			# so we're not missing data.
			post_parameters = {
				"sort_type": "created_utc",
				"order": "asc",
				"limit": 1000  # max value
			}
			since = "since"
			until = "until"

		else:
			raise NotImplementedError()

		if query["min_date"]:
			post_parameters[since] = int(query["min_date"])

		if query["max_date"]:
			post_parameters[until] = int(query["max_date"])

		if query["board"] and query["board"] != "*":
			post_parameters["subreddit"] = query["board"]

		if query["body_match"]:
			post_parameters["q"] = query["body_match"]
		else:
			post_parameters["q"] = ""

		# next, search for threads - this is a separate endpoint from comments
		submission_parameters = post_parameters.copy()
		submission_parameters["selftext"] = submission_parameters["q"]

		if query["subject_match"]:
			submission_parameters["title"] = query["subject_match"]

		# Check whether only OPs linking to certain URLs should be retrieved.
		# Only available for the regular API.
		if query.get("subject_url", None):
			urls = []
			domains = []

			if "," in query["subject_url"]:
				urls_input = query["subject_url"].split(",")
			elif "|" in query["subject_url"]:
				urls_input = query["subject_url"].split("|")
			else:
				urls_input = [query["subject_url"]]

			# clean up the input strings
			for url in urls_input:
				url = url.strip()
				url_clean = url.replace("www.", "")

				# store URLs and domains separately; they are different fields in the Pushshift API
				if "/" in url_clean:
					urls.append(url)
				else:
					domains.append(url_clean)
			if urls:
				# multiple full URLs are supposedly not supported by Pushshift
				submission_parameters["url"] = "'" + ",".join(urls) + "'"
			if domains:
				submission_parameters["domain"] = ",".join(domains)

		return (
			(submission_endpoint, submission_parameters),
			(post_endpoint, post_parameters),
		)

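As a worked example of what `build_query` produces, consider a beta-track query for one subreddit with a date range and a body keyword (the values below are illustrative):

	query = {
		"pushshift_track": "beta",
		"min_date": 1609459200,  # 2021-01-01
		"max_date": 1612137600,  # 2021-02-01
		"board": "AskReddit",
		"body_match": "solar power",
		"subject_match": "",
	}
	(sub_endpoint, sub_params), (post_endpoint, post_params) = SearchReddit.build_query(query)

	# post_endpoint == "https://beta.pushshift.io/reddit/search/comments"
	# post_params == {"sort_type": "created_utc", "order": "asc", "limit": 1000,
	#                 "since": 1609459200, "until": 1612137600,
	#                 "subreddit": "AskReddit", "q": "solar power"}
	# sub_params is a copy of the same dict plus "selftext": "solar power"
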
	def get_items(self, query):
		"""
		Execute a query; get post data for given parameters

		This queries the Pushshift API to find posts and threads matching the
		given parameters.

		:param dict query:  Query parameters, as part of the DataSet object
		:return list:  Posts, sorted by thread and post ID, in ascending order
		"""
		scope = query.get("search_scope")
		submission_call, post_call = self.build_query(query)

		# set up query
		total_posts = 0
		max_retries = 3

		# rate limits are not returned by the API server anymore,
		# so we're manually setting it to 120
		self.rate_limit = 120

		# this is where we store our progress
		total_threads = 0
		seen_threads = set()
		expected_results = query.get("expected-results", 0)

		# loop through results bit by bit
		while True:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching thread data from the Pushshift API")

			retries = 0
			response = self.call_pushshift_api(*submission_call)

			if response is None:
				return response

			threads = response.json()["data"]

			if len([t for t in threads if t["id"] not in seen_threads]) == 0:
				# we're done here, no more results will be coming
				break

			# store comment IDs for a thread, and also add the OP to the
			# return list. This means all OPs will come before all comments,
			# but we can sort later if that turns out to be a problem
			for thread in threads:
				if thread.get("promoted", False):
					continue

				if thread["id"] not in seen_threads:
					seen_threads.add(thread["id"])
					yield self.thread_to_4cat(thread)

					# increase the time cursor - this is the only way to go
					# to the next page right now...
					submission_call[1]["after"] = thread["created_utc"]

					total_threads += 1

			# update status
			if expected_results:
				self.dataset.update_progress(total_threads / expected_results)
			self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

		# next, search the Pushshift API for comments matching the query
		seen_posts = set()

		# only query for individual posts if no subject keyword is given,
		# since individual posts don't have subjects, so if there is a
		# subject query no results should be returned
		do_body_query = not bool(query.get("subject_match", "")) and not bool(
			query.get("subject_url", "")) and scope != "op-only"

		while do_body_query:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching post data from the Pushshift API")

			response = self.call_pushshift_api(*post_call)

			if response is None:
				return response

			if retries >= max_retries:
				self.log.error("Error during Pushshift fetch of query %s" % self.dataset.key)
				self.dataset.update_status("Error while searching for posts on Pushshift")
				return None

			posts = response.json()["data"]

			if len([p for p in posts if p["id"] not in seen_posts]) == 0:
				# no new results; we're done
				break

			# store post data
			for post in posts:
				if post.get("promoted", False):
					continue

				if post["id"] not in seen_posts:
					seen_posts.add(post["id"])
					yield self.post_to_4cat(post)

					# increase the time cursor - this is the only way to go
					# to the next page right now...
					post_call[1][self.since] = post["created_utc"]

					total_posts += 1

			# update our progress
			if expected_results:
				self.dataset.update_progress((total_threads + total_posts) / expected_results)
			self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_posts + total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

		# and done!
		if total_posts == 0 and total_threads == 0:
			self.dataset.update_status("No posts found")

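The two loops above share one pagination pattern: advance a time cursor to the newest `created_utc` seen so far, and stop once a page yields no unseen IDs. Reduced to its core, and assuming a generic Pushshift-style endpoint that returns items sorted by `created_utc`, the pattern looks like this sketch:

	import requests

	def paginate(endpoint, params, since_field="since"):
		"""Yield items from a created_utc-sorted endpoint, page by page."""
		seen = set()
		while True:
			batch = requests.get(endpoint, params, timeout=30).json()["data"]
			new = [item for item in batch if item["id"] not in seen]
			if not new:
				# a page of only duplicates means the cursor stopped advancing
				break
			for item in new:
				seen.add(item["id"])
				params[since_field] = item["created_utc"]  # move the cursor
				yield item
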
	@staticmethod
	def post_to_4cat(post):
		"""
		Convert a Pushshift post object to 4CAT post data

		:param dict post:  Post data, as from the Pushshift API
		:return dict:  Re-formatted data
		"""

		return {
			"thread_id": post["link_id"].split("_").pop(),
			"id": post["id"],
			"timestamp": post["created_utc"],
			"body": post["body"].strip().replace("\r", ""),
			"subject": "",
			"author": post["author"],
			"author_flair": post.get("author_flair_text", ""),
			"post_flair": "",
			"domain": "",
			"url": "",
			"image_file": "",
			"image_md5": "",
			"subreddit": post["subreddit"],
			"parent": post["parent_id"],
			# this is missing sometimes, but upon manual inspection
			# the post always has 1 point
			"score": post.get("score", 1)
		}

	@staticmethod
	def thread_to_4cat(thread):
		"""
		Convert a Pushshift thread object to 4CAT post data

		:param dict thread:  Thread data, as from the Pushshift API
		:return dict:  Re-formatted data
		"""
		image_match = re.compile(r"\.(jpg|jpeg|png|gif|webm|mp4)$", flags=re.IGNORECASE)

		return {
			"thread_id": thread["id"],
			"id": thread["id"],
			"timestamp": thread["created_utc"],
			"body": thread.get("selftext", "").strip().replace("\r", ""),
			"subject": thread["title"],
			"author": thread["author"],
			"author_flair": thread.get("author_flair_text", ""),
			"post_flair": thread.get("link_flair_text", ""),
			"image_file": thread.get("url", "") if thread.get("url") and image_match.search(thread.get("url", "")) else "",
			"domain": thread.get("domain", ""),
			"url": thread.get("url", ""),
			"image_md5": "",
			"subreddit": thread["subreddit"],
			"parent": "",
			"score": thread.get("score", 0)
		}

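For illustration, this is what `post_to_4cat` does to a Pushshift comment object; the field values below are fabricated but have the right shape:

	comment = {
		"id": "gx2kq0b",
		"link_id": "t3_lm89zd",  # thread the comment belongs to
		"parent_id": "t1_gx2jq9a",
		"created_utc": 1613476800,
		"body": "Example comment\r\n",
		"author": "example_user",
		"subreddit": "AskReddit",
	}
	row = SearchReddit.post_to_4cat(comment)

	# row["thread_id"] == "lm89zd"  (everything after the "t3_" prefix)
	# row["body"] == "Example comment"  (stripped, carriage returns removed)
	# row["score"] == 1  (missing scores default to 1)
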
	def call_pushshift_api(self, *args, **kwargs):
		"""
		Call the Pushshift API and don't crash (immediately) if it fails

		Will also try to respect the rate limit, waiting before making a
		request until doing so will not violate the rate limit.

		:param args:
		:param kwargs:
		:return: Response, or `None`
		"""

		retries = 0
		while retries < self.max_retries:
			try:
				self.wait_until_window()
				response = requests.get(*args, **kwargs)
				self.request_timestamps.append(time.time())
				if response.status_code == 200:
					break
				else:
					raise RuntimeError("HTTP %s" % response.status_code)
			except (RuntimeError, requests.RequestException) as e:
				self.log.info("Error %s while querying Pushshift API - waiting 15 seconds and retrying..." % e)
				time.sleep(15)
				retries += 1

		if retries >= self.max_retries:
			self.log.error("Error during Pushshift fetch of query %s" % self.dataset.key)
			self.dataset.update_status("Error while searching for posts on Pushshift - API did not respond as expected")
			return None

		return response

	@staticmethod
	def get_expected_results(endpoint, parameters, api_type):
		"""
		Get the expected result size for a query

		We're not using call_pushshift_api here because that cannot be called
		statically, which is necessary because this is called from within
		validate_query.

		:param str endpoint:  URL of the API endpoint
		:param dict parameters:  Call parameters
		:param api_type:  Type of API (regular or beta)

		:return:  Number of expected results, or `None`
		"""
		parameters.update({"metadata": "true", "size": 0, "track_total_hits": True})

		retries = 0
		response = None

		while retries < 3:
			try:
				response = requests.get(endpoint, parameters, timeout=10)
				break
			except requests.RequestException:
				retries += 1
				time.sleep(retries * 5)
				continue

		if not response or response.status_code != 200:
			return None
		else:
			try:
				return response.json()["metadata"]["es"]["hits"]["total"]["value"]
			except (json.JSONDecodeError, KeyError):
				return None

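A short usage sketch for the estimate; note that `get_expected_results` modifies the parameters dict it is given, adding the metadata fields. Here `query` is assumed to be a validated query dict as used elsewhere in this file:

	(sub_endpoint, sub_params), _ = SearchReddit.build_query(query)
	estimate = SearchReddit.get_expected_results(sub_endpoint, sub_params, "beta")

	if estimate is None:
		print("no estimate available")
	else:
		print("~{:,} submissions expected".format(estimate))
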
	def wait_until_window(self):
		"""
		Wait until a request can be made without violating the rate limit

		If we have made more requests in the window (one minute) than allowed
		by the rate limit, wait until that is no longer the case.
		"""
		has_warned = False

		# recompute the window on every pass, so that it slides forward and
		# old requests age out of it while we wait
		while len([timestamp for timestamp in self.request_timestamps if timestamp >= time.time() - 60]) >= self.rate_limit:
			if not has_warned:
				self.log.info("Hit Pushshift rate limit - throttling...")
				has_warned = True

			time.sleep(0.25)  # should be enough

		# clean up timestamps outside of the window
		window_start = time.time() - 60
		self.request_timestamps = [timestamp for timestamp in self.request_timestamps if timestamp >= window_start]

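The same sliding-window throttle, as a standalone sketch: keep the timestamps of the last minute's requests, and sleep while the window is full. The cutoff is recomputed on every pass so earlier requests age out of the window while waiting.

	import time

	class Throttle:
		"""Allow at most `rate_limit` calls per `window` seconds."""

		def __init__(self, rate_limit=120, window=60.0):
			self.rate_limit = rate_limit
			self.window = window
			self.timestamps = []

		def wait(self):
			while True:
				cutoff = time.time() - self.window
				self.timestamps = [t for t in self.timestamps if t >= cutoff]
				if len(self.timestamps) < self.rate_limit:
					return
				time.sleep(0.25)

		def record(self):
			self.timestamps.append(time.time())
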
	@staticmethod
	def validate_query(query, request, user):
		"""
		Validate input for a dataset query on the Reddit data source.

		Will raise a QueryParametersException if invalid parameters are
		encountered. Mutually exclusive parameters may also be sanitised by
		ignoring either of the mutually exclusive options.

		:param dict query:  Query parameters, from client-side.
		:param request:  Flask request
		:param User user:  User object of user who has submitted the query
		:return dict:  Safe query parameters
		"""
		# we need a board!
		r_prefix = re.compile(r"^/?r/")
		boards = [r_prefix.sub("", board).strip() for board in query.get("board", "").split(",") if board.strip()]

		if not boards:
			raise QueryParametersException("Please provide a board or a comma-separated list of boards to query.")

		# ignore the leading r/ for boards
		query["board"] = ",".join(boards)

		keywordless_query = config.get("reddit-search.can_query_without_keyword", False, user=user)

		# this is the bare minimum, else we can't narrow down the full data set
		if not user.is_admin and not keywordless_query and not query.get(
				"body_match", "").strip() and not query.get("subject_match", "").strip() and not query.get(
			"subject_url", ""):
			raise QueryParametersException("Please provide a body query or subject query.")

		# a body query combined with full threads is incompatible, returning
		# too many posts in most cases
		if query.get("body_match", None):
			if "full_threads" in query:
				del query["full_threads"]

		# make sure no body or subject searches are possible in which every
		# term starts with a minus sign, e.g. "-Trump"
		if query.get("body_match", None) or query.get("subject_match", None):
			queries_to_check = []

			if query.get("body_match", None):
				queries_to_check += [body_query.strip() for body_query in query["body_match"].split(" ")]

			if query.get("subject_match", None):
				queries_to_check += [subject_query.strip() for subject_query in query["subject_match"].split(" ")]

			startswith_minus = [query_check.startswith("-") for query_check in queries_to_check]
			if all(startswith_minus):
				raise QueryParametersException("Please provide body queries that do not start with a minus sign.")

		# URL queries are not possible (yet) for the beta API
		if query.get("pushshift_track") == "beta" and query.get("subject_url", None):
			raise QueryParametersException("URL querying is not possible (yet) for the beta endpoint.")

		# both dates need to be set, or neither
		if query.get("min_date", None) and not query.get("max_date", None):
			raise QueryParametersException("When setting a date range, please provide both an upper and lower limit.")

		# the dates need to make sense as a range to search within
		query["min_date"], query["max_date"] = query.get("daterange")

		if "*" in query.get("body_match", "") and not keywordless_query:
			raise QueryParametersException(
				"Wildcard queries are not allowed as they typically return too many results to properly process.")

		if "*" in query.get("board", "") and not keywordless_query:
			raise QueryParametersException(
				"Wildcards are not allowed for boards as this typically returns too many results to properly process.")

		del query["daterange"]

		# determine how many results to expect
		# this adds a small delay since we need to talk to the API before
		# returning to the user, but the benefit is that we reduce the number
		# of too-large queries (because users are warned beforehand) and can
		# give a progress indication for queries that do go through
		params = SearchReddit.build_query(query)
		expected_posts = SearchReddit.get_expected_results(*params[0], query.get("pushshift_track", "regular"))
		if not expected_posts:
			expected_posts = 0

		if query.get("search_scope") != "op-only":
			expected_replies = SearchReddit.get_expected_results(*params[1], query.get("pushshift_track", "regular"))
			expected_posts += expected_replies if expected_replies else 0

		if expected_posts:
			pps = 672 if query.get("pushshift_track") == "beta" else 44
			expected_seconds = int(expected_posts / pps)  # seems to be about this
			expected_time = timify_long(expected_seconds)
			query["expected-results"] = expected_posts

			if expected_seconds > 1800 and not query.get("frontend-confirm"):
				raise QueryNeedsExplicitConfirmationException(
					"This query will return approximately %s items. This will take a long time (approximately %s)."
					" Are you sure you want to run this query?" % ("{:,}".format(expected_posts), expected_time))

		# if we made it this far, the query can be executed
		return query
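
The duration estimate at the end of `validate_query` is plain arithmetic over the observed throughput (roughly 672 items retrieved per second on the beta track, 44 on the regular one). For example:

	expected_posts = 2_000_000
	pps = 672  # beta track
	expected_seconds = expected_posts // pps  # 2976 seconds, i.e. just under 50 minutes

	# anything over half an hour requires explicit confirmation
	needs_confirmation = expected_seconds > 1800  # True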
Search Reddit

Defines methods to fetch Reddit data on demand

type = 'reddit-search'
category = 'Search'
title = 'Reddit Search'
description = 'Query the Pushshift API to retrieve Reddit posts and threads matching the search parameters'
extension = 'csv'
is_local = False
is_static = False
references = ['[API documentation](https://github.com/pushshift/api)', '[r/pushshift](https://www.reddit.com/r/pushshift/)', '[Baumgartner, J., Zannettou, S., Keegan, B., Squire, M., & Blackburn, J. (2020). The Pushshift Reddit Dataset. *Proceedings of the International AAAI Conference on Web and Social Media*, 14(1), 830-839.](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)']
accepts = [None]
max_workers = 1
max_retries = 5
rate_limit = 0
request_timestamps = []
config = {'reddit-search.can_query_without_keyword': {'type': 'toggle', 'help': 'Can query without keyword', 'default': False, 'tooltip': 'Allows users to query Pushshift without specifying a keyword. This can lead to HUGE datasets!'}}
submission_endpoint = None
comment_endpoint = None
api_type = None
since = 'since'
after = 'after'
@classmethod
def get_options(cls, parent_dataset=None, user=None):
 60	@classmethod
 61	def get_options(cls, parent_dataset=None, user=None):
 62		"""
 63		Determine if user needs to see the 'careful with wildcard queries!'
 64		warning
 65
 66		:param parent_dataset:
 67		:param user:
 68		:return dict:  Options definition
 69		"""
 70		options = {
 71			"wildcard-warning": {
 72				"type": UserInput.OPTION_INFO,
 73				"help": "The requirement for searching by keyword has been lifted for your account; you can search by "
 74						"date range only. This can potentially return hundreds of millions of posts, so **please be "
 75						"careful** when using this privilege."
 76			},
 77			"pushshift_track": {
 78				"type": UserInput.OPTION_CHOICE,
 79				"help": "API version",
 80				"options": {
 81					"beta": "Beta (new version)",
 82					"regular": "Regular"
 83				},
 84				"default": "beta",
 85				"tooltip": "The beta version retrieves more comments per request but may be incomplete."
 86			},
 87			"board": {
 88				"type": UserInput.OPTION_TEXT,
 89				"help": "Subreddit(s)",
 90				"tooltip": "Comma-separated"
 91			},
 92			"divider": {
 93				"type": UserInput.OPTION_DIVIDER
 94			},
 95			"intro": {
 96				"type": UserInput.OPTION_INFO,
 97				"help": "Reddit data is retrieved from [Pushshift](https://pushshift.io) (see also [this "
 98						"paper](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)). Note that Pushshift's dataset "
 99						"*may not be complete* depending on the parameters used,"
100						" data from the last few days might not be there yet,"
101						" and post scores can be out of date. "
102						"See [this paper](https://arxiv.org/pdf/1803.05046.pdf) for an overview of the gaps in data. "
103						"Double-check manually or via the official Reddit API if completeness is a concern. Check the "
104						"documentation ([beta](https://beta.pushshift.io/redoc), [regular](https://github.com/pushshift/api)) for "
105						"more information (e.g. query syntax)."
106			},
107			"body_match": {
108				"type": UserInput.OPTION_TEXT,
109				"help": "Message search",
110				"tooltip": "Matches anything in the body of a comment or post."
111			},
112			"subject_match": {
113				"type": UserInput.OPTION_TEXT,
114				"help": "Subject search",
115				"tooltip": "Matches anything in the title of a post."
116			},
117			"subject_url": {
118				"type": UserInput.OPTION_TEXT,
119				"help": "URL/domain in post",
120				"tooltip": "Regular API only; Filter for posts that link to certain sites or domains (e.g. only posts linking to reddit.com)",
121			},
122			"divider-2": {
123				"type": UserInput.OPTION_DIVIDER
124			},
125			"daterange": {
126				"type": UserInput.OPTION_DATERANGE,
127				"help": "Date range"
128			},
129			"search_scope": {
130				"type": UserInput.OPTION_CHOICE,
131				"help": "Search scope",
132				"options": {
133					"op-only": "Opening posts only (no replies/comments)",
134					"posts-only": "All matching posts",
135				},
136				"default": "posts-only"
137			}
138		}
139
140		# this warning isn't needed if the user can't search for everything
141		# anyway
142		if not config.get("reddit-search.can_query_without_keyword"):
143			del options["wildcard-warning"]
144
145		return options

Determine if user needs to see the 'careful with wildcard queries!' warning

Parameters
  • parent_dataset:
  • user:
Returns

Options definition

@staticmethod
def build_query(query):
147	@staticmethod
148	def build_query(query):
149		"""
150		Determine API call parameters
151
152		Decides what endpoints to call and with which parameters based on the
153		parameters provided by the user. There is some complexity here because
154		we support two versions of the API, each with their own protocol.
155
156		:param dict query:  Query parameters, as part of the DataSet object
157		:return tuple:  Tuple of tuples. First tuple is (submissions endpoint,
158		submission parameters), the second the same but for replies.
159		"""
160		api_type = query.get("pushshift_track", "beta")
161
162		# first, build the request parameters
163		if api_type == "regular":
164			submission_endpoint = "https://api.pushshift.io/reddit/submission/search"
165			post_endpoint = "https://api.pushshift.io/reddit/comment/search"
166
167			post_parameters = {
168				"order": "asc",
169				"sort_type": "created_utc",
170				"size": 100,  # max value
171				"metadata": True
172			}
173			since = "after"
174			until = "before"
175
176		# beta fields are a bit different.
177		elif api_type == "beta":
178			submission_endpoint = "https://beta.pushshift.io/reddit/search/submissions"
179			post_endpoint = "https://beta.pushshift.io/reddit/search/comments"
180
181			# For beta requests, we're sorting by IDs so we're not missing data.
182			# This is unavailable for the regular API.
183			post_parameters = {
184				"sort_type": "created_utc",
185				"order": "asc",
186				"limit": 1000  # max value
187			}
188			since = "since"
189			until = "until"
190
191		else:
192			raise NotImplementedError()
193
194		if query["min_date"]:
195			post_parameters[since] = int(query["min_date"])
196
197		if query["max_date"]:
198			post_parameters[until] = int(query["max_date"])
199
200		if query["board"] and query["board"] != "*":
201			post_parameters["subreddit"] = query["board"]
202
203		if query["body_match"]:
204			post_parameters["q"] = query["body_match"]
205		else:
206			post_parameters["q"] = ""
207
208		# first, search for threads - this is a separate endpoint from comments
209		submission_parameters = post_parameters.copy()
210		submission_parameters["selftext"] = submission_parameters["q"]
211
212		if query["subject_match"]:
213			submission_parameters["title"] = query["subject_match"]
214
215		# Check whether only OPs linking to certain URLs should be retrieved.
216		# Only available for the regular API.
217		if query.get("subject_url", None):
218			urls = []
219			domains = []
220
221			if "," in query["subject_url"]:
222				urls_input = query["subject_url"].split(",")
223			elif "|" in query["subject_url"]:
224				urls_input = query["subject_url"].split("|")
225			else:
226				urls_input = [query["subject_url"]]
227
228			# Input strings
229			for url in urls_input:
230				# Some cleaning
231				url = url.strip()
232				url_clean = url.replace("www.", "")
233
234				# Store urls or domains separately; different fields in Pushshift API
235				if "/" in url_clean:
236					urls.append(url)
237				else:
238					domains.append(url_clean)
239			if urls:
240				# Multiple full URLs is supposedly not supported by Pushshift
241				submission_parameters["url"] = "\'" + (",".join(urls)) + "\'"
242			if domains:
243				submission_parameters["domain"] = ",".join(domains)
244
245		return (
246			(submission_endpoint, submission_parameters),
247			(post_endpoint, post_parameters),
248		)

Determine API call parameters

Decides what endpoints to call and with which parameters based on the parameters provided by the user. There is some complexity here because we support two versions of the API, each with their own protocol.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Tuple of tuples. First tuple is (submissions endpoint, submission parameters), the second the same but for replies.

def get_items(self, query):
	def get_items(self, query):
		"""
		Execute a query; get post data for given parameters

		This queries the Pushshift API to find posts and threads matching the
		given parameters.

		:param dict query:  Query parameters, as part of the DataSet object
		:return list:  Posts, sorted by thread and post ID, in ascending order
		"""
		scope = query.get("search_scope")
		submission_call, post_call = self.build_query(query)

		# set up query
		total_posts = 0

		# rate limits are not returned by the API server anymore,
		# so we're manually setting it to 120
		self.rate_limit = 120

		# this is where we store our progress
		total_threads = 0
		seen_threads = set()
		expected_results = query.get("expected-results", 0)

		# loop through results bit by bit
		while True:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching thread data from the Pushshift API")

			response = self.call_pushshift_api(*submission_call)

			if response is None:
				return response

			threads = response.json()["data"]

			if not any(t["id"] not in seen_threads for t in threads):
				# we're done here, no more results will be coming
				break

			# store comment IDs for a thread, and also add the OP to the
			# return list. This means all OPs will come before all comments,
			# but we can sort later if that turns out to be a problem
			for thread in threads:
				if thread.get("promoted", False):
					continue

				if thread["id"] not in seen_threads:
					seen_threads.add(thread["id"])
					yield self.thread_to_4cat(thread)

					# advance the time cursor; this is the only way to go to
					# the next page right now...
					submission_call[1]["after"] = thread["created_utc"]

					total_threads += 1

			# update status
			if expected_results:
				self.dataset.update_progress(total_threads / expected_results)
			self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

		# okay, search the pushshift API for posts
		# we have two modes here: by keyword, or by ID. ID is set above where
		# ID chunks are defined: these chunks are used here if available
		seen_posts = set()

		# only query for individual posts if no subject keyword is given:
		# individual posts don't have subjects, so a subject query should
		# return no results for them
		do_body_query = not bool(query.get("subject_match", "")) and not bool(
			query.get("subject_url", "")) and scope != "op-only"

		while do_body_query:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching post data from the Pushshift API")

			response = self.call_pushshift_api(*post_call)

			if response is None:
				# call_pushshift_api has already exhausted its retries and
				# logged the error at this point
				return response

			posts = response.json()["data"]

			if not any(p["id"] not in seen_posts for p in posts):
				# no more new posts; when searching by ID chunk this can also
				# happen if no IDs in the chunk match the other parameters
				break

			# store post data
			for post in posts:
				if post.get("promoted", False):
					continue

				if post["id"] not in seen_posts:
					seen_posts.add(post["id"])
					yield self.post_to_4cat(post)

					# advance the time cursor; this is the only way to go to
					# the next page right now...
					post_call[1][self.since] = post["created_utc"]

					total_posts += 1

			# update our progress
			if expected_results:
				self.dataset.update_progress((total_threads + total_posts) / expected_results)
			self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_posts + total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

		# and done!
		if total_posts == 0 and total_threads == 0:
			self.dataset.update_status("No posts found")

Execute a query; get post data for given parameters

This queries the Pushshift API to find posts and threads matching the given parameters.

Parameters
  • dict query: Query parameters, as part of the DataSet object
Returns

Posts, sorted by thread and post ID, in ascending order
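get_items pages through the API with a time cursor: after each page it moves the lower time bound up to the created_utc of the newest item, and it stops as soon as a page contains no unseen IDs. The standalone sketch below shows that pattern in isolation; the endpoint and parameter names follow the Pushshift conventions used in this module, but it is an illustration rather than the module's actual code.

import requests

def paginate_by_time(endpoint, params):
	# Time-cursor pagination sketch: deduplicate by ID and advance the
	# "after" parameter to the newest timestamp seen so far
	seen = set()
	while True:
		items = requests.get(endpoint, params=params).json()["data"]
		new_items = [item for item in items if item["id"] not in seen]
		if not new_items:
			break  # a page with no unseen items means we are done
		for item in new_items:
			seen.add(item["id"])
			yield item
			# the next request starts just after the newest item retrieved
			params["after"] = item["created_utc"]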

@staticmethod
def post_to_4cat(post):
	@staticmethod
	def post_to_4cat(post):
		"""
		Convert a pushshift post object to 4CAT post data

		:param dict post:  Post data, as from the pushshift API
		:return dict:  Re-formatted data
		"""

		return {
			"thread_id": post["link_id"].split("_").pop(),
			"id": post["id"],
			"timestamp": post["created_utc"],
			"body": post["body"].strip().replace("\r", ""),
			"subject": "",
			"author": post["author"],
			"author_flair": post.get("author_flair_text", ""),
			"post_flair": "",
			"domain": "",
			"url": "",
			"image_file": "",
			"image_md5": "",
			"subreddit": post["subreddit"],
			"parent": post["parent_id"],
			# this is missing sometimes, but upon manual inspection
			# the post always has 1 point
			"score": post.get("score", 1)
		}

Convert a pushshift post object to 4CAT post data

Parameters
  • dict post: Post data, as from the pushshift API
Returns

Re-formatted data
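For example, a minimal (made-up) Pushshift comment object maps as follows; note how the "t3_" prefix is stripped from link_id to form the thread ID, and how a missing score falls back to 1.

# Illustrative values only, not real Reddit data
pushshift_comment = {
	"link_id": "t3_abc123",
	"id": "def456",
	"created_utc": 1580000000,
	"body": "Example comment body",
	"author": "example_user",
	"author_flair_text": "Helper",
	"subreddit": "AskReddit",
	"parent_id": "t3_abc123",
	# no "score" key, so the conversion defaults to 1
}

converted = SearchReddit.post_to_4cat(pushshift_comment)
assert converted["thread_id"] == "abc123"
assert converted["score"] == 1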

@staticmethod
def thread_to_4cat(thread):
	@staticmethod
	def thread_to_4cat(thread):
		"""
		Convert a pushshift thread object to 4CAT post data

		:param dict thread:  Thread data, as from the pushshift API
		:return dict:  Re-formatted data
		"""
		image_match = re.compile(r"\.(jpg|jpeg|png|gif|webm|mp4)$", flags=re.IGNORECASE)

		return {
			"thread_id": thread["id"],
			"id": thread["id"],
			"timestamp": thread["created_utc"],
			"body": thread.get("selftext", "").strip().replace("\r", ""),
			"subject": thread["title"],
			"author": thread["author"],
			"author_flair": thread.get("author_flair_text", ""),
			"post_flair": thread.get("link_flair_text", ""),
			"image_file": thread.get("url", "") if thread.get("url") and image_match.search(thread.get("url", "")) else "",
			"domain": thread.get("domain", ""),
			"url": thread.get("url", ""),
			"image_md5": "",
			"subreddit": thread["subreddit"],
			"parent": "",
			"score": thread.get("score", 0)
		}

Convert a pushshift thread object to 4CAT post data

Parameters
  • dict thread: Thread data, as from the pushshift API
Returns

Re-formatted data
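One subtlety worth noting: image_file is only filled when the submission URL ends in a recognised image or video extension, matched case-insensitively. A made-up example:

with_image = {
	"id": "abc123", "created_utc": 1580000000, "title": "A picture",
	"author": "example_user", "subreddit": "pics",
	"url": "https://i.redd.it/example.JPG", "domain": "i.redd.it",
}
without_image = dict(with_image, url="https://example.com/article", domain="example.com")

# ".JPG" matches the case-insensitive extension pattern; a bare article URL does not
assert SearchReddit.thread_to_4cat(with_image)["image_file"] == "https://i.redd.it/example.JPG"
assert SearchReddit.thread_to_4cat(without_image)["image_file"] == ""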

def call_pushshift_api(self, *args, **kwargs):
	def call_pushshift_api(self, *args, **kwargs):
		"""
		Call pushshift API and don't crash (immediately) if it fails

		Will also try to respect the rate limit, waiting before making a
		request until it will not violate the rate limit.

		:param args:
		:param kwargs:
		:return: Response, or `None`
		"""

		retries = 0
		while retries < self.max_retries:
			try:
				self.wait_until_window()
				response = requests.get(*args, **kwargs)
				self.request_timestamps.append(time.time())
				if response.status_code == 200:
					break
				else:
					raise RuntimeError("HTTP %s" % response.status_code)
			except (RuntimeError, requests.RequestException) as e:
				self.log.info("Error %s while querying Pushshift API - waiting 15 seconds and retrying..." % e)
				time.sleep(15)
				retries += 1

		if retries >= self.max_retries:
			self.log.error("Error during Pushshift fetch of query %s" % self.dataset.key)
			self.dataset.update_status("Error while searching for posts on Pushshift - API did not respond as expected")
			return None

		return response

Call pushshift API and don't crash (immediately) if it fails

Will also try to respect the rate limit, waiting before making a request until it will not violate the rate limit.

Parameters
  • args:
  • kwargs:
Returns

Response, or None
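The retry pattern itself is straightforward: treat any non-200 status as an error, wait a fixed 15 seconds, and give up after a set number of attempts. A self-contained sketch of the same pattern, with illustrative names and signature:

import time
import requests

def get_with_retries(url, params=None, max_retries=5, delay=15):
	# Fixed-delay retry sketch; the name and arguments are illustrative
	for attempt in range(max_retries):
		try:
			response = requests.get(url, params=params, timeout=30)
			if response.status_code == 200:
				return response
			raise RuntimeError("HTTP %s" % response.status_code)
		except (RuntimeError, requests.RequestException):
			time.sleep(delay)  # wait before trying again
	return None  # callers treat None as "API did not respond as expected"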

@staticmethod
def get_expected_results(endpoint, parameters, api_type):
	@staticmethod
	def get_expected_results(endpoint, parameters, api_type):
		"""
		Get expected result size for a query

		We're not using call_pushshift_api here because that cannot be called
		statically, which is necessary because this is called from within
		validate_query.

		:param str endpoint:  URL of the API endpoint
		:param dict parameters:  Call parameters
		:param api_type: Type of API (regular or beta)

		:return:  Number of expected results, or `None`
		"""
		parameters.update({"metadata": "true", "size": 0, "track_total_hits": True})

		retries = 0
		response = None

		while retries < 3:
			try:
				response = requests.get(endpoint, parameters, timeout=10)
				break
			except requests.RequestException:
				retries += 1
				time.sleep(retries * 5)
				continue

		if not response or response.status_code != 200:
			return None
		else:
			try:
				return response.json()["metadata"]["es"]["hits"]["total"]["value"]
			except (json.JSONDecodeError, KeyError):
				return None

Get expected result size for a query

We're not using call_pushshift_api here because that cannot be called statically, which is necessary because this is called from within validate_query.

Parameters
  • str endpoint: URL of the API endpoint
  • dict parameters: Call parameters
  • api_type: Type of API (regular or beta)
Returns

Number of expected results, or None
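In effect the query is re-sent as a zero-size, metadata-only request, and the Elasticsearch hit count (metadata, then es, hits, total, value) is read off as the estimate. A hedged usage sketch; the endpoint URL and query parameters here are assumptions for illustration:

endpoint = "https://api.pushshift.io/reddit/search/submission"  # assumed URL
parameters = {"subreddit": "AskReddit", "q": "climate"}  # assumed parameters

count = SearchReddit.get_expected_results(endpoint, parameters, "regular")
if count is None:
	print("Could not estimate result size")
else:
	print("Expecting roughly {:,} results".format(count))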

def wait_until_window(self):
	def wait_until_window(self):
		"""
		Wait until a request can be made outside of the rate limit

		If we have made more requests in the window (one minute) than allowed
		by the rate limit, wait until that is no longer the case.
		"""
		has_warned = False

		# recompute the window each iteration, so that old requests age out
		# of it while we wait
		while len([timestamp for timestamp in self.request_timestamps if timestamp >= time.time() - 60]) >= self.rate_limit:
			if not has_warned:
				self.log.info("Hit Pushshift rate limit - throttling...")
				has_warned = True

			time.sleep(0.25)  # should be enough

		# clean up timestamps outside of window
		window_start = time.time() - 60
		self.request_timestamps = [timestamp for timestamp in self.request_timestamps if timestamp >= window_start]

Wait until a request can be made outside of the rate limit

If we have made more requests in the window (one minute) than allowed by the rate limit, wait until that is no longer the case.
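The same sliding-window throttle can be expressed as a small standalone helper, sketched below with illustrative names: record a timestamp per request, and before each new request sleep while the count of timestamps inside the window is at the cap.

import time

class SlidingWindowThrottle:
	def __init__(self, max_requests=120, window=60.0):
		self.max_requests = max_requests
		self.window = window
		self.timestamps = []

	def wait(self):
		# sleep while the number of requests inside the window is at the cap
		while len([t for t in self.timestamps if t >= time.time() - self.window]) >= self.max_requests:
			time.sleep(0.25)
		# drop timestamps that have aged out of the window
		cutoff = time.time() - self.window
		self.timestamps = [t for t in self.timestamps if t >= cutoff]

	def record(self):
		# call this right after each request is made
		self.timestamps.append(time.time())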

def validate_query(query, request, user):
	def validate_query(query, request, user):
		"""
		Validate input for a dataset query on the Reddit data source.

		Will raise a QueryParametersException if invalid parameters are
		encountered. Mutually exclusive parameters may also be sanitised by
		ignoring either of the mutually exclusive options.

		:param dict query:  Query parameters, from client-side.
		:param request:  Flask request
		:param User user:  User object of user who has submitted the query
		:return dict:  Safe query parameters
		"""
		# we need a board!
		r_prefix = re.compile(r"^/?r/")
		boards = [r_prefix.sub("", board).strip() for board in query.get("board", "").split(",") if board.strip()]

		if not boards:
			raise QueryParametersException("Please provide a board or a comma-separated list of boards to query.")

		# ignore leading r/ for boards
		query["board"] = ",".join(boards)

		keywordless_query = config.get("reddit-search.can_query_without_keyword", False, user=user)

		# this is the bare minimum, else we can't narrow down the full data set
		if not user.is_admin and not keywordless_query and not query.get("body_match", "").strip() \
				and not query.get("subject_match", "").strip() and not query.get("subject_url", ""):
			raise QueryParametersException("Please provide a body query or subject query.")

		# body query and full threads are incompatible, returning too many posts
		# in most cases
		if query.get("body_match", None) and "full_threads" in query:
			del query["full_threads"]

		# reject body or subject searches that consist only of negated terms
		# (terms starting with a minus sign, e.g. "-Trump")
		if query.get("body_match", None) or query.get("subject_match", None):
			queries_to_check = []

			if query.get("body_match", None):
				queries_to_check += [body_query.strip() for body_query in query["body_match"].split(" ")]

			if query.get("subject_match", None):
				queries_to_check += [subject_query.strip() for subject_query in query["subject_match"].split(" ")]

			startswith_minus = [query_check.startswith("-") for query_check in queries_to_check]
			if all(startswith_minus):
				raise QueryParametersException("Please provide body queries that do not start with a minus sign.")

		# URL queries are not possible (yet) for the beta API
		if query.get("pushshift_track") == "beta" and query.get("subject_url", None):
			raise QueryParametersException("URL querying is not possible (yet) for the beta endpoint.")

		# both dates need to be set, or none
		if query.get("min_date", None) and not query.get("max_date", None):
			raise QueryParametersException("When setting a date range, please provide both an upper and lower limit.")

		# the dates need to make sense as a range to search within
		query["min_date"], query["max_date"] = query.get("daterange")

		if "*" in query.get("body_match", "") and not keywordless_query:
			raise QueryParametersException(
				"Wildcard queries are not allowed as they typically return too many results to properly process.")

		if "*" in query.get("board", "") and not keywordless_query:
			raise QueryParametersException(
				"Wildcards are not allowed for boards as this typically returns too many results to properly process.")

		del query["daterange"]

		# determine how many results to expect
		# this adds a small delay since we need to talk to the API before
		# returning to the user, but the benefit is that we reduce the amount
		# of too-large queries (because users are warned beforehand) and can
		# give a progress indication for queries that do go through
		params = SearchReddit.build_query(query)
		expected_posts = SearchReddit.get_expected_results(*params[0], query.get("pushshift_track", "regular"))
		if not expected_posts:
			expected_posts = 0

		if query.get("search_scope") != "op-only":
			expected_replies = SearchReddit.get_expected_results(*params[1], query.get("pushshift_track", "regular"))
			expected_posts += expected_replies if expected_replies else 0

		if expected_posts:
			pps = 672 if query.get("pushshift_track") == "beta" else 44
			expected_seconds = int(expected_posts / pps)  # seems to be about this
			expected_time = timify_long(expected_seconds)
			query["expected-results"] = expected_posts

			if expected_seconds > 1800 and not query.get("frontend-confirm"):
				raise QueryNeedsExplicitConfirmationException(
					"This query will return approximately %s items. This will take a long time (approximately %s)."
					" Are you sure you want to run this query?" % ("{:,}".format(expected_posts), expected_time))

		# if we made it this far, the query can be executed
		return query

Validate input for a dataset query on the Reddit data source.

Will raise a QueryParametersException if invalid parameters are encountered. Mutually exclusive parameters may also be sanitised by ignoring either of the mutually exclusive options.

Parameters
  • dict query: Query parameters, from client-side.
  • request: Flask request
  • User user: User object of user who has submitted the query
Returns

Safe query parameters
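The confirmation threshold follows directly from the throughput figures in validate_query: at roughly 44 posts per second on the regular track (672 on the beta track), confirmation is requested once the estimated runtime exceeds 1800 seconds. A quick back-of-the-envelope check:

for track, pps in (("regular", 44), ("beta", 672)):
	# confirmation kicks in when expected_posts / pps exceeds 1800 seconds
	threshold_posts = 1800 * pps
	print("%s track: confirm above %s expected posts" % (track, "{:,}".format(threshold_posts)))

# regular track: confirm above 79,200 expected posts
# beta track: confirm above 1,209,600 expected posts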