datasources.reddit.search_reddit
import requests
import json
import time
import re

from backend.lib.search import Search
from common.lib.exceptions import QueryParametersException, ProcessorInterruptedException, QueryNeedsExplicitConfirmationException
from common.lib.helpers import UserInput, timify_long

from common.config_manager import config

# Compiled once at import time instead of on every thread_to_4cat() call.
# Used to detect direct image/video links in thread URLs.
IMAGE_URL_PATTERN = re.compile(r"\.(jpg|jpeg|png|gif|webm|mp4)$", flags=re.IGNORECASE)


class SearchReddit(Search):
	"""
	Search Reddit

	Defines methods to fetch Reddit data on demand
	"""
	type = "reddit-search"  # job ID
	category = "Search"  # category
	title = "Reddit Search"  # title displayed in UI
	description = "Query the Pushshift API to retrieve Reddit posts and threads matching the search parameters"  # description displayed in UI
	extension = "csv"  # extension of result file, used internally and in UI
	is_local = False  # Whether this datasource is locally scraped
	is_static = False  # Whether this datasource is still updated

	references = [
		"[API documentation](https://github.com/pushshift/api)",
		"[r/pushshift](https://www.reddit.com/r/pushshift/)",
		"[Baumgartner, J., Zannettou, S., Keegan, B., Squire, M., & Blackburn, J. (2020). The Pushshift Reddit Dataset. *Proceedings of the International AAAI Conference on Web and Social Media*, 14(1), 830-839.](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)"
	]

	# not available as a processor for existing datasets
	accepts = [None]

	max_workers = 1
	max_retries = 5

	# rate limit bookkeeping; rate_limit is set to its real value in
	# get_items() (the API no longer reports it)
	rate_limit = 0
	request_timestamps = []

	config = {
		"reddit-search.can_query_without_keyword": {
			"type": UserInput.OPTION_TOGGLE,
			"help": "Can query without keyword",
			"default": False,
			"tooltip": "Allows users to query Pushshift without specifying a keyword. This can lead to HUGE datasets!"
		}
	}

	# These change depending on the API type used,
	# but should be globally accessible.
	submission_endpoint = None
	comment_endpoint = None
	api_type = None
	since = "since"
	after = "after"

	@classmethod
	def get_options(cls, parent_dataset=None, user=None):
		"""
		Determine if user needs to see the 'careful with wildcard queries!'
		warning

		:param parent_dataset:
		:param user:
		:return dict: Options definition
		"""
		options = {
			"wildcard-warning": {
				"type": UserInput.OPTION_INFO,
				"help": "The requirement for searching by keyword has been lifted for your account; you can search by "
						"date range only. This can potentially return hundreds of millions of posts, so **please be "
						"careful** when using this privilege."
			},
			"pushshift_track": {
				"type": UserInput.OPTION_CHOICE,
				"help": "API version",
				"options": {
					"beta": "Beta (new version)",
					"regular": "Regular"
				},
				"default": "beta",
				"tooltip": "The beta version retrieves more comments per request but may be incomplete."
			},
			"board": {
				"type": UserInput.OPTION_TEXT,
				"help": "Subreddit(s)",
				"tooltip": "Comma-separated"
			},
			"divider": {
				"type": UserInput.OPTION_DIVIDER
			},
			"intro": {
				"type": UserInput.OPTION_INFO,
				"help": "Reddit data is retrieved from [Pushshift](https://pushshift.io) (see also [this "
						"paper](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)). Note that Pushshift's dataset "
						"*may not be complete* depending on the parameters used,"
						" data from the last few days might not be there yet,"
						" and post scores can be out of date. "
						"See [this paper](https://arxiv.org/pdf/1803.05046.pdf) for an overview of the gaps in data. "
						"Double-check manually or via the official Reddit API if completeness is a concern. Check the "
						"documentation ([beta](https://beta.pushshift.io/redoc), [regular](https://github.com/pushshift/api)) for "
						"more information (e.g. query syntax)."
			},
			"body_match": {
				"type": UserInput.OPTION_TEXT,
				"help": "Message search",
				"tooltip": "Matches anything in the body of a comment or post."
			},
			"subject_match": {
				"type": UserInput.OPTION_TEXT,
				"help": "Subject search",
				"tooltip": "Matches anything in the title of a post."
			},
			"subject_url": {
				"type": UserInput.OPTION_TEXT,
				"help": "URL/domain in post",
				"tooltip": "Regular API only; Filter for posts that link to certain sites or domains (e.g. only posts linking to reddit.com)",
			},
			"divider-2": {
				"type": UserInput.OPTION_DIVIDER
			},
			"daterange": {
				"type": UserInput.OPTION_DATERANGE,
				"help": "Date range"
			},
			"search_scope": {
				"type": UserInput.OPTION_CHOICE,
				"help": "Search scope",
				"options": {
					"op-only": "Opening posts only (no replies/comments)",
					"posts-only": "All matching posts",
				},
				"default": "posts-only"
			}
		}

		# this warning isn't needed if the user can't search for everything
		# anyway
		if not config.get("reddit-search.can_query_without_keyword"):
			del options["wildcard-warning"]

		return options

	@staticmethod
	def build_query(query):
		"""
		Determine API call parameters

		Decides what endpoints to call and with which parameters based on the
		parameters provided by the user. There is some complexity here because
		we support two versions of the API, each with their own protocol.

		:param dict query: Query parameters, as part of the DataSet object
		:return tuple: Tuple of tuples. First tuple is (submissions endpoint,
		submission parameters), the second the same but for replies.
		"""
		api_type = query.get("pushshift_track", "beta")

		# first, build the request parameters
		if api_type == "regular":
			submission_endpoint = "https://api.pushshift.io/reddit/submission/search"
			post_endpoint = "https://api.pushshift.io/reddit/comment/search"

			post_parameters = {
				"order": "asc",
				"sort_type": "created_utc",
				"size": 100,  # max value
				"metadata": True
			}
			since = "after"
			until = "before"

		# beta fields are a bit different.
		elif api_type == "beta":
			submission_endpoint = "https://beta.pushshift.io/reddit/search/submissions"
			post_endpoint = "https://beta.pushshift.io/reddit/search/comments"

			# For beta requests, we're sorting by IDs so we're not missing data.
			# This is unavailable for the regular API.
			post_parameters = {
				"sort_type": "created_utc",
				"order": "asc",
				"limit": 1000  # max value
			}
			since = "since"
			until = "until"

		else:
			raise NotImplementedError()

		if query["min_date"]:
			post_parameters[since] = int(query["min_date"])

		if query["max_date"]:
			post_parameters[until] = int(query["max_date"])

		if query["board"] and query["board"] != "*":
			post_parameters["subreddit"] = query["board"]

		if query["body_match"]:
			post_parameters["q"] = query["body_match"]
		else:
			post_parameters["q"] = ""

		# first, search for threads - this is a separate endpoint from comments
		submission_parameters = post_parameters.copy()
		submission_parameters["selftext"] = submission_parameters["q"]

		if query["subject_match"]:
			submission_parameters["title"] = query["subject_match"]

		# Check whether only OPs linking to certain URLs should be retrieved.
		# Only available for the regular API.
		if query.get("subject_url", None):
			urls = []
			domains = []

			if "," in query["subject_url"]:
				urls_input = query["subject_url"].split(",")
			elif "|" in query["subject_url"]:
				urls_input = query["subject_url"].split("|")
			else:
				urls_input = [query["subject_url"]]

			# Input strings
			for url in urls_input:
				# Some cleaning
				url = url.strip()
				url_clean = url.replace("www.", "")

				# Store urls or domains separately; different fields in Pushshift API
				if "/" in url_clean:
					urls.append(url)
				else:
					domains.append(url_clean)

			if urls:
				# Multiple full URLs is supposedly not supported by Pushshift
				submission_parameters["url"] = "\'" + (",".join(urls)) + "\'"

			if domains:
				submission_parameters["domain"] = ",".join(domains)

		return (
			(submission_endpoint, submission_parameters),
			(post_endpoint, post_parameters),
		)

	def get_items(self, query):
		"""
		Execute a query; get post data for given parameters

		This queries the Pushshift API to find posts and threads matching the
		given parameters.

		:param dict query: Query parameters, as part of the DataSet object
		:return list: Posts, sorted by thread and post ID, in ascending order
		"""
		scope = query.get("search_scope")
		submission_call, post_call = self.build_query(query)

		# BUG FIX: the 'results after timestamp X' parameter differs per API
		# version ("after" for the regular API, "since" for the beta API).
		# Pagination previously hard-coded "after" for threads and used the
		# class default "since" for posts, so paging silently broke for one
		# API version in each loop. Derive the key from the selected version,
		# mirroring build_query().
		since_key = "after" if query.get("pushshift_track", "beta") == "regular" else "since"

		# set up query
		total_posts = 0

		# rate limits are not returned by the API server anymore,
		# so we're manually setting it to 120
		self.rate_limit = 120

		# this is where we store our progress
		total_threads = 0
		seen_threads = set()
		expected_results = query.get("expected-results", 0)

		# loop through results bit by bit
		while True:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching thread data from the Pushshift API")

			response = self.call_pushshift_api(*submission_call)

			if response is None:
				return response

			threads = response.json()["data"]

			if len([t for t in threads if t["id"] not in seen_threads]) == 0:
				# we're done here, no more results will be coming
				break

			# store comment IDs for a thread, and also add the OP to the
			# return list. This means all OPs will come before all comments
			# but we can sort later if that turns out to be a problem
			for thread in threads:
				if thread.get("promoted", False):
					continue

				if thread["id"] not in seen_threads:
					seen_threads.add(thread["id"])
					yield self.thread_to_4cat(thread)
					total_threads += 1

				# Increase the time.
				# this is the only way to go to the next page right now...
				submission_call[1][since_key] = thread["created_utc"]

			# update status
			if expected_results:
				self.dataset.update_progress(total_threads / expected_results)
			self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

		# okay, search the pushshift API for posts
		# we have two modes here: by keyword, or by ID. ID is set above where
		# ID chunks are defined: these chunks are used here if available
		seen_posts = set()

		# only query for individual posts if no subject keyword is given
		# since individual posts don't have subjects so if there is a subject
		# query no results should be returned
		do_body_query = not bool(query.get("subject_match", "")) and not bool(
			query.get("subject_url", "")) and scope != "op-only"

		while do_body_query:
			if self.interrupted:
				raise ProcessorInterruptedException("Interrupted while fetching post data from the Pushshift API")

			response = self.call_pushshift_api(*post_call)

			if response is None:
				return response

			# no more posts
			posts = response.json()["data"]

			if len([p for p in posts if p["id"] not in seen_posts]) == 0:
				# this could happen in some edge cases if we're searching by
				# chunk (if no IDs in the chunk match the other parameters)
				# so only break if that's not the case
				break

			# store post data
			for post in posts:
				if post.get("promoted", False):
					continue

				if post["id"] not in seen_posts:
					seen_posts.add(post["id"])
					yield self.post_to_4cat(post)
					total_posts += 1

				# Increase the time.
				# this is the only way to go to the next page right now...
				post_call[1][since_key] = post["created_utc"]

			# update our progress
			# update status
			if expected_results:
				self.dataset.update_progress((total_threads + total_posts) / expected_results)
			self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_posts + total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

		# and done!
		if total_posts == 0 and total_threads == 0:
			self.dataset.update_status("No posts found")

	@staticmethod
	def post_to_4cat(post):
		"""
		Convert a pushshift post object to 4CAT post data

		:param dict post: Post data, as from the pushshift API
		:return dict: Re-formatted data
		"""

		return {
			"thread_id": post["link_id"].split("_").pop(),
			"id": post["id"],
			"timestamp": post["created_utc"],
			"body": post["body"].strip().replace("\r", ""),
			"subject": "",
			"author": post["author"],
			"author_flair": post.get("author_flair_text", ""),
			"post_flair": "",
			"domain": "",
			"url": "",
			"image_file": "",
			"image_md5": "",
			"subreddit": post["subreddit"],
			"parent": post["parent_id"],
			# this is missing sometimes, but upon manual inspection
			# the post always has 1 point
			"score": post.get("score", 1)
		}

	@staticmethod
	def thread_to_4cat(thread):
		"""
		Convert a pushshift thread object to 4CAT post data

		:param dict thread: Thread data, as from the pushshift API
		:return dict: Re-formatted data
		"""
		# the image-extension pattern is a module-level constant
		# (IMAGE_URL_PATTERN) so it is compiled once, not per call
		return {
			"thread_id": thread["id"],
			"id": thread["id"],
			"timestamp": thread["created_utc"],
			"body": thread.get("selftext", "").strip().replace("\r", ""),
			"subject": thread["title"],
			"author": thread["author"],
			"author_flair": thread.get("author_flair_text", ""),
			"post_flair": thread.get("link_flair_text", ""),
			"image_file": thread.get("url", "") if thread.get("url") and IMAGE_URL_PATTERN.search(thread.get("url", "")) else "",
			"domain": thread.get("domain", ""),
			"url": thread.get("url", ""),
			"image_md5": "",
			"subreddit": thread["subreddit"],
			"parent": "",
			"score": thread.get("score", 0)
		}

	def call_pushshift_api(self, *args, **kwargs):
		"""
		Call pushshift API and don't crash (immediately) if it fails

		Will also try to respect the rate limit, waiting before making a
		request until it will not violate the rate limit.

		:param args: Positional arguments passed to requests.get
		:param kwargs: Keyword arguments passed to requests.get
		:return: Response, or `None`
		"""

		retries = 0
		while retries < self.max_retries:
			try:
				self.wait_until_window()
				response = requests.get(*args, **kwargs)
				self.request_timestamps.append(time.time())
				if response.status_code == 200:
					break
				else:
					# non-200 is treated like a transient failure and retried
					raise RuntimeError("HTTP %s" % response.status_code)
			except (RuntimeError, requests.RequestException) as e:
				self.log.info("Error %s while querying Pushshift API - waiting 15 seconds and retrying..." % e)
				time.sleep(15)
				retries += 1

		if retries >= self.max_retries:
			self.log.error("Error during Pushshift fetch of query %s" % self.dataset.key)
			self.dataset.update_status("Error while searching for posts on Pushshift - API did not respond as expected")
			return None

		return response

	@staticmethod
	def get_expected_results(endpoint, parameters, api_type):
		"""
		Get expected result size for a query

		We're not using call_pushshift_api here because that cannot be called
		statically, which is necessary because this is called from within
		validate_query.

		:param str endpoint: URL of the API endpoint
		:param dict parameters: Call parameters
		:param api_type: Type of API (regular or beta)

		:return: Number of expected results, or `None`
		"""
		# work on a copy so the caller's parameter dict is not mutated with
		# the metadata-only settings below
		parameters = {**parameters, "metadata": "true", "size": 0, "track_total_hits": True}

		retries = 0
		response = None

		while retries < 3:
			try:
				response = requests.get(endpoint, parameters, timeout=10)
				break
			except requests.RequestException:
				# simple linear backoff: 5, 10, 15 seconds
				retries += 1
				time.sleep(retries * 5)
				continue

		if not response or response.status_code != 200:
			return None
		else:
			try:
				return response.json()["metadata"]["es"]["hits"]["total"]["value"]
			except (json.JSONDecodeError, KeyError):
				return None

	def wait_until_window(self):
		"""
		Wait until a request can be made outside of the rate limit

		If we have made more requests in the window (one minute) than allowed
		by the rate limit, wait until that is no longer the case.
		"""
		# BUG FIX: a rate limit of 0 (the class default, before get_items
		# sets the real value) would make the check below always true and
		# block forever; treat a non-positive limit as 'no limit'
		if self.rate_limit <= 0:
			return

		has_warned = False

		while True:
			# BUG FIX: recompute the window start on every iteration so that
			# old requests age out of the window while we sleep; a window
			# start computed once before the loop would never drain
			window_start = time.time() - 60
			if len([timestamp for timestamp in self.request_timestamps if timestamp >= window_start]) < self.rate_limit:
				break

			if not has_warned:
				self.log.info("Hit Pushshift rate limit - throttling...")
				has_warned = True

			time.sleep(0.25)  # should be enough

		# clean up timestamps outside of window
		self.request_timestamps = [timestamp for timestamp in self.request_timestamps if timestamp >= window_start]

	@staticmethod
	def validate_query(query, request, user):
		"""
		Validate input for a dataset query on the 4chan data source.

		Will raise a QueryParametersException if invalid parameters are
		encountered. Mutually exclusive parameters may also be sanitised by
		ignoring either of the mutually exclusive options.

		:param dict query: Query parameters, from client-side.
		:param request: Flask request
		:param User user: User object of user who has submitted the query
		:return dict: Safe query parameters
		"""
		# we need a board!
		r_prefix = re.compile(r"^/?r/")
		boards = [r_prefix.sub("", board).strip() for board in query.get("board", "").split(",") if board.strip()]

		if not boards:
			raise QueryParametersException("Please provide a board or a comma-separated list of boards to query.")

		# ignore leading r/ for boards
		query["board"] = ",".join(boards)

		keywordless_query = config.get("reddit-search.can_query_without_keyword", False, user=user)

		# this is the bare minimum, else we can't narrow down the full data set
		if not user.is_admin and not keywordless_query and not query.get(
				"body_match", "").strip() and not query.get("subject_match", "").strip() and not query.get(
				"subject_url", ""):
			raise QueryParametersException("Please provide a body query or subject query.")

		# body query and full threads are incompatible, returning too many posts
		# in most cases
		if query.get("body_match", None):
			if "full_threads" in query:
				del query["full_threads"]

		# Make sure no body or subject searches starting with just a minus sign are possible, e.g. "-Trump"
		if query.get("body_match", None) or query.get("subject_match", None):
			queries_to_check = []

			if query.get("body_match", None):
				queries_to_check += [body_query.strip() for body_query in query["body_match"].split(" ")]

			if query.get("subject_match", None):
				queries_to_check += [subject_query.strip() for subject_query in query["subject_match"].split(" ")]

			startswith_minus = [query_check.startswith("-") for query_check in queries_to_check]
			if all(startswith_minus):
				raise QueryParametersException("Please provide body queries that do not start with a minus sign.")

		# URL queries are not possible (yet) for the beta API
		if query.get("pushshift_track") == "beta" and query.get("subject_url", None):
			raise QueryParametersException("URL querying is not possible (yet) for the beta endpoint.")

		# both dates need to be set, or none
		if query.get("min_date", None) and not query.get("max_date", None):
			raise QueryParametersException("When setting a date range, please provide both an upper and lower limit.")

		# the dates need to make sense as a range to search within
		query["min_date"], query["max_date"] = query.get("daterange")

		if "*" in query.get("body_match", "") and not keywordless_query:
			raise QueryParametersException(
				"Wildcard queries are not allowed as they typically return too many results to properly process.")

		if "*" in query.get("board", "") and not keywordless_query:
			raise QueryParametersException(
				"Wildcards are not allowed for boards as this typically returns too many results to properly process.")

		del query["daterange"]

		params = SearchReddit.build_query(query)
		expected_posts = SearchReddit.get_expected_results(*params[0], query.get("pushshift_track", "regular"))
		if not expected_posts:
			expected_posts = 0

		# determine how many results to expect
		# this adds a small delay since we need to talk to the API before
		# returning to the user, but the benefit is that we reduce the amount
		# of too-large queries (because users are warned beforehand) and can
		# give a progress indication for queries that do go through
		if query.get("search_scope") != "op-only":
			expected_replies = SearchReddit.get_expected_results(*params[1], query.get("pushshift_track", "regular"))
			expected_posts += expected_replies if expected_replies else 0

		if expected_posts:
			pps = 672 if query.get("pushshift_track") == "beta" else 44
			expected_seconds = int(expected_posts / pps)  # seems to be about this
			expected_time = timify_long(expected_seconds)
			query["expected-results"] = expected_posts

			if expected_seconds > 1800 and not query.get("frontend-confirm"):
				raise QueryNeedsExplicitConfirmationException(
					"This query will return approximately %s items. This will take a long time (approximately %s)."
					" Are you sure you want to run this query?" % ("{:,}".format(expected_posts), expected_time))

		# if we made it this far, the query can be executed
		return query
