Edit on GitHub

backend.lib.scraper

Basic scraper worker - should be inherited by workers to scrape specific types of content

  1"""
  2Basic scraper worker - should be inherited by workers to scrape specific types of content
  3"""
  4import collections
  5import requests
  6import random
  7import json
  8import abc
  9
 10from pathlib import Path
 11from backend.lib.worker import BasicWorker
 12
 13class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):
 14	"""
 15	Abstract JSON scraper class
 16
 17	The job queue is continually checked for jobs of this scraper's type. If any are found,
 18	the URL for that job is scraped and the result is parsed as JSON. The parsed JSON is
 19	then passed to a processor method for further handling.
 20	"""
 21
 22	log_level = "warning"
 23	_logger_method = None
 24	category = "Collector"
 25
 26	def __init__(self, job, logger=None, manager=None, modules=None):
 27		"""
 28		Set up database connection - we need one to store the thread data
 29		"""
 30		super().__init__(logger=logger, manager=manager, job=job, modules=modules)
 31		self.prefix = self.type.split("-")[0]
 32		# Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database
 33		# TODO: update database.sql names and create migrate script, then remove this
 34		self.prefix = {
 35			"fourchan": "4chan",
 36			"eightkun": "8kun",
 37			"eightchan": "8chan",
 38		}[self.prefix]
 39
 40		if not hasattr(logger, self.log_level):
 41			self.log_level = "warning"
 42
 43		self._logger_method = getattr(logger, self.log_level)
 44
 45	def work(self):
 46		"""
 47		Scrape something
 48
 49		This requests data according to the job's parameter - either from a
 50		local file or from a URL. The job is then either finished or released
 51		depending on whether that was successful, and the data is processed
 52		further if available.
 53		"""
 54		if "file" in self.job.details:
 55			# if the file is available locally, use that file
 56			id = self.job.details["file"]
 57			local_path = Path(self.job.details["file"])
 58			if not local_path.exists():
 59				self.job.finish()
 60				self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
 61				return
 62
 63			with local_path.open() as source:
 64				datafields = {
 65					"status_code": 200,
 66					"content": source.read()
 67				}
 68
 69				data = collections.namedtuple("object", datafields.keys())(*datafields.values())
 70		else:
 71			# if not, see what URL we need to request data from
 72			url = self.get_url()
 73			try:
 74				# see if any proxies were configured that would work for this URL
 75				protocol = url.split(":")[0]
 76				if protocol in self.config.get('SCRAPE_PROXIES', []) and self.config.get('SCRAPE_PROXIES')[protocol]:
 77					proxies = {protocol: random.choice(self.config.get('SCRAPE_PROXIES')[protocol])}
 78				else:
 79					proxies = None
 80
 81				# do the request!
 82				data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
 83			except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
 84				if self.job.data["attempts"] > 2:
 85					self.job.finish()
 86					self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
 87				else:
 88					self.job.release(delay=random.randint(45,60))
 89					self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
 90				return
 91
 92			if "board" in self.job.details:
 93				id = self.job.details["board"] + "/" + self.job.data["remote_id"]
 94			else:
 95				id = self.job.data["remote_id"]
 96
 97		if data.status_code == 404 or data.status_code == 403:
 98			# this should be handled differently from an actually erroneous response
 99			# because it may indicate that the resource has been deleted
