backend.lib.scraper
Basic scraper worker - should be inherited by workers to scrape specific types of content
1""" 2Basic scraper worker - should be inherited by workers to scrape specific types of content 3""" 4import collections 5import requests 6import random 7import json 8import abc 9 10from pathlib import Path 11from backend.lib.worker import BasicWorker 12 13class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta): 14 """ 15 Abstract JSON scraper class 16 17 The job queue is continually checked for jobs of this scraper's type. If any are found, 18 the URL for that job is scraped and the result is parsed as JSON. The parsed JSON is 19 then passed to a processor method for further handling. 20 """ 21 22 log_level = "warning" 23 _logger_method = None 24 category = "Collector" 25 26 def __init__(self, job, logger=None, manager=None, modules=None): 27 """ 28 Set up database connection - we need one to store the thread data 29 """ 30 super().__init__(logger=logger, manager=manager, job=job, modules=modules) 31 self.prefix = self.type.split("-")[0] 32 # Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database 33 # TODO: update database.sql names and create migrate script, then remove this 34 self.prefix = { 35 "fourchan": "4chan", 36 "eightkun": "8kun", 37 "eightchan": "8chan", 38 }[self.prefix] 39 40 if not hasattr(logger, self.log_level): 41 self.log_level = "warning" 42 43 self._logger_method = getattr(logger, self.log_level) 44 45 def work(self): 46 """ 47 Scrape something 48 49 This requests data according to the job's parameter - either from a 50 local file or from a URL. The job is then either finished or released 51 depending on whether that was successful, and the data is processed 52 further if available. 
53 """ 54 if "file" in self.job.details: 55 # if the file is available locally, use that file 56 id = self.job.details["file"] 57 local_path = Path(self.job.details["file"]) 58 if not local_path.exists(): 59 self.job.finish() 60 self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"]) 61 return 62 63 with local_path.open() as source: 64 datafields = { 65 "status_code": 200, 66 "content": source.read() 67 } 68 69 data = collections.namedtuple("object", datafields.keys())(*datafields.values()) 70 else: 71 # if not, see what URL we need to request data from 72 url = self.get_url() 73 try: 74 # see if any proxies were configured that would work for this URL 75 protocol = url.split(":")[0] 76 if protocol in self.config.get('SCRAPE_PROXIES', []) and self.config.get('SCRAPE_PROXIES')[protocol]: 77 proxies = {protocol: random.choice(self.config.get('SCRAPE_PROXIES')[protocol])} 78 else: 79 proxies = None 80 81 # do the request! 
82 data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"}) 83 except (requests.exceptions.RequestException, ConnectionRefusedError) as e: 84 if self.job.data["attempts"] > 2: 85 self.job.finish() 86 self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e)) 87 else: 88 self.job.release(delay=random.randint(45,60)) 89 self.log.info("Could not finish request for %s (%s), releasing job" % (url, e)) 90 return 91 92 if "board" in self.job.details: 93 id = self.job.details["board"] + "/" + self.job.data["remote_id"] 94 else: 95 id = self.job.data["remote_id"] 96 97 if data.status_code == 404 or data.status_code == 403: 98 # this should be handled differently from an actually erroneous response 99 # because it may indicate that the resource has been deleted 100 self.not_found() 101 else: 102 parsed_data = self.parse(data.content) 103 if parsed_data is None: 104 if self.job.data["attempts"] < 2: 105 self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id)) 106 self.job.release(delay=random.choice(range(15, 45))) # try again later 107 else: 108 self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % ( 109 self.type, id, self.job.data["attempts"])) 110 self.job.finish() 111 return 112 113 # finally, pass it on 114 self.process(parsed_data) 115 self.after_process() 116 117 def after_process(self): 118 """ 119 After processing, declare job finished 120 """ 121 self.job.finish() 122 123 def not_found(self): 124 """ 125 Called if the job could not be completed because the request returned 126 a 404 response. This does not necessarily indicate failure. 
127 """ 128 self.job.finish() 129 130 def parse(self, data): 131 """ 132 Parse incoming data 133 134 Can be overridden to, e.g., parse JSON data 135 136 :param data: Body of HTTP request 137 :return: Parsed data 138 """ 139 return data 140 141 @abc.abstractmethod 142 def process(self, data): 143 """ 144 Process scraped data 145 146 :param data: Parsed JSON data 147 """ 148 pass 149 150 @abc.abstractmethod 151 def get_url(self): 152 """ 153 Get URL to scrape 154 155 :return string: URL to scrape 156 """ 157 pass 158 159 160class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta): 161 """ 162 Scraper for JSON-based data 163 """ 164 165 def parse(self, data): 166 """ 167 Parse data as JSON 168 169 :param str data: Incoming JSON-encoded data 170 :return: Decoded JSON object 171 """ 172 try: 173 return json.loads(data) 174 except json.JSONDecodeError: 175 return None
class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):
    """
    Abstract HTTP scraper class

    The job queue is continually checked for jobs of this scraper's type. If any are found,
    the URL (or local file) for that job is fetched and the result is passed through
    `parse()` and then to `process()` for further handling.
    """

    log_level = "warning"  # name of the logger method used for parse-failure messages
    _logger_method = None
    category = "Collector"

    def __init__(self, job, logger=None, manager=None, modules=None):
        """
        Set up the scraper and derive the datasource prefix from the worker type

        :param job:  Job this scraper should work on
        :param logger:  Logger object; should expose the method named by `log_level`
        :param manager:  Worker manager
        :param modules:  Module loader
        """
        super().__init__(logger=logger, manager=manager, job=job, modules=modules)
        self.prefix = self.type.split("-")[0]
        # Names were updated to be more consistent with the rest of the codebase, but we
        # still need to support the old database names.
        # TODO: update database.sql names and create migrate script, then remove this
        # Fall back to the unmapped prefix so datasources outside this map do not raise
        # a KeyError here.
        self.prefix = {
            "fourchan": "4chan",
            "eightkun": "8kun",
            "eightchan": "8chan",
        }.get(self.prefix, self.prefix)

        # fall back to a sane default if the configured log level does not exist
        if not hasattr(logger, self.log_level):
            self.log_level = "warning"

        self._logger_method = getattr(logger, self.log_level)

    def work(self):
        """
        Scrape something

        This requests data according to the job's parameters - either from a
        local file or from a URL. The job is then either finished or released
        depending on whether that was successful, and the data is processed
        further if available.
        """
        if "file" in self.job.details:
            # if the file is available locally, use that file
            local_path = Path(self.job.details["file"])
            if not local_path.exists():
                self.job.finish()
                self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
                return

            with local_path.open() as source:
                datafields = {
                    "status_code": 200,
                    "content": source.read()
                }

            # mimic the interface of a requests response object
            data = collections.namedtuple("object", datafields.keys())(*datafields.values())
        else:
            # if not, see what URL we need to request data from
            url = self.get_url()
            try:
                # see if any proxies were configured that would work for this URL
                protocol = url.split(":")[0]
                proxy_map = self.config.get('SCRAPE_PROXIES', {})
                if protocol in proxy_map and proxy_map[protocol]:
                    proxies = {protocol: random.choice(proxy_map[protocol])}
                else:
                    proxies = None

                # do the request!
                data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
            except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
                if self.job.data["attempts"] > 2:
                    self.job.finish()
                    self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
                else:
                    self.job.release(delay=random.randint(45, 60))
                    self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
                return

        # human-readable identifier for log messages
        if "board" in self.job.details:
            job_id = self.job.details["board"] + "/" + self.job.data["remote_id"]
        else:
            job_id = self.job.data["remote_id"]

        if data.status_code in (404, 403):
            # this should be handled differently from an actually erroneous response
            # because it may indicate that the resource has been deleted
            self.not_found()
        else:
            parsed_data = self.parse(data.content)
            if parsed_data is None:
                if self.job.data["attempts"] < 2:
                    self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, job_id))
                    self.job.release(delay=random.choice(range(15, 45)))  # try again later
                else:
                    self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
                        self.type, job_id, self.job.data["attempts"]))
                    self.job.finish()
                return

            # finally, pass it on
            self.process(parsed_data)
            self.after_process()

    def after_process(self):
        """
        After processing, declare job finished
        """
        self.job.finish()

    def not_found(self):
        """
        Called if the job could not be completed because the request returned
        a 404 (or 403) response. This does not necessarily indicate failure.
        """
        self.job.finish()

    def parse(self, data):
        """
        Parse incoming data

        Can be overridden to, e.g., parse JSON data

        :param data: Body of HTTP request
        :return: Parsed data
        """
        return data

    @abc.abstractmethod
    def process(self, data):
        """
        Process scraped data

        :param data: Parsed data
        """
        pass

    @abc.abstractmethod
    def get_url(self):
        """
        Get URL to scrape

        :return string: URL to scrape
        """
        pass
Abstract HTTP scraper class
The job queue is continually checked for jobs of this scraper's type. If any are found, the URL for that job is scraped and the result is passed through the parser. The parsed data is then passed to a processor method for further handling.
def __init__(self, job, logger=None, manager=None, modules=None):
    """
    Set up the scraper and derive the datasource prefix from the worker type

    :param job:  Job this scraper should work on
    :param logger:  Logger object; should expose the method named by `log_level`
    :param manager:  Worker manager
    :param modules:  Module loader
    """
    super().__init__(logger=logger, manager=manager, job=job, modules=modules)
    self.prefix = self.type.split("-")[0]
    # Names were updated to be more consistent with the rest of the codebase, but we
    # still need to support the old database names.
    # TODO: update database.sql names and create migrate script, then remove this
    # Fall back to the unmapped prefix so datasources outside this map do not raise
    # a KeyError here.
    self.prefix = {
        "fourchan": "4chan",
        "eightkun": "8kun",
        "eightchan": "8chan",
    }.get(self.prefix, self.prefix)

    # fall back to a sane default if the configured log level does not exist
    if not hasattr(logger, self.log_level):
        self.log_level = "warning"

    self._logger_method = getattr(logger, self.log_level)
Set up database connection - we need one to store the thread data
def work(self):
    """
    Scrape something

    This requests data according to the job's parameters - either from a
    local file or from a URL. The job is then either finished or released
    depending on whether that was successful, and the data is processed
    further if available.
    """
    if "file" in self.job.details:
        # if the file is available locally, use that file
        local_path = Path(self.job.details["file"])
        if not local_path.exists():
            self.job.finish()
            self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
            return

        with local_path.open() as source:
            datafields = {
                "status_code": 200,
                "content": source.read()
            }

        # mimic the interface of a requests response object
        data = collections.namedtuple("object", datafields.keys())(*datafields.values())
    else:
        # if not, see what URL we need to request data from
        url = self.get_url()
        try:
            # see if any proxies were configured that would work for this URL
            protocol = url.split(":")[0]
            proxy_map = self.config.get('SCRAPE_PROXIES', {})
            if protocol in proxy_map and proxy_map[protocol]:
                proxies = {protocol: random.choice(proxy_map[protocol])}
            else:
                proxies = None

            # do the request!
            data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
        except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
            if self.job.data["attempts"] > 2:
                self.job.finish()
                self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
            else:
                self.job.release(delay=random.randint(45, 60))
                self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
            return

    # human-readable identifier for log messages
    if "board" in self.job.details:
        job_id = self.job.details["board"] + "/" + self.job.data["remote_id"]
    else:
        job_id = self.job.data["remote_id"]

    if data.status_code in (404, 403):
        # this should be handled differently from an actually erroneous response
        # because it may indicate that the resource has been deleted
        self.not_found()
    else:
        parsed_data = self.parse(data.content)
        if parsed_data is None:
            if self.job.data["attempts"] < 2:
                self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, job_id))
                self.job.release(delay=random.choice(range(15, 45)))  # try again later
            else:
                self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
                    self.type, job_id, self.job.data["attempts"]))
                self.job.finish()
            return

        # finally, pass it on
        self.process(parsed_data)
        self.after_process()
Scrape something
This requests data according to the job's parameter - either from a local file or from a URL. The job is then either finished or released depending on whether that was successful, and the data is processed further if available.
def after_process(self):
    """
    Post-processing hook: mark the current job as done.

    Runs after `process()` has handled the scraped data.
    """
    self.job.finish()
After processing, declare job finished
def not_found(self):
    """
    Handle a "resource gone" response.

    Invoked when the request comes back with a status indicating the
    resource is unavailable (e.g. deleted); the job is simply declared
    finished, since this is not necessarily a failure.
    """
    self.job.finish()
Called if the job could not be completed because the request returned a 404 (or 403) response. This does not necessarily indicate failure.
def parse(self, data):
    """
    Default parser: return the response body unchanged.

    Subclasses may override this to decode the raw body into a more
    useful structure (e.g. JSON).

    :param data: Body of HTTP request
    :return: Parsed data
    """
    return data
Parse incoming data
Can be overridden to, e.g., parse JSON data
Parameters
- data: Body of HTTP request
Returns
Parsed data
@abc.abstractmethod
def process(self, data):
    """
    Handle data once it has been fetched and parsed.

    Must be implemented by concrete scraper subclasses.

    :param data: Parsed data
    """
    pass
Process scraped data
Parameters
- data: Parsed JSON data
class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta):
    """
    Scraper for JSON-based data
    """

    def parse(self, data):
        """
        Decode the response body as JSON

        :param str data: Incoming JSON-encoded data
        :return: Decoded JSON object
        """
        try:
            decoded = json.loads(data)
        except json.JSONDecodeError:
            decoded = None
        return decoded
Scraper for JSON-based data
def parse(self, data):
    """
    Decode the response body as JSON

    :param str data: Incoming JSON-encoded data
    :return: Decoded JSON object
    """
    try:
        decoded = json.loads(data)
    except json.JSONDecodeError:
        decoded = None
    return decoded
Parse data as JSON
Parameters
- str data: Incoming JSON-encoded data
Returns
Decoded JSON object