14class SearchReddit(Search): 15 """ 16 Search Reddit 17 18 Defines methods to fetch Reddit data on demand 19 """ 20 type = "reddit-search" # job ID 21 category = "Search" # category 22 title = "Reddit Search" # title displayed in UI 23 description = "Query the Pushshift API to retrieve Reddit posts and threads matching the search parameters" # description displayed in UI 24 extension = "csv" # extension of result file, used internally and in UI 25 is_local = False # Whether this datasource is locally scraped 26 is_static = False # Whether this datasource is still updated 27 28 references = [ 29 "[API documentation](https://github.com/pushshift/api)", 30 "[r/pushshift](https://www.reddit.com/r/pushshift/)", 31 "[Baumgartner, J., Zannettou, S., Keegan, B., Squire, M., & Blackburn, J. (2020). The Pushshift Reddit Dataset. *Proceedings of the International AAAI Conference on Web and Social Media*, 14(1), 830-839.](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)" 32 ] 33 34 # not available as a processor for existing datasets 35 accepts = [None] 36 37 max_workers = 1 38 max_retries = 5 39 40 rate_limit = 0 41 request_timestamps = [] 42 43 config = { 44 "reddit-search.can_query_without_keyword": { 45 "type": UserInput.OPTION_TOGGLE, 46 "help": "Can query without keyword", 47 "default": False, 48 "tooltip": "Allows users to query Pushshift without specifying a keyword. This can lead to HUGE datasets!" 49 } 50 } 51 52 # These change depending on the API type used, 53 # but should be globally accessible. 54 submission_endpoint = None 55 comment_endpoint = None 56 api_type = None 57 since = "since" 58 after = "after" 59 60 @classmethod 61 def get_options(cls, parent_dataset=None, user=None): 62 """ 63 Determine if user needs to see the 'careful with wildcard queries!' 
64 warning 65 66 :param parent_dataset: 67 :param user: 68 :return dict: Options definition 69 """ 70 options = { 71 "wildcard-warning": { 72 "type": UserInput.OPTION_INFO, 73 "help": "The requirement for searching by keyword has been lifted for your account; you can search by " 74 "date range only. This can potentially return hundreds of millions of posts, so **please be " 75 "careful** when using this privilege." 76 }, 77 "pushshift_track": { 78 "type": UserInput.OPTION_CHOICE, 79 "help": "API version", 80 "options": { 81 "beta": "Beta (new version)", 82 "regular": "Regular" 83 }, 84 "default": "beta", 85 "tooltip": "The beta version retrieves more comments per request but may be incomplete." 86 }, 87 "board": { 88 "type": UserInput.OPTION_TEXT, 89 "help": "Subreddit(s)", 90 "tooltip": "Comma-separated" 91 }, 92 "divider": { 93 "type": UserInput.OPTION_DIVIDER 94 }, 95 "intro": { 96 "type": UserInput.OPTION_INFO, 97 "help": "Reddit data is retrieved from [Pushshift](https://pushshift.io) (see also [this " 98 "paper](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)). Note that Pushshift's dataset " 99 "*may not be complete* depending on the parameters used," 100 " data from the last few days might not be there yet," 101 " and post scores can be out of date. " 102 "See [this paper](https://arxiv.org/pdf/1803.05046.pdf) for an overview of the gaps in data. " 103 "Double-check manually or via the official Reddit API if completeness is a concern. Check the " 104 "documentation ([beta](https://beta.pushshift.io/redoc), [regular](https://github.com/pushshift/api)) for " 105 "more information (e.g. query syntax)." 106 }, 107 "body_match": { 108 "type": UserInput.OPTION_TEXT, 109 "help": "Message search", 110 "tooltip": "Matches anything in the body of a comment or post." 111 }, 112 "subject_match": { 113 "type": UserInput.OPTION_TEXT, 114 "help": "Subject search", 115 "tooltip": "Matches anything in the title of a post." 
116 }, 117 "subject_url": { 118 "type": UserInput.OPTION_TEXT, 119 "help": "URL/domain in post", 120 "tooltip": "Regular API only; Filter for posts that link to certain sites or domains (e.g. only posts linking to reddit.com)", 121 }, 122 "divider-2": { 123 "type": UserInput.OPTION_DIVIDER 124 }, 125 "daterange": { 126 "type": UserInput.OPTION_DATERANGE, 127 "help": "Date range" 128 }, 129 "search_scope": { 130 "type": UserInput.OPTION_CHOICE, 131 "help": "Search scope", 132 "options": { 133 "op-only": "Opening posts only (no replies/comments)", 134 "posts-only": "All matching posts", 135 }, 136 "default": "posts-only" 137 } 138 } 139 140 # this warning isn't needed if the user can't search for everything 141 # anyway 142 if not config.get("reddit-search.can_query_without_keyword"): 143 del options["wildcard-warning"] 144 145 return options 146 147 @staticmethod 148 def build_query(query): 149 """ 150 Determine API call parameters 151 152 Decides what endpoints to call and with which parameters based on the 153 parameters provided by the user. There is some complexity here because 154 we support two versions of the API, each with their own protocol. 155 156 :param dict query: Query parameters, as part of the DataSet object 157 :return tuple: Tuple of tuples. First tuple is (submissions endpoint, 158 submission parameters), the second the same but for replies. 159 """ 160 api_type = query.get("pushshift_track", "beta") 161 162 # first, build the request parameters 163 if api_type == "regular": 164 submission_endpoint = "https://api.pushshift.io/reddit/submission/search" 165 post_endpoint = "https://api.pushshift.io/reddit/comment/search" 166 167 post_parameters = { 168 "order": "asc", 169 "sort_type": "created_utc", 170 "size": 100, # max value 171 "metadata": True 172 } 173 since = "after" 174 until = "before" 175 176 # beta fields are a bit different. 
177 elif api_type == "beta": 178 submission_endpoint = "https://beta.pushshift.io/reddit/search/submissions" 179 post_endpoint = "https://beta.pushshift.io/reddit/search/comments" 180 181 # For beta requests, we're sorting by IDs so we're not missing data. 182 # This is unavailable for the regular API. 183 post_parameters = { 184 "sort_type": "created_utc", 185 "order": "asc", 186 "limit": 1000 # max value 187 } 188 since = "since" 189 until = "until" 190 191 else: 192 raise NotImplementedError() 193 194 if query["min_date"]: 195 post_parameters[since] = int(query["min_date"]) 196 197 if query["max_date"]: 198 post_parameters[until] = int(query["max_date"]) 199 200 if query["board"] and query["board"] != "*": 201 post_parameters["subreddit"] = query["board"] 202 203 if query["body_match"]: 204 post_parameters["q"] = query["body_match"] 205 else: 206 post_parameters["q"] = "" 207 208 # first, search for threads - this is a separate endpoint from comments 209 submission_parameters = post_parameters.copy() 210 submission_parameters["selftext"] = submission_parameters["q"] 211 212 if query["subject_match"]: 213 submission_parameters["title"] = query["subject_match"] 214 215 # Check whether only OPs linking to certain URLs should be retrieved. 216 # Only available for the regular API. 
217 if query.get("subject_url", None): 218 urls = [] 219 domains = [] 220 221 if "," in query["subject_url"]: 222 urls_input = query["subject_url"].split(",") 223 elif "|" in query["subject_url"]: 224 urls_input = query["subject_url"].split("|") 225 else: 226 urls_input = [query["subject_url"]] 227 228 # Input strings 229 for url in urls_input: 230 # Some cleaning 231 url = url.strip() 232 url_clean = url.replace("www.", "") 233 234 # Store urls or domains separately; different fields in Pushshift API 235 if "/" in url_clean: 236 urls.append(url) 237 else: 238 domains.append(url_clean) 239 if urls: 240 # Multiple full URLs is supposedly not supported by Pushshift 241 submission_parameters["url"] = "\'" + (",".join(urls)) + "\'" 242 if domains: 243 submission_parameters["domain"] = ",".join(domains) 244 245 return ( 246 (submission_endpoint, submission_parameters), 247 (post_endpoint, post_parameters), 248 ) 249 250 def get_items(self, query): 251 """ 252 Execute a query; get post data for given parameters 253 254 This queries the Pushshift API to find posts and threads mathcing the 255 given parameters. 
256 257 :param dict query: Query parameters, as part of the DataSet object 258 :return list: Posts, sorted by thread and post ID, in ascending order 259 """ 260 scope = query.get("search_scope") 261 submission_call, post_call = self.build_query(query) 262 263 # set up query 264 total_posts = 0 265 max_retries = 3 266 267 # rate limits are not returned by the API server anymore, 268 # so we're manually setting it to 120 269 self.rate_limit = 120 270 271 # this is where we store our progress 272 total_threads = 0 273 seen_threads = set() 274 expected_results = query.get("expected-results", 0) 275 276 # loop through results bit by bit 277 while True: 278 if self.interrupted: 279 raise ProcessorInterruptedException("Interrupted while fetching thread data from the Pushshift API") 280 281 retries = 0 282 response = self.call_pushshift_api(*submission_call) 283 284 if response is None: 285 return response 286 287 threads = response.json()["data"] 288 289 if len([t for t in threads if t["id"] not in seen_threads]) == 0: 290 # we're done here, no more results will be coming 291 break 292 293 # store comment IDs for a thread, and also add the OP to the 294 # return list. This means all OPs will come before all comments 295 # but we can sort later if that turns out to be a problem 296 for thread in threads: 297 if thread.get("promoted", False): 298 continue 299 300 if thread["id"] not in seen_threads: 301 seen_threads.add(thread["id"]) 302 yield self.thread_to_4cat(thread) 303 304 # Increase the time. 305 # this is the only way to go to the next page right now... 
306 submission_call[1]["after"] = thread["created_utc"] 307 308 total_threads += 1 309 310 # update status 311 if expected_results: 312 self.dataset.update_progress(total_threads / expected_results) 313 self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_threads), "{:,}".format(expected_results) if expected_results else "unknown")) 314 315 # okay, search the pushshift API for posts 316 # we have two modes here: by keyword, or by ID. ID is set above where 317 # ID chunks are defined: these chunks are used here if available 318 seen_posts = set() 319 320 # only query for individual posts if no subject keyword is given 321 # since individual posts don't have subjects so if there is a subject 322 # query no results should be returned 323 do_body_query = not bool(query.get("subject_match", "")) and not bool( 324 query.get("subject_url", "")) and scope != "op-only" 325 326 while do_body_query: 327 if self.interrupted: 328 raise ProcessorInterruptedException("Interrupted while fetching post data from the Pushshift API") 329 330 response = self.call_pushshift_api(*post_call) 331 332 if response is None: 333 return response 334 335 if retries >= max_retries: 336 self.log.error("Error during pushshift fetch of query %s" % self.dataset.key) 337 self.dataset.update_status("Error while searching for posts on Pushshift") 338 return None 339 340 # no more posts 341 posts = response.json()["data"] 342 343 if len([p for p in posts if p["id"] not in seen_posts]) == 0: 344 # this could happen in some edge cases if we're searching by 345 # chunk (if no IDs in the chunk match the other parameters) 346 # so only break if that's not the case 347 break 348 349 # store post data 350 for post in posts: 351 if post.get("promoted", False): 352 continue 353 354 if post["id"] not in seen_posts: 355 seen_posts.add(post["id"]) 356 yield self.post_to_4cat(post) 357 358 # Increase the time. 
359 # this is the only way to go to the next page right now... 360 post_call[1][self.since] = post["created_utc"] 361 362 total_posts += 1 363 364 # update our progress 365 # update status 366 if expected_results: 367 self.dataset.update_progress((total_threads + total_posts) / expected_results) 368 self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_posts + total_threads), "{:,}".format(expected_results) if expected_results else "unknown")) 369 370 # and done! 371 if total_posts == 0 and total_threads == 0: 372 self.dataset.update_status("No posts found") 373 374 @staticmethod 375 def post_to_4cat(post): 376 """ 377 Convert a pushshift post object to 4CAT post data 378 379 :param dict post: Post data, as from the pushshift API 380 :return dict: Re-formatted data 381 """ 382 383 return { 384 "thread_id": post["link_id"].split("_").pop(), 385 "id": post["id"], 386 "timestamp": post["created_utc"], 387 "body": post["body"].strip().replace("\r", ""), 388 "subject": "", 389 "author": post["author"], 390 "author_flair": post.get("author_flair_text", ""), 391 "post_flair": "", 392 "domain": "", 393 "url": "", 394 "image_file": "", 395 "image_md5": "", 396 "subreddit": post["subreddit"], 397 "parent": post["parent_id"], 398 # this is missing sometimes, but upon manual inspection 399 # the post always has 1 point 400 "score": post.get("score", 1) 401 } 402 403 @staticmethod 404 def thread_to_4cat(thread): 405 """ 406 Convert a pushshift thread object to 4CAT post data 407 408 :param dict post: Post data, as from the pushshift API 409 :return dict: Re-formatted data 410 """ 411 image_match = re.compile(r"\.(jpg|jpeg|png|gif|webm|mp4)$", flags=re.IGNORECASE) 412 413 return { 414 "thread_id": thread["id"], 415 "id": thread["id"], 416 "timestamp": thread["created_utc"], 417 "body": thread.get("selftext", "").strip().replace("\r", ""), 418 "subject": thread["title"], 419 "author": thread["author"], 420 
"author_flair": thread.get("author_flair_text", ""), 421 "post_flair": thread.get("link_flair_text", ""), 422 "image_file": thread.get("url", "") if thread.get("url") and image_match.search(thread.get("url", "")) else "", 423 "domain": thread.get("domain", ""), 424 "url": thread.get("url", ""), 425 "image_md5": "", 426 "subreddit": thread["subreddit"], 427 "parent": "", 428 "score": thread.get("score", 0) 429 } 430 431 def call_pushshift_api(self, *args, **kwargs): 432 """ 433 Call pushshift API and don't crash (immediately) if it fails 434 435 Will also try to respect the rate limit, waiting before making a 436 request until it will not violate the rate limit. 437 438 :param args: 439 :param kwargs: 440 :return: Response, or `None` 441 """ 442 443 retries = 0 444 while retries < self.max_retries: 445 try: 446 self.wait_until_window() 447 response = requests.get(*args, **kwargs) 448 self.request_timestamps.append(time.time()) 449 if response.status_code == 200: 450 break 451 else: 452 raise RuntimeError("HTTP %s" % response.status_code) 453 except (RuntimeError, requests.RequestException) as e: 454 self.log.info("Error %s while querying Pushshift API - waiting 15 seconds and retrying..." % e) 455 time.sleep(15) 456 retries += 1 457 458 if retries >= self.max_retries: 459 self.log.error("Error during Pushshift fetch of query %s" % self.dataset.key) 460 self.dataset.update_status("Error while searching for posts on Pushshift - API did not respond as expected") 461 return None 462 463 return response 464 465 @staticmethod 466 def get_expected_results(endpoint, parameters, api_type): 467 """ 468 Get expected result size for a query 469 470 We're not using call_pushshift_api here because that cannot be called 471 statically, which is necessary because this is called from within 472 validate_query. 
473 474 :param str endpoint: URL of the API endpoint 475 :param dict parameters: Call parameters 476 :param api_type: Type of API (regular or beta) 477 478 :return: Number of expected results, or `None` 479 """ 480 parameters.update({"metadata": "true", "size": 0,"track_total_hits": True}) 481 482 retries = 0 483 response = None 484 485 while retries < 3: 486 try: 487 response = requests.get(endpoint, parameters, timeout=10) 488 break 489 except requests.RequestException: 490 retries += 1 491 time.sleep(retries * 5) 492 continue 493 494 if not response or response.status_code != 200: 495 return None 496 else: 497 try: 498 return response.json()["metadata"]["es"]["hits"]["total"]["value"] 499 except (json.JSONDecodeError, KeyError): 500 return None 501 502 def wait_until_window(self): 503 """ 504 Wait until a request can be made outside of the rate limit 505 506 If we have made more requests in the window (one minute) than allowed 507 by the rate limit, wait until that is no longer the case. 508 """ 509 window_start = time.time() - 60 510 has_warned = False 511 512 while len([timestamp for timestamp in self.request_timestamps if timestamp >= window_start]) >= self.rate_limit: 513 if not has_warned: 514 self.log.info("Hit Pushshift rate limit - throttling...") 515 has_warned = True 516 517 time.sleep(0.25) # should be enough 518 519 # clean up timestamps outside of window 520 self.request_timestamps = [timestamp for timestamp in self.request_timestamps if timestamp >= window_start] 521 522 def validate_query(query, request, user): 523 """ 524 Validate input for a dataset query on the 4chan data source. 525 526 Will raise a QueryParametersException if invalid parameters are 527 encountered. Mutually exclusive parameters may also be sanitised by 528 ignoring either of the mutually exclusive options. 529 530 :param dict query: Query parameters, from client-side. 
531 :param request: Flask request 532 :param User user: User object of user who has submitted the query 533 :return dict: Safe query parameters 534 """ 535 # we need a board! 536 r_prefix = re.compile(r"^/?r/") 537 boards = [r_prefix.sub("", board).strip() for board in query.get("board", "").split(",") if board.strip()] 538 539 if not boards: 540 raise QueryParametersException("Please provide a board or a comma-separated list of boards to query.") 541 542 # ignore leading r/ for boards 543 query["board"] = ",".join(boards) 544 545 keywordless_query = config.get("reddit-search.can_query_without_keyword", False, user=user) 546 547 # this is the bare minimum, else we can't narrow down the full data set 548 if not user.is_admin and not keywordless_query and not query.get( 549 "body_match", "").strip() and not query.get("subject_match", "").strip() and not query.get( 550 "subject_url", ""): 551 raise QueryParametersException("Please provide a body query or subject query.") 552 553 # body query and full threads are incompatible, returning too many posts 554 # in most cases 555 if query.get("body_match", None): 556 if "full_threads" in query: 557 del query["full_threads"] 558 559 # Make sure no body or subject searches starting with just a minus sign are possible, e.g. 
"-Trump" 560 if query.get("body_match", None) or query.get("subject_match", None): 561 queries_to_check = [] 562 563 if query.get("body_match", None): 564 queries_to_check += [body_query.strip() for body_query in query["body_match"].split(" ")] 565 566 if query.get("subject_match", None): 567 queries_to_check += [subject_query.strip() for subject_query in query["subject_match"].split(" ")] 568 569 startswith_minus = [query_check.startswith("-") for query_check in queries_to_check] 570 if all(startswith_minus): 571 raise QueryParametersException("Please provide body queries that do not start with a minus sign.") 572 573 # URL queries are not possible (yet) for the beta API 574 if query.get("pushshift_track") == "beta" and query.get("subject_url", None): 575 raise QueryParametersException("URL querying is not possible (yet) for the beta endpoint.") 576 577 # both dates need to be set, or none 578 if query.get("min_date", None) and not query.get("max_date", None): 579 raise QueryParametersException("When setting a date range, please provide both an upper and lower limit.") 580 581 # the dates need to make sense as a range to search within 582 query["min_date"], query["max_date"] = query.get("daterange") 583 584 if "*" in query.get("body_match", "") and not keywordless_query: 585 raise QueryParametersException( 586 "Wildcard queries are not allowed as they typically return too many results to properly process.") 587 588 if "*" in query.get("board", "") and not keywordless_query: 589 raise QueryParametersException( 590 "Wildcards are not allowed for boards as this typically returns too many results to properly process.") 591 592 del query["daterange"] 593 594 params = SearchReddit.build_query(query) 595 expected_posts = SearchReddit.get_expected_results(*params[0], query.get("pushshift_track", "regular")) 596 if not expected_posts: 597 expected_posts = 0 598 599 # determine how many results to expect 600 # this adds a small delay since we need to talk to the API before 
601 # returning to the user, but the benefit is that we reduce the amount 602 # of too-large queries (because users are warned beforehand) and can 603 # give a progress indication for queries that do go through 604 if query.get("search_scope") != "op-only": 605 expected_replies = SearchReddit.get_expected_results(*params[1], query.get("pushshift_track", "regular")) 606 expected_posts += expected_replies if expected_replies else 0 607 608 if expected_posts: 609 pps = 672 if query.get("pushshift_track") == "beta" else 44 610 expected_seconds = int(expected_posts / pps) # seems to be about this 611 expected_time = timify_long(expected_seconds) 612 query["expected-results"] = expected_posts 613 614 if expected_seconds > 1800 and not query.get("frontend-confirm"): 615 raise QueryNeedsExplicitConfirmationException( 616 "This query will return approximately %s items. This will take a long time (approximately %s)." 617 " Are you sure you want to run this query?" % ("{:,}".format(expected_posts), expected_time)) 618 619 # if we made it this far, the query can be executed 620 return query
Search Reddit
Defines methods to fetch Reddit data on demand
@classmethod
def get_options(cls, parent_dataset=None, user=None):
    """
    Determine if user needs to see the 'careful with wildcard queries!'
    warning

    The option definitions are mostly static; only the wildcard warning is
    conditional, shown when keyword-less querying is enabled.

    :param parent_dataset:  Dataset this query would be a child of (unused)
    :param user:  User for whom the options are generated; used to check
        whether the keyword-less query privilege applies to them
    :return dict: Options definition
    """
    options = {
        "wildcard-warning": {
            "type": UserInput.OPTION_INFO,
            "help": "The requirement for searching by keyword has been lifted for your account; you can search by "
                    "date range only. This can potentially return hundreds of millions of posts, so **please be "
                    "careful** when using this privilege."
        },
        "pushshift_track": {
            "type": UserInput.OPTION_CHOICE,
            "help": "API version",
            "options": {
                "beta": "Beta (new version)",
                "regular": "Regular"
            },
            "default": "beta",
            "tooltip": "The beta version retrieves more comments per request but may be incomplete."
        },
        "board": {
            "type": UserInput.OPTION_TEXT,
            "help": "Subreddit(s)",
            "tooltip": "Comma-separated"
        },
        "divider": {
            "type": UserInput.OPTION_DIVIDER
        },
        "intro": {
            "type": UserInput.OPTION_INFO,
            "help": "Reddit data is retrieved from [Pushshift](https://pushshift.io) (see also [this "
                    "paper](https://ojs.aaai.org/index.php/ICWSM/article/view/7347)). Note that Pushshift's dataset "
                    "*may not be complete* depending on the parameters used,"
                    " data from the last few days might not be there yet,"
                    " and post scores can be out of date. "
                    "See [this paper](https://arxiv.org/pdf/1803.05046.pdf) for an overview of the gaps in data. "
                    "Double-check manually or via the official Reddit API if completeness is a concern. Check the "
                    "documentation ([beta](https://beta.pushshift.io/redoc), [regular](https://github.com/pushshift/api)) for "
                    "more information (e.g. query syntax)."
        },
        "body_match": {
            "type": UserInput.OPTION_TEXT,
            "help": "Message search",
            "tooltip": "Matches anything in the body of a comment or post."
        },
        "subject_match": {
            "type": UserInput.OPTION_TEXT,
            "help": "Subject search",
            "tooltip": "Matches anything in the title of a post."
        },
        "subject_url": {
            "type": UserInput.OPTION_TEXT,
            "help": "URL/domain in post",
            "tooltip": "Regular API only; Filter for posts that link to certain sites or domains (e.g. only posts linking to reddit.com)",
        },
        "divider-2": {
            "type": UserInput.OPTION_DIVIDER
        },
        "daterange": {
            "type": UserInput.OPTION_DATERANGE,
            "help": "Date range"
        },
        "search_scope": {
            "type": UserInput.OPTION_CHOICE,
            "help": "Search scope",
            "options": {
                "op-only": "Opening posts only (no replies/comments)",
                "posts-only": "All matching posts",
            },
            "default": "posts-only"
        }
    }

    # this warning isn't needed if the user can't search for everything
    # anyway. Check the setting for this specific user (consistent with
    # validate_query, which passes user=user for the same key); previously
    # only the global setting was consulted here.
    if not config.get("reddit-search.can_query_without_keyword", False, user=user):
        del options["wildcard-warning"]

    return options
Determine if user needs to see the 'careful with wildcard queries!' warning
Parameters
- parent_dataset:
- user:
Returns
Options definition
@staticmethod
def build_query(query):
    """
    Determine API call parameters

    Decides what endpoints to call and with which parameters based on the
    parameters provided by the user. There is some complexity here because
    we support two versions of the API, each with their own protocol.

    :param dict query: Query parameters, as part of the DataSet object
    :return tuple: Tuple of tuples. First tuple is (submissions endpoint,
        submission parameters), the second the same but for replies.
    """
    api_type = query.get("pushshift_track", "beta")

    # each API version has its own endpoints, parameter names and limits
    if api_type == "regular":
        submission_endpoint = "https://api.pushshift.io/reddit/submission/search"
        post_endpoint = "https://api.pushshift.io/reddit/comment/search"

        post_parameters = {
            "order": "asc",
            "sort_type": "created_utc",
            "size": 100,  # max value
            "metadata": True
        }
        since, until = "after", "before"

    elif api_type == "beta":
        submission_endpoint = "https://beta.pushshift.io/reddit/search/submissions"
        post_endpoint = "https://beta.pushshift.io/reddit/search/comments"

        # For beta requests, results are sorted by timestamp ascending so
        # no data is missed when paginating.
        post_parameters = {
            "sort_type": "created_utc",
            "order": "asc",
            "limit": 1000  # max value
        }
        since, until = "since", "until"

    else:
        raise NotImplementedError()

    # date range bounds, if given
    if query["min_date"]:
        post_parameters[since] = int(query["min_date"])

    if query["max_date"]:
        post_parameters[until] = int(query["max_date"])

    # a lone "*" means 'all subreddits', i.e. no subreddit filter
    if query["board"] and query["board"] != "*":
        post_parameters["subreddit"] = query["board"]

    post_parameters["q"] = query["body_match"] if query["body_match"] else ""

    # threads are searched via a separate endpoint; they take the same
    # base parameters, plus title/selftext-specific ones
    submission_parameters = post_parameters.copy()
    submission_parameters["selftext"] = submission_parameters["q"]

    if query["subject_match"]:
        submission_parameters["title"] = query["subject_match"]

    # Check whether only OPs linking to certain URLs should be retrieved.
    # Only available for the regular API.
    if query.get("subject_url", None):
        # accept either comma- or pipe-separated input
        for separator in (",", "|"):
            if separator in query["subject_url"]:
                urls_input = query["subject_url"].split(separator)
                break
        else:
            urls_input = [query["subject_url"]]

        # full URLs and bare domains go into different Pushshift fields
        urls = []
        domains = []
        for url in urls_input:
            url = url.strip()
            url_clean = url.replace("www.", "")

            if "/" in url_clean:
                urls.append(url)
            else:
                domains.append(url_clean)

        if urls:
            # Multiple full URLs is supposedly not supported by Pushshift
            submission_parameters["url"] = "'" + ",".join(urls) + "'"
        if domains:
            submission_parameters["domain"] = ",".join(domains)

    return (
        (submission_endpoint, submission_parameters),
        (post_endpoint, post_parameters),
    )
Determine API call parameters
Decides what endpoints to call and with which parameters based on the parameters provided by the user. There is some complexity here because we support two versions of the API, each with their own protocol.
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Tuple of tuples. First tuple is (submissions endpoint, submission parameters), the second the same but for replies.
def get_items(self, query):
    """
    Execute a query; get post data for given parameters

    This queries the Pushshift API to find posts and threads matching the
    given parameters.

    :param dict query: Query parameters, as part of the DataSet object
    :return list: Posts, sorted by thread and post ID, in ascending order
    """
    scope = query.get("search_scope")
    submission_call, post_call = self.build_query(query)

    # pagination works by raising the lower time bound after each page;
    # the parameter name for that bound differs per API version (see
    # build_query: "after" for the regular API, "since" for beta). The
    # previous version hard-coded "after" for threads and used the static
    # class attribute (always "since") for comments, so one of the two
    # was always wrong for the selected API.
    since_field = "after" if query.get("pushshift_track", "beta") == "regular" else "since"

    # rate limits are not returned by the API server anymore,
    # so we're manually setting it to 120
    self.rate_limit = 120

    # this is where we store our progress
    total_posts = 0
    total_threads = 0
    seen_threads = set()
    expected_results = query.get("expected-results", 0)

    # loop through results bit by bit
    while True:
        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching thread data from the Pushshift API")

        response = self.call_pushshift_api(*submission_call)

        if response is None:
            return response

        threads = response.json()["data"]

        if len([t for t in threads if t["id"] not in seen_threads]) == 0:
            # we're done here, no more results will be coming
            break

        # store comment IDs for a thread, and also add the OP to the
        # return list. This means all OPs will come before all comments
        # but we can sort later if that turns out to be a problem
        for thread in threads:
            if thread.get("promoted", False):
                continue

            if thread["id"] not in seen_threads:
                seen_threads.add(thread["id"])
                yield self.thread_to_4cat(thread)

                # Increase the time.
                # this is the only way to go to the next page right now...
                submission_call[1][since_field] = thread["created_utc"]

                total_threads += 1

        # update status
        if expected_results:
            self.dataset.update_progress(total_threads / expected_results)
        self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

    # okay, search the pushshift API for individual posts (comments)
    seen_posts = set()

    # only query for individual posts if no subject keyword is given
    # since individual posts don't have subjects so if there is a subject
    # query no results should be returned
    do_body_query = not bool(query.get("subject_match", "")) and not bool(
        query.get("subject_url", "")) and scope != "op-only"

    while do_body_query:
        if self.interrupted:
            raise ProcessorInterruptedException("Interrupted while fetching post data from the Pushshift API")

        # call_pushshift_api does its own retrying and returns None once
        # retries are exhausted; the old per-loop retry counter here was
        # never incremented and has been removed
        response = self.call_pushshift_api(*post_call)

        if response is None:
            return response

        # no more posts
        posts = response.json()["data"]

        if len([p for p in posts if p["id"] not in seen_posts]) == 0:
            # this could happen in some edge cases if we're searching by
            # chunk (if no IDs in the chunk match the other parameters)
            # so only break if that's not the case
            break

        # store post data
        for post in posts:
            if post.get("promoted", False):
                continue

            if post["id"] not in seen_posts:
                seen_posts.add(post["id"])
                yield self.post_to_4cat(post)

                # Increase the time.
                # this is the only way to go to the next page right now...
                post_call[1][since_field] = post["created_utc"]

                total_posts += 1

        # update status
        if expected_results:
            self.dataset.update_progress((total_threads + total_posts) / expected_results)
        self.dataset.update_status("Received %s of ~%s posts and threads from Reddit via Pushshift's API" % ("{:,}".format(total_posts + total_threads), "{:,}".format(expected_results) if expected_results else "unknown"))

    # and done!
    if total_posts == 0 and total_threads == 0:
        self.dataset.update_status("No posts found")
Execute a query; get post data for given parameters
This queries the Pushshift API to find posts and threads matching the given parameters.
Parameters
- dict query: Query parameters, as part of the DataSet object
Returns
Posts, sorted by thread and post ID, in ascending order
374 @staticmethod 375 def post_to_4cat(post): 376 """ 377 Convert a pushshift post object to 4CAT post data 378 379 :param dict post: Post data, as from the pushshift API 380 :return dict: Re-formatted data 381 """ 382 383 return { 384 "thread_id": post["link_id"].split("_").pop(), 385 "id": post["id"], 386 "timestamp": post["created_utc"], 387 "body": post["body"].strip().replace("\r", ""), 388 "subject": "", 389 "author": post["author"], 390 "author_flair": post.get("author_flair_text", ""), 391 "post_flair": "", 392 "domain": "", 393 "url": "", 394 "image_file": "", 395 "image_md5": "", 396 "subreddit": post["subreddit"], 397 "parent": post["parent_id"], 398 # this is missing sometimes, but upon manual inspection 399 # the post always has 1 point 400 "score": post.get("score", 1) 401 }
Convert a pushshift post object to 4CAT post data
Parameters
- dict post: Post data, as from the pushshift API
Returns
Re-formatted data
403 @staticmethod 404 def thread_to_4cat(thread): 405 """ 406 Convert a pushshift thread object to 4CAT post data 407 408 :param dict post: Post data, as from the pushshift API 409 :return dict: Re-formatted data 410 """ 411 image_match = re.compile(r"\.(jpg|jpeg|png|gif|webm|mp4)$", flags=re.IGNORECASE) 412 413 return { 414 "thread_id": thread["id"], 415 "id": thread["id"], 416 "timestamp": thread["created_utc"], 417 "body": thread.get("selftext", "").strip().replace("\r", ""), 418 "subject": thread["title"], 419 "author": thread["author"], 420 "author_flair": thread.get("author_flair_text", ""), 421 "post_flair": thread.get("link_flair_text", ""), 422 "image_file": thread.get("url", "") if thread.get("url") and image_match.search(thread.get("url", "")) else "", 423 "domain": thread.get("domain", ""), 424 "url": thread.get("url", ""), 425 "image_md5": "", 426 "subreddit": thread["subreddit"], 427 "parent": "", 428 "score": thread.get("score", 0) 429 }
Convert a pushshift thread object to 4CAT post data
Parameters
- dict post: Post data, as from the pushshift API
Returns
Re-formatted data
431 def call_pushshift_api(self, *args, **kwargs): 432 """ 433 Call pushshift API and don't crash (immediately) if it fails 434 435 Will also try to respect the rate limit, waiting before making a 436 request until it will not violate the rate limit. 437 438 :param args: 439 :param kwargs: 440 :return: Response, or `None` 441 """ 442 443 retries = 0 444 while retries < self.max_retries: 445 try: 446 self.wait_until_window() 447 response = requests.get(*args, **kwargs) 448 self.request_timestamps.append(time.time()) 449 if response.status_code == 200: 450 break 451 else: 452 raise RuntimeError("HTTP %s" % response.status_code) 453 except (RuntimeError, requests.RequestException) as e: 454 self.log.info("Error %s while querying Pushshift API - waiting 15 seconds and retrying..." % e) 455 time.sleep(15) 456 retries += 1 457 458 if retries >= self.max_retries: 459 self.log.error("Error during Pushshift fetch of query %s" % self.dataset.key) 460 self.dataset.update_status("Error while searching for posts on Pushshift - API did not respond as expected") 461 return None 462 463 return response
Call pushshift API and don't crash (immediately) if it fails
Will also try to respect the rate limit, waiting before making a request until it will not violate the rate limit.
Parameters
- args:
- kwargs:
Returns
Response, or
None
465 @staticmethod 466 def get_expected_results(endpoint, parameters, api_type): 467 """ 468 Get expected result size for a query 469 470 We're not using call_pushshift_api here because that cannot be called 471 statically, which is necessary because this is called from within 472 validate_query. 473 474 :param str endpoint: URL of the API endpoint 475 :param dict parameters: Call parameters 476 :param api_type: Type of API (regular or beta) 477 478 :return: Number of expected results, or `None` 479 """ 480 parameters.update({"metadata": "true", "size": 0,"track_total_hits": True}) 481 482 retries = 0 483 response = None 484 485 while retries < 3: 486 try: 487 response = requests.get(endpoint, parameters, timeout=10) 488 break 489 except requests.RequestException: 490 retries += 1 491 time.sleep(retries * 5) 492 continue 493 494 if not response or response.status_code != 200: 495 return None 496 else: 497 try: 498 return response.json()["metadata"]["es"]["hits"]["total"]["value"] 499 except (json.JSONDecodeError, KeyError): 500 return None
Get expected result size for a query
We're not using call_pushshift_api here because that cannot be called statically, which is necessary because this is called from within validate_query.
Parameters
- str endpoint: URL of the API endpoint
- dict parameters: Call parameters
- api_type: Type of API (regular or beta)
Returns
Number of expected results, or
None
502 def wait_until_window(self): 503 """ 504 Wait until a request can be made outside of the rate limit 505 506 If we have made more requests in the window (one minute) than allowed 507 by the rate limit, wait until that is no longer the case. 508 """ 509 window_start = time.time() - 60 510 has_warned = False 511 512 while len([timestamp for timestamp in self.request_timestamps if timestamp >= window_start]) >= self.rate_limit: 513 if not has_warned: 514 self.log.info("Hit Pushshift rate limit - throttling...") 515 has_warned = True 516 517 time.sleep(0.25) # should be enough 518 519 # clean up timestamps outside of window 520 self.request_timestamps = [timestamp for timestamp in self.request_timestamps if timestamp >= window_start]
Wait until a request can be made outside of the rate limit
If we have made more requests in the window (one minute) than allowed by the rate limit, wait until that is no longer the case.
def validate_query(query, request, user):
    """
    Validate input for a dataset query on the Reddit data source.

    Will raise a QueryParametersException if invalid parameters are
    encountered. Mutually exclusive parameters may also be sanitised by
    ignoring either of the mutually exclusive options.

    :param dict query: Query parameters, from client-side.
    :param request: Flask request
    :param User user: User object of user who has submitted the query
    :return dict: Safe query parameters
    """
    # we need a board!
    r_prefix = re.compile(r"^/?r/")
    # ignore leading r/ for boards
    boards = [r_prefix.sub("", board).strip() for board in query.get("board", "").split(",") if board.strip()]

    if not boards:
        raise QueryParametersException("Please provide a board or a comma-separated list of boards to query.")

    query["board"] = ",".join(boards)

    keywordless_query = config.get("reddit-search.can_query_without_keyword", False, user=user)

    # this is the bare minimum, else we can't narrow down the full data set
    if not user.is_admin and not keywordless_query and not query.get(
            "body_match", "").strip() and not query.get("subject_match", "").strip() and not query.get(
            "subject_url", ""):
        raise QueryParametersException("Please provide a body query or subject query.")

    # body query and full threads are incompatible, returning too many posts
    # in most cases
    if query.get("body_match", None):
        if "full_threads" in query:
            del query["full_threads"]

    # Make sure no body or subject searches consisting solely of negated
    # terms are possible, e.g. "-Trump"
    if query.get("body_match", None) or query.get("subject_match", None):
        queries_to_check = []

        if query.get("body_match", None):
            queries_to_check += [body_query.strip() for body_query in query["body_match"].split(" ")]

        if query.get("subject_match", None):
            queries_to_check += [subject_query.strip() for subject_query in query["subject_match"].split(" ")]

        startswith_minus = [query_check.startswith("-") for query_check in queries_to_check]
        if all(startswith_minus):
            raise QueryParametersException("Please provide body queries that do not start with a minus sign.")

    # URL queries are not possible (yet) for the beta API
    if query.get("pushshift_track") == "beta" and query.get("subject_url", None):
        raise QueryParametersException("URL querying is not possible (yet) for the beta endpoint.")

    # both dates need to be set, or none; the old check only rejected a
    # lower bound without an upper bound, this rejects both directions
    if bool(query.get("min_date", None)) != bool(query.get("max_date", None)):
        raise QueryParametersException("When setting a date range, please provide both an upper and lower limit.")

    # the dates need to make sense as a range to search within; fall back
    # to an open range rather than crashing if no daterange was submitted
    query["min_date"], query["max_date"] = query.get("daterange") or (None, None)

    if "*" in query.get("body_match", "") and not keywordless_query:
        raise QueryParametersException(
            "Wildcard queries are not allowed as they typically return too many results to properly process.")

    if "*" in query.get("board", "") and not keywordless_query:
        raise QueryParametersException(
            "Wildcards are not allowed for boards as this typically returns too many results to properly process.")

    # remove safely instead of `del`, which raised KeyError when absent
    query.pop("daterange", None)

    # determine how many results to expect
    # this adds a small delay since we need to talk to the API before
    # returning to the user, but the benefit is that we reduce the amount
    # of too-large queries (because users are warned beforehand) and can
    # give a progress indication for queries that do go through
    params = SearchReddit.build_query(query)
    expected_posts = SearchReddit.get_expected_results(*params[0], query.get("pushshift_track", "regular"))
    if not expected_posts:
        expected_posts = 0

    if query.get("search_scope") != "op-only":
        expected_replies = SearchReddit.get_expected_results(*params[1], query.get("pushshift_track", "regular"))
        expected_posts += expected_replies if expected_replies else 0

    if expected_posts:
        # posts-per-second estimate per API version, from observation
        pps = 672 if query.get("pushshift_track") == "beta" else 44
        expected_seconds = int(expected_posts / pps)  # seems to be about this
        expected_time = timify_long(expected_seconds)
        query["expected-results"] = expected_posts

        if expected_seconds > 1800 and not query.get("frontend-confirm"):
            raise QueryNeedsExplicitConfirmationException(
                "This query will return approximately %s items. This will take a long time (approximately %s)."
                " Are you sure you want to run this query?" % ("{:,}".format(expected_posts), expected_time))

    # if we made it this far, the query can be executed
    return query