100			self.not_found()
101		else:
102			parsed_data = self.parse(data.content)
103			if parsed_data is None:
104				if self.job.data["attempts"] < 2:
105					self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id))
106					self.job.release(delay=random.choice(range(15, 45)))  # try again later
107				else:
108					self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
109					self.type, id, self.job.data["attempts"]))
110					self.job.finish()
111				return
112
113			# finally, pass it on
114			self.process(parsed_data)
115			self.after_process()
116
117	def after_process(self):
118		"""
119		After processing, declare job finished
120		"""
121		self.job.finish()
122
123	def not_found(self):
124		"""
125		Called if the job could not be completed because the request returned
126		a 404 response. This does not necessarily indicate failure.
127		"""
128		self.job.finish()
129
130	def parse(self, data):
131		"""
132		Parse incoming data
133
134		Can be overridden to, e.g., parse JSON data
135
136		:param data:  Body of HTTP request
137		:return:  Parsed data
138		"""
139		return data
140
141	@abc.abstractmethod
142	def process(self, data):
143		"""
144		Process scraped data
145
146		:param data:  Parsed JSON data
147		"""
148		pass
149
150	@abc.abstractmethod
151	def get_url(self):
152		"""
153		Get URL to scrape
154
155		:return string:  URL to scrape
156		"""
157		pass
158
159
class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta):
	"""
	Scraper for JSON-based data
	"""

	def parse(self, data):
		"""
		Parse data as JSON

		:param str data:  Incoming JSON-encoded data
		:return:  Decoded JSON object, or None if decoding failed
		"""
		try:
			decoded = json.loads(data)
		except json.JSONDecodeError:
			decoded = None

		return decoded
class BasicHTTPScraper(backend.lib.worker.BasicWorker):
 14class BasicHTTPScraper(BasicWorker, metaclass=abc.ABCMeta):
 15	"""
 16	Abstract JSON scraper class
 17
 18	The job queue is continually checked for jobs of this scraper's type. If any are found,
 19	the URL for that job is scraped and the result is parsed as JSON. The parsed JSON is
 20	then passed to a processor method for further handling.
 21	"""
 22
 23	log_level = "warning"
 24	_logger_method = None
 25	category = "Collector"
 26
 27	def __init__(self, job, logger=None, manager=None, modules=None):
 28		"""
 29		Set up database connection - we need one to store the thread data
 30		"""
 31		super().__init__(logger=logger, manager=manager, job=job, modules=modules)
 32		self.prefix = self.type.split("-")[0]
 33		# Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database
 34		# TODO: update database.sql names and create migrate script, then remove this
 35		self.prefix = {
 36			"fourchan": "4chan",
 37			"eightkun": "8kun",
 38			"eightchan": "8chan",
 39		}[self.prefix]
 40
 41		if not hasattr(logger, self.log_level):
 42			self.log_level = "warning"
 43
 44		self._logger_method = getattr(logger, self.log_level)
 45
 46	def work(self):
 47		"""
 48		Scrape something
 49
 50		This requests data according to the job's parameter - either from a
 51		local file or from a URL. The job is then either finished or released
 52		depending on whether that was successful, and the data is processed
 53		further if available.
 54		"""
 55		if "file" in self.job.details:
 56			# if the file is available locally, use that file
 57			id = self.job.details["file"]
 58			local_path = Path(self.job.details["file"])
 59			if not local_path.exists():
 60				self.job.finish()
 61				self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
 62				return
 63
 64			with local_path.open() as source:
 65				datafields = {
 66					"status_code": 200,
 67					"content": source.read()
 68				}
 69
 70				data = collections.namedtuple("object", datafields.keys())(*datafields.values())
 71		else:
 72			# if not, see what URL we need to request data from
 73			url = self.get_url()
 74			try:
 75				# see if any proxies were configured that would work for this URL
 76				protocol = url.split(":")[0]
 77				if protocol in self.config.get('SCRAPE_PROXIES', []) and self.config.get('SCRAPE_PROXIES')[protocol]:
 78					proxies = {protocol: random.choice(self.config.get('SCRAPE_PROXIES')[protocol])}
 79				else:
 80					proxies = None
 81
 82				# do the request!
 83				data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
 84			except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
 85				if self.job.data["attempts"] > 2:
 86					self.job.finish()
 87					self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
 88				else:
 89					self.job.release(delay=random.randint(45,60))
 90					self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
 91				return
 92
 93			if "board" in self.job.details:
 94				id = self.job.details["board"] + "/" + self.job.data["remote_id"]
 95			else:
 96				id = self.job.data["remote_id"]
 97
 98		if data.status_code == 404 or data.status_code == 403:
 99			# this should be handled differently from an actually erroneous response
