backend.lib.scraper
Basic scraper worker - should be inherited by workers to scrape specific types of content
1""" 2Basic scraper worker - should be inherited by workers to scrape specific types of content 3""" 4import collections 5import requests 6import random 7import json 8import abc 9 10from pathlib import Path 11from backend.lib.worker import BasicWorker 12 13from common.config_manager import config 14 15class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta): 16 """ 17 Abstract JSON scraper class 18 19 The job queue is continually checked for jobs of this scraper's type. If any are found, 20 the URL for that job is scraped and the result is parsed as JSON. The parsed JSON is 21 then passed to a processor method for further handling. 22 """ 23 24 log_level = "warning" 25 _logger_method = None 26 category = "Collector" 27 28 def __init__(self, job, logger=None, manager=None, modules=None): 29 """ 30 Set up database connection - we need one to store the thread data 31 """ 32 super().__init__(logger=logger, manager=manager, job=job, modules=modules) 33 self.prefix = self.type.split("-")[0] 34 # Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database 35 # TODO: update database.sql names and create migrate script, then remove this 36 self.prefix = { 37 "fourchan": "4chan", 38 "eightkun": "8kun", 39 "eightchan": "8chan", 40 }[self.prefix] 41 42 if not hasattr(logger, self.log_level): 43 self.log_level = "warning" 44 45 self._logger_method = getattr(logger, self.log_level) 46 47 def work(self): 48 """ 49 Scrape something 50 51 This requests data according to the job's parameter - either from a 52 local file or from a URL. The job is then either finished or released 53 depending on whether that was successful, and the data is processed 54 further if available. 55 """ 56 if "file" in self.job.details: 57 # if the file is available locally, use that file 58 id = self.job.details["file"] 59 local_path = Path(self.job.details["file"]) 60 if not local_path.exists(): 61 self.job.finish() 62 self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"]) 63 return 64 65 with local_path.open() as source: 66 datafields = { 67 "status_code": 200, 68 "content": source.read() 69 } 70 71 data = collections.namedtuple("object", datafields.keys())(*datafields.values()) 72 else: 73 # if not, see what URL we need to request data from 74 url = self.get_url() 75 try: 76 # see if any proxies were configured that would work for this URL 77 protocol = url.split(":")[0] 78 if protocol in config.get('SCRAPE_PROXIES', []) and config.get('SCRAPE_PROXIES')[protocol]: 79 proxies = {protocol: random.choice(config.get('SCRAPE_PROXIES')[protocol])} 80 else: 81 proxies = None 82 83 # do the request! 
84 data = requests.get(url, timeout=config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"}) 85 except (requests.exceptions.RequestException, ConnectionRefusedError) as e: 86 if self.job.data["attempts"] > 2: 87 self.job.finish() 88 self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e)) 89 else: 90 self.job.release(delay=random.randint(45,60)) 91 self.log.info("Could not finish request for %s (%s), releasing job" % (url, e)) 92 return 93 94 if "board" in self.job.details: 95 id = self.job.details["board"] + "/" + self.job.data["remote_id"] 96 else: 97 id = self.job.data["remote_id"] 98 99 if data.status_code == 404: 100 # this should be handled differently from an actually erroneous response 101 # because it may indicate that the resource has been deleted 102 self.not_found() 103 else: 104 parsed_data = self.parse(data.content) 105 if parsed_data is None: 106 if self.job.data["attempts"] < 2: 107 self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id)) 108 self.job.release(delay=random.choice(range(15, 45))) # try again later 109 else: 110 self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % ( 111 self.type, id, self.job.data["attempts"])) 112 self.job.finish() 113 return 114 115 # finally, pass it on 116 self.process(parsed_data) 117 self.after_process() 118 119 def after_process(self): 120 """ 121 After processing, declare job finished 122 """ 123 self.job.finish() 124 125 def not_found(self): 126 """ 127 Called if the job could not be completed because the request returned 128 a 404 response. This does not necessarily indicate failure. 129 """ 130 self.job.finish() 131 132 def parse(self, data): 133 """ 134 Parse incoming data 135 136 Can be overridden to, e.g., parse JSON data 137 138 :param data: Body of HTTP request 139 :return: Parsed data 140 """ 141 return data 142 143 @abc.abstractmethod 144 def process(self, data): 145 """ 146 Process scraped data 147 148 :param data: Parsed JSON data 149 """ 150 pass 151 152 @abc.abstractmethod 153 def get_url(self): 154 """ 155 Get URL to scrape 156 157 :return string: URL to scrape 158 """ 159 pass 160 161 162class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta): 163 """ 164 Scraper for JSON-based data 165 """ 166 167 def parse(self, data): 168 """ 169 Parse data as JSON 170 171 :param str data: Incoming JSON-encoded data 172 :return: Decoded JSON object 173 """ 174 try: 175 return json.loads(data) 176 except json.JSONDecodeError: 177 return None
class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):
    """
    Abstract HTTP scraper class

    The job queue is continually checked for jobs of this scraper's type. If any are found,
    the URL for that job is scraped, the response is parsed, and the parsed data is then
    passed to a processor method for further handling.
    """

    log_level = "warning"
    _logger_method = None
    category = "Collector"

    def __init__(self, job, logger=None, manager=None, modules=None):
        """
        Set up database connection - we need one to store the thread data
        """
        super().__init__(logger=logger, manager=manager, job=job, modules=modules)
        self.prefix = self.type.split("-")[0]
        # Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database
        # TODO: update database.sql names and create migrate script, then remove this
        self.prefix = {
            "fourchan": "4chan",
            "eightkun": "8kun",
            "eightchan": "8chan",
        }[self.prefix]

        if not hasattr(logger, self.log_level):
            self.log_level = "warning"

        self._logger_method = getattr(logger, self.log_level)

    def work(self):
        """
        Scrape something

        This requests data according to the job's parameter - either from a
        local file or from a URL. The job is then either finished or released
        depending on whether that was successful, and the data is processed
        further if available.
        """
        if "file" in self.job.details:
            # if the file is available locally, use that file
            id = self.job.details["file"]
            local_path = Path(self.job.details["file"])
            if not local_path.exists():
                self.job.finish()
                self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
                return

            with local_path.open() as source:
                datafields = {
                    "status_code": 200,
                    "content": source.read()
                }

            data = collections.namedtuple("object", datafields.keys())(*datafields.values())
        else:
            # if not, see what URL we need to request data from
            url = self.get_url()
            try:
                # see if any proxies were configured that would work for this URL
                protocol = url.split(":")[0]
                if protocol in config.get('SCRAPE_PROXIES', []) and config.get('SCRAPE_PROXIES')[protocol]:
                    proxies = {protocol: random.choice(config.get('SCRAPE_PROXIES')[protocol])}
                else:
                    proxies = None

                # do the request!
                data = requests.get(url, timeout=config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
            except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
                if self.job.data["attempts"] > 2:
                    self.job.finish()
                    self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
                else:
                    self.job.release(delay=random.randint(45, 60))
                    self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
                return

        if "board" in self.job.details:
            id = self.job.details["board"] + "/" + self.job.data["remote_id"]
        else:
            id = self.job.data["remote_id"]

        if data.status_code == 404:
            # this should be handled differently from an actually erroneous response
            # because it may indicate that the resource has been deleted
            self.not_found()
        else:
            parsed_data = self.parse(data.content)
            if parsed_data is None:
                if self.job.data["attempts"] < 2:
                    self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id))
                    self.job.release(delay=random.choice(range(15, 45)))  # try again later
                else:
                    self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
                        self.type, id, self.job.data["attempts"]))
                    self.job.finish()
                return

            # finally, pass it on
            self.process(parsed_data)

        self.after_process()

    def after_process(self):
        """
        After processing, declare job finished
        """
        self.job.finish()

    def not_found(self):
        """
        Called if the job could not be completed because the request returned
        a 404 response. This does not necessarily indicate failure.
        """
        self.job.finish()

    def parse(self, data):
        """
        Parse incoming data

        Can be overridden to, e.g., parse JSON data

        :param data: Body of HTTP request
        :return: Parsed data
        """
        return data

    @abc.abstractmethod
    def process(self, data):
        """
        Process scraped data

        :param data: Parsed JSON data
        """
        pass

    @abc.abstractmethod
    def get_url(self):
        """
        Get URL to scrape

        :return string: URL to scrape
        """
        pass
Abstract HTTP scraper class
The job queue is continually checked for jobs of this scraper's type. If any are found, the URL for that job is scraped, the response is parsed, and the parsed data is then passed to a processor method for further handling.
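To make that contract concrete, the following is a minimal sketch of what a subclass could look like. The class name, job type and URL are hypothetical and only illustrate the two methods every subclass must implement, get_url() and process().

# Hypothetical example - not part of this module. The job type and URL are made up;
# the "fourchan" prefix is chosen so that the name mapping in __init__ recognises it.
from backend.lib.scraper import BasicHTTPScraper


class ExampleBoardScraper(BasicHTTPScraper):
    type = "fourchan-board"  # hypothetical job type this worker would pick up

    def get_url(self):
        # build the URL to request from the details stored with the queued job
        return "https://example.com/%s/threads.json" % self.job.details["board"]

    def process(self, data):
        # data is whatever parse() returned; for BasicHTTPScraper that is the raw response body
        self.log.info("Scraped %i bytes for job %s" % (len(data), self.job.data["remote_id"]))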
def __init__(self, job, logger=None, manager=None, modules=None):
Set up database connection - we need one to store the thread data
def work(self):
Scrape something
This requests data according to the job's parameter - either from a local file or from a URL. The job is then either finished or released depending on whether that was successful, and the data is processed further if available.
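The proxy lookup in work() implies that SCRAPE_PROXIES maps a URL scheme to a list of proxy addresses, one of which is picked at random per request. A sketch of the expected shape and of the selection logic, with made-up values:

# Hypothetical configuration values - the keys match what work() reads via config.get(),
# the proxy addresses are invented for illustration.
import random

SCRAPE_TIMEOUT = 60  # seconds before requests.get() gives up
SCRAPE_PROXIES = {
    "http": ["http://127.0.0.1:8888", "http://10.0.0.2:8888"],
    "https": ["http://127.0.0.1:8888"],
}

url = "https://example.com/b/threads.json"
protocol = url.split(":")[0]  # "https"

# same selection as in work(): a random proxy for the URL's scheme, or none at all
if protocol in SCRAPE_PROXIES and SCRAPE_PROXIES[protocol]:
    proxies = {protocol: random.choice(SCRAPE_PROXIES[protocol])}
else:
    proxies = None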
def after_process(self):
After processing, declare job finished
def not_found(self):
Called if the job could not be completed because the request returned a 404 response. This does not necessarily indicate failure.
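A subclass can override not_found() to record the disappearance instead of only finishing the job. A hypothetical sketch, continuing the ExampleBoardScraper from earlier and assuming an invented mark_thread_deleted() helper:

# Hypothetical override - mark_thread_deleted() does not exist in this module and only
# illustrates treating a 404 as "resource deleted" rather than as an error.
from backend.lib.scraper import BasicHTTPScraper


class ExampleBoardScraper(BasicHTTPScraper):
    ...

    def not_found(self):
        self.log.info("Job %s returned a 404, marking resource as deleted" % self.job.data["remote_id"])
        self.mark_thread_deleted(self.job.data["remote_id"])  # hypothetical helper
        self.job.finish()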
def parse(self, data):
Parse incoming data
Can be overridden to, e.g., parse JSON data
Parameters
- data: Body of HTTP request
Returns
Parsed data
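An override does not have to produce JSON; the only contract is that it returns the parsed data, or None when the body is unusable, in which case work() releases the job and retries it later. A hypothetical sketch for an endpoint that returns plain text with one record per line; the class is illustrative and omits the other required methods:

# Hypothetical parse() override - class name and format are invented for illustration.
from backend.lib.scraper import BasicHTTPScraper


class ExampleLineScraper(BasicHTTPScraper):
    ...

    def parse(self, data):
        # work() passes bytes for HTTP responses and str for local source files
        if isinstance(data, bytes):
            data = data.decode("utf-8", errors="replace")

        lines = [line.strip() for line in data.splitlines() if line.strip()]

        # returning None makes work() release the job for a later retry
        return lines if lines else None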
@abc.abstractmethod
def process(self, data):
Process scraped data
Parameters
- data: Parsed JSON data
class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta):
    """
    Scraper for JSON-based data
    """

    def parse(self, data):
        """
        Parse data as JSON

        :param str data: Incoming JSON-encoded data
        :return: Decoded JSON object
        """
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            return None
Scraper for JSON-based data
def parse(self, data):
Parse data as JSON
Parameters
- str data: Incoming JSON-encoded data
Returns
Decoded JSON object
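A hypothetical end-to-end example of a concrete JSON scraper: only get_url() and process() are implemented, while parse() is inherited from BasicJSONScraper. The job type, URL and JSON layout are invented for illustration.

# Hypothetical example - not part of this module.
from backend.lib.scraper import BasicJSONScraper


class ExampleCatalogScraper(BasicJSONScraper):
    type = "fourchan-catalog"  # hypothetical job type; its prefix must be one the mapping in __init__ knows

    def get_url(self):
        return "https://example.com/%s/catalog.json" % self.job.details["board"]

    def process(self, data):
        # data is the decoded JSON object returned by BasicJSONScraper.parse();
        # here it is assumed to be a list of pages, each with a "threads" list
        threads = [thread for page in data for thread in page.get("threads", [])]
        self.log.info("Found %i threads on %s" % (len(threads), self.job.details["board"]))

If the response body cannot be decoded, the inherited parse() returns None, and work() will release the job for a later retry or abort it after repeated failures.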