Validate input for a dataset query on the Reddit data source.
Will raise a QueryParametersException if invalid parameters are encountered. Mutually exclusive parameters may also be sanitised by ignoring either of the mutually exclusive options.
Parameters
- dict query: Query parameters, from client-side.
- request: Flask request
- User user: User object of user who has submitted the query
Returns
Safe query parameters
Inherited Members
- backend.lib.worker.BasicWorker
- BasicWorker
- INTERRUPT_NONE
- INTERRUPT_RETRY
- INTERRUPT_CANCEL
- queue
- log
- manager
- interrupted
- modules
- init_time
- name
- run
- clean_up
- request_interrupt
- is_4cat_class
- backend.lib.search.Search
- prefix
- return_cols
- import_error_count
- import_warning_count
- process
- search
- import_from_file
- items_to_csv
- items_to_ndjson
- items_to_archive
- backend.lib.processor.BasicProcessor
- db
- job
- dataset
- owner
- source_dataset
- source_file
- is_running_in_preset
- filepath
- work
- after_process
- remove_files
- abort
- add_field_to_parent
- iterate_archive_contents
- unpack_archive_contents
- extract_archived_file_by_name
- write_csv_items_and_finish
- write_archive_and_finish
- create_standalone
- map_item_method_available
- get_mapped_item
- is_filter
- get_status
- is_top_dataset
- is_from_collector
- get_extension
- is_rankable
- exclude_followup_processors
- is_4cat_processor