100			# because it may indicate that the resource has been deleted
101			self.not_found()
102		else:
103			parsed_data = self.parse(data.content)
104			if parsed_data is None:
105				if self.job.data["attempts"] < 2:
106					self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id))
107					self.job.release(delay=random.choice(range(15, 45)))  # try again later
108				else:
109					self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
110					self.type, id, self.job.data["attempts"]))
111					self.job.finish()
112				return
113
114			# finally, pass it on
115			self.process(parsed_data)
116			self.after_process()
117
118	def after_process(self):
119		"""
120		After processing, declare job finished
121		"""
122		self.job.finish()
123
124	def not_found(self):
125		"""
126		Called if the job could not be completed because the request returned
127		a 404 response. This does not necessarily indicate failure.
128		"""
129		self.job.finish()
130
131	def parse(self, data):
132		"""
133		Parse incoming data
134
135		Can be overridden to, e.g., parse JSON data
136
137		:param data:  Body of HTTP request
138		:return:  Parsed data
139		"""
140		return data
141
142	@abc.abstractmethod
143	def process(self, data):
144		"""
145		Process scraped data
146
147		:param data:  Parsed JSON data
148		"""
149		pass
150
151	@abc.abstractmethod
152	def get_url(self):
153		"""
154		Get URL to scrape
155
156		:return string:  URL to scrape
157		"""
158		pass

Abstract HTTP scraper class

The job queue is continually checked for jobs of this scraper's type. If any are found, the URL for that job is scraped and the result is passed to a processor method for further handling. Subclasses such as BasicJSONScraper may parse the response (e.g. as JSON) before processing.

BasicHTTPScraper(job, logger=None, manager=None, modules=None)
27	def __init__(self, job, logger=None, manager=None, modules=None):
28		"""
29		Set up database connection - we need one to store the thread data
30		"""
31		super().__init__(logger=logger, manager=manager, job=job, modules=modules)
32		self.prefix = self.type.split("-")[0]
33		# Names were updated to be more consistent with the rest of the codebase, but we still need to support the old database
34		# TODO: update database.sql names and create migrate script, then remove this
35		self.prefix = {
36			"fourchan": "4chan",
37			"eightkun": "8kun",
38			"eightchan": "8chan",
39		}[self.prefix]
40
41		if not hasattr(logger, self.log_level):
42			self.log_level = "warning"
43
44		self._logger_method = getattr(logger, self.log_level)

Set up database connection - we need one to store the thread data

