
backend.lib.scraper

Basic scraper worker - should be inherited by workers to scrape specific types of content

  1"""
  2Basic scraper worker - should be inherited by workers to scrape specific types of content
  3"""
  4import collections
  5import requests
  6import random
  7import json
  8import abc
  9
 10from pathlib import Path
 11from backend.lib.worker import BasicWorker
 12
 13from common.config_manager import config
 14
 15class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):
 16	"""
 17	Abstract JSON scraper class
 18
 19	The job queue is continually checked for jobs of this scraper's type. If any are found,
 20	the URL for that job is scraped and the result is parsed as JSON. The parsed JSON is
 21	then passed to a processor method for further handling.
 22	"""
 23
 24	log_level = "warning"
 25	_logger_method = None
 26	category = "Collector"
 27
 28	def __init__(self, job, logger=None, manager=None, modules=None):
 29		"""
 30		Set up database connection - we need one to store the thread data
 31		"""
 32		super().__init__(logger=logger, manager=manager, job=job, modules=modules)
 33		self.prefix = self.type.split("-")[0]
 34		# Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database
 35		# TODO: update database.sql names and create migrate script, then remove this
 36		self.prefix = {
 37			"fourchan": "4chan",
 38			"eightkun": "8kun",
 39			"eightchan": "8chan",
 40		}[self.prefix]
 41
 42		if not hasattr(logger, self.log_level):
 43			self.log_level = "warning"
 44
 45		self._logger_method = getattr(logger, self.log_level)
 46
 47	def work(self):
 48		"""
 49		Scrape something
 50
 51		This requests data according to the job's parameter - either from a
 52		local file or from a URL. The job is then either finished or released
 53		depending on whether that was successful, and the data is processed
 54		further if available.
 55		"""
 56		if "file" in self.job.details:
 57			# if the file is available locally, use that file
 58			id = self.job.details["file"]
 59			local_path = Path(self.job.details["file"])
 60			if not local_path.exists():
 61				self.job.finish()
 62				self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
 63				return
 64
 65			with local_path.open() as source:
 66				datafields = {
 67					"status_code": 200,
 68					"content": source.read()
 69				}
 70
 71				data = collections.namedtuple("object", datafields.keys())(*datafields.values())
 72		else:
 73			# if not, see what URL we need to request data from
 74			url = self.get_url()
 75			try:
 76				# see if any proxies were configured that would work for this URL
 77				protocol = url.split(":")[0]
 78				if protocol in config.get('SCRAPE_PROXIES', []) and config.get('SCRAPE_PROXIES')[protocol]:
 79					proxies = {protocol: random.choice(config.get('SCRAPE_PROXIES')[protocol])}
 80				else:
 81					proxies = None
 82
 83				# do the request!
 84				data = requests.get(url, timeout=config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
 85			except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
 86				if self.job.data["attempts"] > 2:
 87					self.job.finish()
 88					self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
 89				else:
 90					self.job.release(delay=random.randint(45,60))
 91					self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
 92				return
 93
 94			if "board" in self.job.details:
 95				id = self.job.details["board"] + "/" + self.job.data["remote_id"]
 96			else:
 97				id = self.job.data["remote_id"]
 98
 99		if data.status_code == 404:
100			# this should be handled differently from an actually erroneous response
101			# because it may indicate that the resource has been deleted
102			self.not_found()
103		else:
104			parsed_data = self.parse(data.content)
105			if parsed_data is None:
106				if self.job.data["attempts"] < 2:
107					self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id))
108					self.job.release(delay=random.choice(range(15, 45)))  # try again later
109				else:
110					self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
111					self.type, id, self.job.data["attempts"]))
112					self.job.finish()
113				return
114
115			# finally, pass it on
116			self.process(parsed_data)
117			self.after_process()
118
119	def after_process(self):
120		"""
121		After processing, declare job finished
122		"""
123		self.job.finish()
124
125	def not_found(self):
126		"""
127		Called if the job could not be completed because the request returned
128		a 404 response. This does not necessarily indicate failure.
129		"""
130		self.job.finish()
131
132	def parse(self, data):
133		"""
134		Parse incoming data
135
136		Can be overridden to, e.g., parse JSON data
137
138		:param data:  Body of HTTP request
139		:return:  Parsed data
140		"""
141		return data
142
143	@abc.abstractmethod
144	def process(self, data):
145		"""
146		Process scraped data
147
148		:param data:  Parsed JSON data
149		"""
150		pass
151
152	@abc.abstractmethod
153	def get_url(self):
154		"""
155		Get URL to scrape
156
157		:return string:  URL to scrape
158		"""
159		pass
160
161
162class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta):
163	"""
164	Scraper for JSON-based data
165	"""
166
167	def parse(self, data):
168		"""
169		Parse data as JSON
170
171		:param str data:  Incoming JSON-encoded data
172		:return:  Decoded JSON object
173		"""
174		try:
175			return json.loads(data)
176		except json.JSONDecodeError:
177			return None
class BasicHTTPScraper(backend.lib.worker.BasicWorker):

Abstract HTTP scraper class

The job queue is continually checked for jobs of this scraper's type. If any are found, the URL for that job is scraped and the result is parsed. The parsed data is then passed to a processor method for further handling.
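
A concrete scraper only needs to supply the two abstract methods, get_url() and process(); queue polling, retries and proxy handling are inherited from this class. A minimal sketch - the class name, job type and URL below are hypothetical, not part of this module:

class ExamplePageScraper(BasicHTTPScraper):
	"""
	Hypothetical scraper that fetches one page per job
	"""
	type = "example-page"  # jobs of this type are picked up by this scraper

	def get_url(self):
		# build the request URL from the job's remote ID
		return "https://www.example.com/pages/%s" % self.job.data["remote_id"]

	def process(self, data):
		# with the default parse(), "data" is the raw response body
		self.log.info("Scraped %i bytes for job %s" % (len(data), self.job.data["remote_id"]))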

BasicHTTPScraper(job, logger=None, manager=None, modules=None)

Set up database connection - we need one to store the thread data

log_level = 'warning'
category = 'Collector'
prefix (derived in the constructor from the job type, e.g. "fourchan-thread" becomes "4chan")
def work(self):

Scrape something

This requests data according to the job's parameters - either from a local file or from a URL. The job is then either finished or released, depending on whether that was successful, and the data is processed further if available.
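
Note how the local-file branch fakes just enough of a requests.Response for the rest of the method to stay source-agnostic: a namedtuple exposing the two attributes work() actually reads. In isolation:

import collections

datafields = {"status_code": 200, "content": "<file contents>"}
data = collections.namedtuple("object", datafields.keys())(*datafields.values())

data.status_code  # 200, same attribute access as a real requests.Response
data.content      # "<file contents>"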

def after_process(self):

After processing, declare job finished

def not_found(self):

Called if the job could not be completed because the request returned a 404 response. This does not necessarily indicate failure.
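
A subclass may override this to record the apparent deletion before finishing the job. A minimal sketch (the override body is hypothetical):

	def not_found(self):
		# hypothetical override: note the apparent deletion, then finish as usual
		self.log.info("Resource %s seems to have been deleted" % self.job.data["remote_id"])
		self.job.finish()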

def parse(self, data):

Parse incoming data

Can be overridden to, e.g., parse JSON data

Parameters
  • data: Body of HTTP request
Returns

Parsed data
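
An override that returns None signals work() that parsing failed; the job is then released for a retry, or finished once the attempt limit is reached. A sketch of a validating override (the validation rule is hypothetical):

	def parse(self, data):
		# hypothetical override: reject bodies that do not look like HTML
		body = data.decode("utf-8", "replace") if isinstance(data, bytes) else data
		if "<html" not in body[:512].lower():
			return None  # work() will release the job for a retry, or abort
		return data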

@abc.abstractmethod
def process(self, data):

Process scraped data

Parameters
  • data: Parsed data
@abc.abstractmethod
def get_url(self):

Get URL to scrape

Returns

URL to scrape

class BasicJSONScraper(BasicHTTPScraper):

Scraper for JSON-based data

def parse(self, data):

Parse data as JSON

Parameters
  • str data: Incoming JSON-encoded data
Returns

Decoded JSON object
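
Putting it together: a concrete JSON scraper inherits the decoding above and only supplies get_url() and process(). As with the earlier sketch, the names below are hypothetical:

class ExampleJSONScraper(BasicJSONScraper):
	"""
	Hypothetical scraper for a JSON API endpoint
	"""
	type = "example-thread"

	def get_url(self):
		return "https://api.example.com/threads/%s.json" % self.job.data["remote_id"]

	def process(self, data):
		# "data" is already a decoded Python object here
		self.log.info("Thread %s has %i posts" % (self.job.data["remote_id"], len(data.get("posts", []))))

If json.loads() fails, parse() returns None and work() falls back to its retry logic, so process() never sees malformed data.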