log_level = 'warning'
category = 'Collector'
prefix
def work(self):
 46	def work(self):
 47		"""
 48		Scrape something
 49
 50		This requests data according to the job's parameter - either from a
 51		local file or from a URL. The job is then either finished or released
 52		depending on whether that was successful, and the data is processed
 53		further if available.
 54		"""
 55		if "file" in self.job.details:
 56			# if the file is available locally, use that file
 57			id = self.job.details["file"]
 58			local_path = Path(self.job.details["file"])
 59			if not local_path.exists():
 60				self.job.finish()
 61				self.log.error("Scraper was told to use source file %s, but file does not exist, cancelling job." % self.job.details["file"])
 62				return
 63
 64			with local_path.open() as source:
 65				datafields = {
 66					"status_code": 200,
 67					"content": source.read()
 68				}
 69
 70				data = collections.namedtuple("object", datafields.keys())(*datafields.values())
 71		else:
 72			# if not, see what URL we need to request data from
 73			url = self.get_url()
 74			try:
 75				# see if any proxies were configured that would work for this URL
 76				protocol = url.split(":")[0]
 77				if protocol in self.config.get('SCRAPE_PROXIES', []) and self.config.get('SCRAPE_PROXIES')[protocol]:
 78					proxies = {protocol: random.choice(self.config.get('SCRAPE_PROXIES')[protocol])}
 79				else:
 80					proxies = None
 81
 82				# do the request!
 83				data = requests.get(url, timeout=self.config.get('SCRAPE_TIMEOUT', 60), proxies=proxies, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15"})
 84			except (requests.exceptions.RequestException, ConnectionRefusedError) as e:
 85				if self.job.data["attempts"] > 2:
 86					self.job.finish()
 87					self.log.error("Could not finish request for %s (%s), cancelling job" % (url, e))
 88				else:
 89					self.job.release(delay=random.randint(45,60))
 90					self.log.info("Could not finish request for %s (%s), releasing job" % (url, e))
 91				return
 92
 93			if "board" in self.job.details:
 94				id = self.job.details["board"] + "/" + self.job.data["remote_id"]
 95			else:
 96				id = self.job.data["remote_id"]
 97
 98		if data.status_code == 404 or data.status_code == 403:
 99			# this should be handled differently from an actually erroneous response
100			# because it may indicate that the resource has been deleted
101			self.not_found()
102		else:
103			parsed_data = self.parse(data.content)
104			if parsed_data is None:
105				if self.job.data["attempts"] < 2:
106					self.log.info("Data for %s %s could not be parsed, retrying later" % (self.type, id))
107					self.job.release(delay=random.choice(range(15, 45)))  # try again later
108				else:
109					self._logger_method("Data for %s %s could not be parsed after %i attempts, aborting" % (
110					self.type, id, self.job.data["attempts"]))
111					self.job.finish()
112				return
113
114			# finally, pass it on
115			self.process(parsed_data)
116			self.after_process()

Scrape something

This requests data according to the job's parameter - either from a local file or from a URL. The job is then either finished or released depending on whether that was successful, and the data is processed further if available.

def after_process(self):
118	def after_process(self):
119		"""
120		After processing, declare job finished
121		"""
122		self.job.finish()

After processing, declare job finished

def not_found(self):
124	def not_found(self):
125		"""
126		Called if the job could not be completed because the request returned
127		a 404 response. This does not necessarily indicate failure.
128		"""
129		self.job.finish()

Called if the job could not be completed because the request returned a 404 (or 403) response. This does not necessarily indicate failure.

def parse(self, data):
131	def parse(self, data):
132		"""
133		Parse incoming data
134
135		Can be overridden to, e.g., parse JSON data
136
137		:param data:  Body of HTTP request
138		:return:  Parsed data
139		"""
140		return data

Parse incoming data

Can be overridden to, e.g., parse JSON data

Parameters
  • data: Body of HTTP request
Returns

Parsed data

@abc.abstractmethod
def process(self, data):
142	@abc.abstractmethod
143	def process(self, data):
144		"""
145		Process scraped data
146
147		:param data:  Parsed JSON data
148		"""
149		pass

Process scraped data

Parameters
  • data: Parsed JSON data
@abc.abstractmethod
def get_url(self):
151	@abc.abstractmethod
152	def get_url(self):
153		"""
154		Get URL to scrape
155
156		:return string:  URL to scrape
157		"""
158		pass

Get URL to scrape

Returns

URL to scrape

class BasicJSONScraper(BasicHTTPScraper):
161class BasicJSONScraper(BasicHTTPScraper, metaclass=abc.ABCMeta):
162	"""
163	Scraper for JSON-based data
164	"""
165
166	def parse(self, data):
167		"""
168		Parse data as JSON
169
170		:param str data:  Incoming JSON-encoded data
171		:return:  Decoded JSON object
172		"""
173		try:
174			return json.loads(data)
175		except json.JSONDecodeError:
176			return None

Scraper for JSON-based data

def parse(self, data):
166	def parse(self, data):
167		"""
168		Parse data as JSON
169
170		:param str data:  Incoming JSON-encoded data
171		:return:  Decoded JSON object
172		"""
173		try:
174			return json.loads(data)
175		except json.JSONDecodeError:
176			return None

Parse data as JSON

Parameters
  • str data: Incoming JSON-encoded data
Returns

Decoded JSON object